You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2020/06/22 16:09:10 UTC

[druid] branch master updated: minor rework of topn algorithm selection for clarity and more javadocs (#10058)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new eee99ff  minor rework of topn algorithm selection for clarity and more javadocs (#10058)
eee99ff is described below

commit eee99ff0d5b365b4568c75c64985ab730c2fbdca
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Jun 22 09:08:50 2020 -0700

    minor rework of topn algorithm selection for clarity and more javadocs (#10058)
    
    * minor refactor of topn engine algorithm selection for clarity
    
    * adjust
    
    * more javadoc
---
 .../topn/AggregateTopNMetricFirstAlgorithm.java    |  7 +++
 .../druid/query/topn/PooledTopNAlgorithm.java      | 12 +++++
 .../apache/druid/query/topn/TopNQueryEngine.java   | 63 ++++++++++++++--------
 3 files changed, 60 insertions(+), 22 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
index 270ec16..ab5eef2 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
@@ -36,6 +36,13 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
+ * This {@link TopNAlgorithm} is tailored to processing aggregates on high cardinality columns which are likely to have
+ * larger result sets. Internally it uses a 2 phase approach to compute the top-n result using the
+ * {@link PooledTopNAlgorithm} for each phase. The first phase is to process the segment with only the order-by
+ * aggregator to compute which values constitute the top 'n' results. With this information, an actual result set
+ * is computed by a second run of the {@link PooledTopNAlgorithm}, this time with all aggregators, but only considering
+ * the values from the 'n' results to avoid performing any aggregations that would have been thrown away for results
+ * that didn't make the top-n.
  */
 public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], TopNParams>
 {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
index 7cd9340..19ebb54 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.FilteredOffset;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.data.Offset;
 import org.apache.druid.segment.historical.HistoricalColumnSelector;
@@ -49,7 +50,18 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
+ * This {@link TopNAlgorithm} is highly specialized for processing aggregates on string columns that are
+ * {@link ColumnCapabilities#isDictionaryEncoded()} and {@link ColumnCapabilities#areDictionaryValuesUnique()}. This
+ * algorithm is built around using a direct {@link ByteBuffer} from the 'processing pool' of intermediary results
+ * buffers, to aggregate using the dictionary id directly as the key, deferring the value lookup until it is necessary.
  *
+ * At runtime, this implementation is specialized with wizardry to optimize for processing common top-n query shapes,
+ * see {@link #computeSpecializedScanAndAggregateImplementations},
+ * {@link Generic1AggPooledTopNScanner} and {@link Generic1AggPooledTopNScannerPrototype},
+ * {@link Generic2AggPooledTopNScanner} and {@link Generic2AggPooledTopNScannerPrototype},
+ * {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop},
+ * {@link org.apache.druid.query.monomorphicprocessing.HotLoopCallee},
+ * {@link org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector} for more details.
  */
 public class PooledTopNAlgorithm
     extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index f1eab7f..eb22dc9 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -53,6 +53,12 @@ public class TopNQueryEngine
     this.bufferPool = bufferPool;
   }
 
+  /**
+   * Do the thing - process a {@link StorageAdapter} into a {@link Sequence} of {@link TopNResultValue}, with one of the
+   * fine {@link TopNAlgorithm} available chosen based on the type of column being aggregated. The algorithm provides a
+   * mapping function to process rows from the adapter {@link org.apache.druid.segment.Cursor} to apply
+   * {@link AggregatorFactory} and create or update {@link TopNResultValue}.
+   */
   public Sequence<Result<TopNResultValue>> query(
       final TopNQuery query,
       final StorageAdapter adapter,
@@ -71,7 +77,9 @@ public class TopNQueryEngine
     final TopNMapFn mapFn = getMapFn(query, adapter, queryMetrics);
 
     Preconditions.checkArgument(
-        queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
+        queryIntervals.size() == 1,
+        "Can only handle a single interval, got[%s]",
+        queryIntervals
     );
 
     return Sequences.filter(
@@ -95,6 +103,9 @@ public class TopNQueryEngine
     );
   }
 
+  /**
+   * Choose the best {@link TopNAlgorithm} for the given query.
+   */
   private TopNMapFn getMapFn(
       final TopNQuery query,
       final StorageAdapter adapter,
@@ -120,22 +131,8 @@ public class TopNQueryEngine
 
 
     final TopNAlgorithm<?, ?> topNAlgorithm;
-    if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
-      // heap based algorithm selection
-      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
-        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
-        // We might be able to use this for any long column with an extraction function, that is
-        //  ValueType.LONG.equals(columnCapabilities.getType())
-        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
-
-        // A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
-        // currently relies on the dimension cardinality to support lexicographic sorting
-        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
-      } else {
-        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-      }
-    } else {
-      // pool based algorithm selection
+    if (canUsePooledAlgorithm(selector, query, columnCapabilities)) {
+      // pool based algorithm selection, if we can
       if (selector.isAggregateAllMetrics()) {
         // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
         // this
@@ -148,6 +145,20 @@ public class TopNQueryEngine
         // anything else, use the regular pooled algorithm
         topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
       }
+    } else {
+      // heap based algorithm selection, if we must
+      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
+        // We might be able to use this for any long column with an extraction function, that is
+        //  ValueType.LONG.equals(columnCapabilities.getType())
+        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+
+        // A special TimeExtractionTopNAlgorithm is required since HeapBasedTopNAlgorithm
+        // currently relies on the dimension cardinality to support lexicographic sorting
+        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+      } else {
+        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+      }
     }
     if (queryMetrics != null) {
       queryMetrics.algorithm(topNAlgorithm);
@@ -166,7 +177,7 @@ public class TopNQueryEngine
    * (and {@link TimeExtractionTopNAlgorithm} for a specialized form for long columns) which aggregates on values of
    * selectors.
    */
-  private static boolean requiresHeapAlgorithm(
+  private static boolean canUsePooledAlgorithm(
       final TopNAlgorithmSelector selector,
       final TopNQuery query,
       final ColumnCapabilities capabilities
@@ -174,22 +185,30 @@ public class TopNQueryEngine
   {
     if (selector.isHasExtractionFn()) {
       // extraction functions can have a many to one mapping, and should use a heap algorithm
-      return true;
+      return false;
     }
 
     if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
       // non-string output cannot use the pooled algorithm, even if the underlying selector supports it
-      return true;
+      return false;
     }
     if (capabilities != null && capabilities.getType() == ValueType.STRING) {
       // string columns must use the on heap algorithm unless they have the following capabilities
-      return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+      return capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue();
     } else {
       // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
-      return true;
+      return false;
     }
   }
 
+  /**
+   * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since
+   * which value is used as the grouping key itself doesn't particularly matter. For top-n, this method allows the
+   * query to be transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the
+   * {@link ExtractionFn} on the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be
+   * chosen for processing segments, and then added back and evaluated against the final merged result sets on the
+   * broker via {@link TopNQueryQueryToolChest#postMergeQueryDecoration}.
+   */
   public static boolean canApplyExtractionInPost(TopNQuery query)
   {
     return query.getDimensionSpec() != null


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org