You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2020/06/22 16:09:10 UTC
[druid] branch master updated: minor rework of topn algorithm
selection for clarity and more javadocs (#10058)
This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new eee99ff minor rework of topn algorithm selection for clarity and more javadocs (#10058)
eee99ff is described below
commit eee99ff0d5b365b4568c75c64985ab730c2fbdca
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Jun 22 09:08:50 2020 -0700
minor rework of topn algorithm selection for clarity and more javadocs (#10058)
* minor refactor of topn engine algorithm selection for clarity
* adjust
* more javadoc
---
.../topn/AggregateTopNMetricFirstAlgorithm.java | 7 +++
.../druid/query/topn/PooledTopNAlgorithm.java | 12 +++++
.../apache/druid/query/topn/TopNQueryEngine.java | 63 ++++++++++++++--------
3 files changed, 60 insertions(+), 22 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
index 270ec16..ab5eef2 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
@@ -36,6 +36,13 @@ import java.util.Iterator;
import java.util.List;
/**
+ * This {@link TopNAlgorithm} is tailored to processing aggregates on high cardinality columns which are likely to have
+ * larger result sets. Internally it uses a 2 phase approach to compute the top-n result using the
+ * {@link PooledTopNAlgorithm} for each phase. The first phase is to process the segment with only the order-by
+ * aggregator to compute which values constitute the top 'n' results. With this information, the actual result set
+ * is computed by a second run of the {@link PooledTopNAlgorithm}, this time with all aggregators, but only considering
+ * the values from the 'n' results to avoid performing any aggregations that would have been thrown away for results
+ * that didn't make the top-n.
*/
public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], TopNParams>
{
diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
index 7cd9340..19ebb54 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.FilteredOffset;
import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.Offset;
import org.apache.druid.segment.historical.HistoricalColumnSelector;
@@ -49,7 +50,18 @@ import java.util.Arrays;
import java.util.List;
/**
+ * This {@link TopNAlgorithm} is highly specialized for processing aggregates on string columns that are
+ * {@link ColumnCapabilities#isDictionaryEncoded()} and {@link ColumnCapabilities#areDictionaryValuesUnique()}. This
+ * algorithm is built around using a direct {@link ByteBuffer} from the 'processing pool' of intermediary results
+ * buffers, to aggregate using the dictionary id directly as the key, to defer looking up the value until it is necessary.
*
+ * At runtime, this implementation is specialized with wizardry to optimize for processing common top-n query shapes,
+ * see {@link #computeSpecializedScanAndAggregateImplementations},
+ * {@link Generic1AggPooledTopNScanner} and {@link Generic1AggPooledTopNScannerPrototype},
+ * {@link Generic2AggPooledTopNScanner} and {@link Generic2AggPooledTopNScannerPrototype},
+ * {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop},
+ * {@link org.apache.druid.query.monomorphicprocessing.HotLoopCallee},
+ * {@link org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector} for more details.
*/
public class PooledTopNAlgorithm
extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index f1eab7f..eb22dc9 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -53,6 +53,12 @@ public class TopNQueryEngine
this.bufferPool = bufferPool;
}
+ /**
+ * Do the thing - process a {@link StorageAdapter} into a {@link Sequence} of {@link TopNResultValue}, with one of the
+ * fine {@link TopNAlgorithm} available chosen based on the type of column being aggregated. The algorithm provides a
+ * mapping function to process rows from the adapter {@link org.apache.druid.segment.Cursor} to apply
+ * {@link AggregatorFactory} and create or update {@link TopNResultValue}.
+ */
public Sequence<Result<TopNResultValue>> query(
final TopNQuery query,
final StorageAdapter adapter,
@@ -71,7 +77,9 @@ public class TopNQueryEngine
final TopNMapFn mapFn = getMapFn(query, adapter, queryMetrics);
Preconditions.checkArgument(
- queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
+ queryIntervals.size() == 1,
+ "Can only handle a single interval, got[%s]",
+ queryIntervals
);
return Sequences.filter(
@@ -95,6 +103,9 @@ public class TopNQueryEngine
);
}
+ /**
+ * Choose the best {@link TopNAlgorithm} for the given query.
+ */
private TopNMapFn getMapFn(
final TopNQuery query,
final StorageAdapter adapter,
@@ -120,22 +131,8 @@ public class TopNQueryEngine
final TopNAlgorithm<?, ?> topNAlgorithm;
- if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
- // heap based algorithm selection
- if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
- // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
- // We might be able to use this for any long column with an extraction function, that is
- // ValueType.LONG.equals(columnCapabilities.getType())
- // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
-
- // A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
- // currently relies on the dimension cardinality to support lexicographic sorting
- topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
- } else {
- topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
- }
- } else {
- // pool based algorithm selection
+ if (canUsePooledAlgorithm(selector, query, columnCapabilities)) {
+ // pool based algorithm selection, if we can
if (selector.isAggregateAllMetrics()) {
// if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
// this
@@ -148,6 +145,20 @@ public class TopNQueryEngine
// anything else, use the regular pooled algorithm
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
}
+ } else {
+ // heap based algorithm selection, if we must
+ if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+ // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
+ // We might be able to use this for any long column with an extraction function, that is
+ // ValueType.LONG.equals(columnCapabilities.getType())
+ // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+
+ // A special TimeExtractionTopNAlgorithm is required since HeapBasedTopNAlgorithm
+ // currently relies on the dimension cardinality to support lexicographic sorting
+ topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+ } else {
+ topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+ }
}
if (queryMetrics != null) {
queryMetrics.algorithm(topNAlgorithm);
@@ -166,7 +177,7 @@ public class TopNQueryEngine
* (and {@link TimeExtractionTopNAlgorithm} for a specialized form for long columns) which aggregates on values of
* selectors.
*/
- private static boolean requiresHeapAlgorithm(
+ private static boolean canUsePooledAlgorithm(
final TopNAlgorithmSelector selector,
final TopNQuery query,
final ColumnCapabilities capabilities
@@ -174,22 +185,30 @@ public class TopNQueryEngine
{
if (selector.isHasExtractionFn()) {
// extraction functions can have a many to one mapping, and should use a heap algorithm
- return true;
+ return false;
}
if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
// non-string output cannot use the pooled algorithm, even if the underlying selector supports it
- return true;
+ return false;
}
if (capabilities != null && capabilities.getType() == ValueType.STRING) {
// string columns must use the on heap algorithm unless they have the following capabilities
- return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+ return capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue();
} else {
// non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
- return true;
+ return false;
}
}
+ /**
+ * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since
+ * which value is used as the grouping key itself doesn't particularly matter. For top-n, this method allows the
+ * query to be transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the
+ * {@link ExtractionFn} on the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be
+ * chosen for processing segments, and then added back and evaluated against the final merged result sets on the
+ * broker via {@link TopNQueryQueryToolChest#postMergeQueryDecoration}.
+ */
public static boolean canApplyExtractionInPost(TopNQuery query)
{
return query.getDimensionSpec() != null
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org