You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2020/01/19 21:25:57 UTC

[druid] branch 0.17.0 updated: fix topn aggregation on numeric columns with null values (#9183) (#9219)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.17.0 by this push:
     new 5d309ff  fix topn aggregation on numeric columns with null values (#9183) (#9219)
5d309ff is described below

commit 5d309ff4e947b75811436b5971d75382f7f445d3
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Sun Jan 19 13:25:42 2020 -0800

    fix topn aggregation on numeric columns with null values (#9183) (#9219)
    
    * fix topn issue with aggregating on numeric columns with null values
    
    * adjustments
    
    * rename
    
    * add more tests
    
    * fix comments
    
    * more javadocs
    
    * computeIfAbsent
---
 .../apache/druid/query/topn/BaseTopNAlgorithm.java |   3 +-
 ...NAlgorithm.java => HeapBasedTopNAlgorithm.java} |  57 ++---
 .../query/topn/TimeExtractionTopNAlgorithm.java    |  18 +-
 .../org/apache/druid/query/topn/TopNAlgorithm.java |   4 +-
 .../org/apache/druid/query/topn/TopNMapFn.java     |  15 +-
 .../org/apache/druid/query/topn/TopNParams.java    |   9 +-
 .../apache/druid/query/topn/TopNQueryEngine.java   |  31 ++-
 .../types/DoubleTopNColumnAggregatesProcessor.java |  75 ++++++
 .../types/FloatTopNColumnAggregatesProcessor.java  |  75 ++++++
 .../types/LongTopNColumnAggregatesProcessor.java   |  71 ++++++
 ...llableNumericTopNColumnAggregatesProcessor.java | 156 ++++++++++++
 .../types/NumericTopNColumnSelectorStrategy.java   | 277 ---------------------
 ...va => StringTopNColumnAggregatesProcessor.java} |  99 ++++----
 ...egy.java => TopNColumnAggregatesProcessor.java} |  83 +++---
 .../TopNColumnAggregatesProcessorFactory.java      |  82 ++++++
 .../types/TopNColumnSelectorStrategyFactory.java   |  71 ------
 .../druid/segment/DimensionHandlerUtils.java       |  43 ++--
 .../org/apache/druid/segment/VirtualColumns.java   |  28 ++-
 .../druid/query/topn/TopNQueryRunnerTest.java      | 126 ++++++++--
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 120 +++++++++
 20 files changed, 895 insertions(+), 548 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
index bb49c35..3a1fdbd 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
@@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
 import org.apache.druid.segment.Capabilities;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
@@ -79,7 +78,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
       @Nullable TopNQueryMetrics queryMetrics
   )
   {
-    if (params.getCardinality() != TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN) {
+    if (params.getCardinality() != TopNParams.CARDINALITY_UNKNOWN) {
       runWithCardinalityKnown(params, resultBuilder, dimValSelector, queryMetrics);
     } else {
       runWithCardinalityUnknown(params, resultBuilder, queryMetrics);
diff --git a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
similarity index 57%
rename from processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java
rename to processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
index 8f3941b..87f7956 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
@@ -21,21 +21,22 @@ package org.apache.druid.query.topn;
 
 import org.apache.druid.query.ColumnSelectorPlus;
 import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.StorageAdapter;
 
-import java.util.Map;
-
 /**
- * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value.
+ * Heap based topn algorithm that handles aggregates on dimension extractions and numeric typed dimension columns.
+ *
+ * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle
+ * multiple index numerals referencing the same dimension value.
  */
-public class DimExtractionTopNAlgorithm
-    extends BaseTopNAlgorithm<Aggregator[][], Map<Comparable, Aggregator[]>, TopNParams>
+public class HeapBasedTopNAlgorithm
+    extends BaseTopNAlgorithm<Aggregator[][], TopNColumnAggregatesProcessor, TopNParams>
 {
   private final TopNQuery query;
 
-  public DimExtractionTopNAlgorithm(
+  public HeapBasedTopNAlgorithm(
       StorageAdapter storageAdapter,
       TopNQuery query
   )
@@ -47,7 +48,7 @@ public class DimExtractionTopNAlgorithm
 
   @Override
   public TopNParams makeInitParams(
-      final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
+      final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
       final Cursor cursor
   )
   {
@@ -64,8 +65,8 @@ public class DimExtractionTopNAlgorithm
     if (params.getCardinality() < 0) {
       throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
     }
-    ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
-    return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, storageAdapter);
+    ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
+    return selectorPlus.getColumnSelectorStrategy().getRowSelector(query, params, storageAdapter);
   }
 
   @Override
@@ -75,54 +76,46 @@ public class DimExtractionTopNAlgorithm
   }
 
   @Override
-  protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
+  protected TopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params)
   {
-    final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
-    return selectorPlus.getColumnSelectorStrategy().makeDimExtractionAggregateStore();
+    final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
+    return selectorPlus.getColumnSelectorStrategy();
   }
 
   @Override
-  public long scanAndAggregate(
+  protected long scanAndAggregate(
       TopNParams params,
       Aggregator[][] rowSelector,
-      Map<Comparable, Aggregator[]> aggregatesStore
+      TopNColumnAggregatesProcessor processor
   )
   {
     final Cursor cursor = params.getCursor();
-    final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
+    final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
 
-    return selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate(
+    processor.initAggregateStore();
+    return processor.scanAndAggregate(
         query,
         selectorPlus.getSelector(),
         cursor,
-        rowSelector,
-        aggregatesStore
+        rowSelector
     );
   }
 
   @Override
   protected void updateResults(
       TopNParams params,
-      Aggregator[][] rowSelector,
-      Map<Comparable, Aggregator[]> aggregatesStore,
+      Aggregator[][] aggregators,
+      TopNColumnAggregatesProcessor processor,
       TopNResultBuilder resultBuilder
   )
   {
-    final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
-    selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
-        aggregatesStore,
-        resultBuilder
-    );
+    processor.updateResults(resultBuilder);
   }
 
   @Override
-  protected void closeAggregators(Map<Comparable, Aggregator[]> valueMap)
+  protected void closeAggregators(TopNColumnAggregatesProcessor processor)
   {
-    for (Aggregator[] aggregators : valueMap.values()) {
-      for (Aggregator agg : aggregators) {
-        agg.close();
-      }
-    }
+    processor.closeAggregators();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index 7d4c23c..a23518a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -27,10 +27,11 @@ import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.column.ValueType;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 
-public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable, Aggregator[]>, TopNParams>
+public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable<?>, Aggregator[]>, TopNParams>
 {
   private static final int[] EMPTY_INTS = new int[]{};
 
@@ -74,17 +75,16 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
+  protected Map<Comparable<?>, Aggregator[]> makeDimValAggregateStore(TopNParams params)
   {
-    return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore();
+    return new HashMap<>();
   }
 
   @Override
   protected long scanAndAggregate(
       TopNParams params,
       int[] dimValSelector,
-      Map<Comparable, Aggregator[]> aggregatesStore
+      Map<Comparable<?>, Aggregator[]> aggregatesStore
   )
   {
     if (params.getCardinality() < 0) {
@@ -96,7 +96,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
 
     long processedRows = 0;
     while (!cursor.isDone()) {
-      final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
+      final Comparable<?> key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
 
       Aggregator[] theAggregators = aggregatesStore.get(key);
       if (theAggregators == null) {
@@ -118,11 +118,11 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
   protected void updateResults(
       TopNParams params,
       int[] dimValSelector,
-      Map<Comparable, Aggregator[]> aggregatesStore,
+      Map<Comparable<?>, Aggregator[]> aggregatesStore,
       TopNResultBuilder resultBuilder
   )
   {
-    for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
+    for (Map.Entry<Comparable<?>, Aggregator[]> entry : aggregatesStore.entrySet()) {
       Aggregator[] aggs = entry.getValue();
       if (aggs != null) {
         Object[] vals = new Object[aggs.length];
@@ -140,7 +140,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
   }
 
   @Override
-  protected void closeAggregators(Map<Comparable, Aggregator[]> stringMap)
+  protected void closeAggregators(Map<Comparable<?>, Aggregator[]> stringMap)
   {
     for (Aggregator[] aggregators : stringMap.values()) {
       for (Aggregator agg : aggregators) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java
index 0f7a527..31b4d92 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java
@@ -21,7 +21,7 @@ package org.apache.druid.query.topn;
 
 import org.apache.druid.query.ColumnSelectorPlus;
 import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
 import org.apache.druid.segment.Cursor;
 
 import javax.annotation.Nullable;
@@ -34,7 +34,7 @@ public interface TopNAlgorithm<DimValSelector, Parameters extends TopNParams>
   int INIT_POSITION_VALUE = -1;
   int SKIP_POSITION_VALUE = -2;
 
-  TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus, Cursor cursor);
+  TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus, Cursor cursor);
 
   void run(
       Parameters params,
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java
index af12047..96fb62f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java
@@ -21,7 +21,8 @@ package org.apache.druid.query.topn;
 
 import org.apache.druid.query.ColumnSelectorPlus;
 import org.apache.druid.query.Result;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionHandlerUtils;
 
@@ -42,13 +43,15 @@ public class TopNMapFn
   }
 
   @SuppressWarnings("unchecked")
+  @Nullable
   public Result<TopNResultValue> apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics)
   {
-    final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
-        new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()),
-        query.getDimensionSpec(),
-        cursor.getColumnSelectorFactory()
-    );
+    final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
+        DimensionHandlerUtils.createColumnSelectorPlus(
+            new TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
+            query.getDimensionSpec(),
+            cursor.getColumnSelectorFactory()
+        );
 
     if (selectorPlus.getSelector() == null) {
       return null;
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java
index a80e1bd..cdc541f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java
@@ -20,7 +20,7 @@
 package org.apache.druid.query.topn;
 
 import org.apache.druid.query.ColumnSelectorPlus;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 
@@ -28,13 +28,14 @@ import org.apache.druid.segment.DimensionSelector;
  */
 public class TopNParams
 {
+  public static final int CARDINALITY_UNKNOWN = -1;
   private final Cursor cursor;
   private final int cardinality;
   private final int numValuesPerPass;
-  private final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus;
+  private final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus;
 
   protected TopNParams(
-      ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
+      ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
       Cursor cursor,
       int numValuesPerPass
   )
@@ -52,7 +53,7 @@ public class TopNParams
     return (DimensionSelector) selectorPlus.getSelector();
   }
 
-  public ColumnSelectorPlus<TopNColumnSelectorStrategy> getSelectorPlus()
+  public ColumnSelectorPlus<TopNColumnAggregatesProcessor> getSelectorPlus()
   {
     return selectorPlus;
   }
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 0b2fb48..dfe1809 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.query.topn;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import org.apache.druid.collections.NonBlockingPool;
@@ -30,7 +29,6 @@ import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.Filter;
-import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.SegmentMissingException;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -86,16 +84,11 @@ public class TopNQueryEngine
                 query.isDescending(),
                 queryMetrics
             ),
-            new Function<Cursor, Result<TopNResultValue>>()
-            {
-              @Override
-              public Result<TopNResultValue> apply(Cursor input)
-              {
-                if (queryMetrics != null) {
-                  queryMetrics.cursor(input);
-                }
-                return mapFn.apply(input, queryMetrics);
+            input -> {
+              if (queryMetrics != null) {
+                queryMetrics.cursor(input);
               }
+              return mapFn.apply(input, queryMetrics);
             }
         ),
         Predicates.notNull()
@@ -125,7 +118,8 @@ public class TopNQueryEngine
     final ColumnCapabilities columnCapabilities = query.getVirtualColumns()
                                                        .getColumnCapabilitiesWithFallback(adapter, dimension);
 
-    final TopNAlgorithm topNAlgorithm;
+
+    final TopNAlgorithm<?, ?> topNAlgorithm;
     if (
         selector.isHasExtractionFn() &&
         // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
@@ -137,20 +131,23 @@ public class TopNQueryEngine
       // currently relies on the dimension cardinality to support lexicographic sorting
       topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
     } else if (selector.isHasExtractionFn()) {
-      topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
     } else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING
                                                && columnCapabilities.isDictionaryEncoded())) {
-      // Use DimExtraction for non-Strings and for non-dictionary-encoded Strings.
-      topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+      // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings.
+      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
     } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
-      // Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be
+      // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
       // a many-to-one mapping, since numeric types can't represent all possible values of other types.)
-      topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
     } else if (selector.isAggregateAllMetrics()) {
+      // sorted by dimension
       topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
     } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
+      // high cardinality dimensions with larger result sets
       topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
     } else {
+      // anything else
       topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
     }
     if (queryMetrics != null) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..2a2781d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class DoubleTopNColumnAggregatesProcessor
+    extends NullableNumericTopNColumnAggregatesProcessor<BaseDoubleColumnValueSelector>
+{
+  private Long2ObjectMap<Aggregator[]> aggregatesStore;
+
+  protected DoubleTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+  {
+    super(converter);
+  }
+
+  @Override
+  Aggregator[] getValueAggregators(
+      TopNQuery query,
+      BaseDoubleColumnValueSelector selector,
+      Cursor cursor
+  )
+  {
+    long key = Double.doubleToLongBits(selector.getDouble());
+    return aggregatesStore.computeIfAbsent(
+        key,
+        k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+    );
+  }
+
+  @Override
+  public void initAggregateStore()
+  {
+    nullValueAggregates = null;
+    aggregatesStore = new Long2ObjectOpenHashMap<>();
+  }
+
+  @Override
+  Map<?, Aggregator[]> getAggregatesStore()
+  {
+    return aggregatesStore;
+  }
+
+  @Override
+  Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
+  {
+    return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..4e8dd88
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class FloatTopNColumnAggregatesProcessor
+    extends NullableNumericTopNColumnAggregatesProcessor<BaseFloatColumnValueSelector>
+{
+  private Int2ObjectMap<Aggregator[]> aggregatesStore;
+
+  protected FloatTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+  {
+    super(converter);
+  }
+
+  @Override
+  Aggregator[] getValueAggregators(
+      TopNQuery query,
+      BaseFloatColumnValueSelector selector,
+      Cursor cursor
+  )
+  {
+    int key = Float.floatToIntBits(selector.getFloat());
+    return aggregatesStore.computeIfAbsent(
+        key,
+        k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+    );
+  }
+
+  @Override
+  public void initAggregateStore()
+  {
+    nullValueAggregates = null;
+    this.aggregatesStore = new Int2ObjectOpenHashMap<>();
+  }
+
+  @Override
+  Map<?, Aggregator[]> getAggregatesStore()
+  {
+    return aggregatesStore;
+  }
+
+  @Override
+  Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
+  {
+    return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..d28d8a8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class LongTopNColumnAggregatesProcessor
+    extends NullableNumericTopNColumnAggregatesProcessor<BaseLongColumnValueSelector>
+{
+  private Long2ObjectMap<Aggregator[]> aggregatesStore;
+
+  public LongTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+  {
+    super(converter);
+  }
+
+  @Override
+  Aggregator[] getValueAggregators(TopNQuery query, BaseLongColumnValueSelector selector, Cursor cursor)
+  {
+    long key = selector.getLong();
+    return aggregatesStore.computeIfAbsent(
+        key,
+        k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+    );
+  }
+
+  @Override
+  public void initAggregateStore()
+  {
+    nullValueAggregates = null;
+    aggregatesStore = new Long2ObjectOpenHashMap<>();
+  }
+
+  @Override
+  Map<?, Aggregator[]> getAggregatesStore()
+  {
+    return aggregatesStore;
+  }
+
+  @Override
+  Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
+  {
+    return converter.apply(aggregatorStoreKey);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..0b8e90c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNParams;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.query.topn.TopNResultBuilder;
+import org.apache.druid.segment.BaseNullableColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Base {@link TopNColumnAggregatesProcessor} for {@link BaseNullableColumnValueSelector}. Non-null selector values
+ * aggregates are stored in a type appropriate primitive map, created by {@link #initAggregateStore()} and available
+ * via {@link #getAggregatesStore()}, and null valued row aggregates are stored in a separate
+ * {@link #nullValueAggregates} {@link Aggregator} array.
+ *
+ * {@link #updateResults} will combine both the map and null aggregates to populate the {@link TopNResultBuilder} with
+ * the values produced by {@link #scanAndAggregate}.
+ */
+public abstract class NullableNumericTopNColumnAggregatesProcessor<Selector extends BaseNullableColumnValueSelector>
+    implements TopNColumnAggregatesProcessor<Selector>
+{
+  private final boolean hasNulls = !NullHandling.replaceWithDefault();
+  final Function<Object, Comparable<?>> converter;
+  Aggregator[] nullValueAggregates;
+
+  protected NullableNumericTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+  {
+    this.converter = converter;
+  }
+
+  /**
+   * Get {@link Aggregator} set for the current {@param Selector} row value for a given {@link Cursor}
+   */
+  abstract Aggregator[] getValueAggregators(TopNQuery query, Selector selector, Cursor cursor);
+
+  /**
+   * Get primitive numeric map for value aggregates created by {@link #scanAndAggregate}, to be used by
+   * {@link #updateResults} to apply to the {@link TopNResultBuilder}
+   */
+  abstract Map<?, Aggregator[]> getAggregatesStore();
+
+  /**
+   * Method to convert primitive numeric value keys used by {@link #getAggregatesStore} into the correct representation
+   * for the {@link TopNResultBuilder}, called by {@link #updateResults}
+   */
+  abstract Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey);
+
+  @Override
+  public int getCardinality(Selector selector)
+  {
+    return TopNParams.CARDINALITY_UNKNOWN;
+  }
+
+  @Override
+  public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
+  {
+    return null;
+  }
+
+  @Override
+  public long scanAndAggregate(
+      TopNQuery query,
+      Selector selector,
+      Cursor cursor,
+      Aggregator[][] rowSelector
+  )
+  {
+    long processedRows = 0;
+    while (!cursor.isDone()) {
+      if (hasNulls && selector.isNull()) {
+        if (nullValueAggregates == null) {
+          nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
+        }
+        for (Aggregator aggregator : nullValueAggregates) {
+          aggregator.aggregate();
+        }
+      } else {
+        Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor);
+        for (Aggregator aggregator : valueAggregates) {
+          aggregator.aggregate();
+        }
+      }
+      cursor.advance();
+      processedRows++;
+    }
+    return processedRows;
+  }
+
+
+  @Override
+  public void updateResults(TopNResultBuilder resultBuilder)
+  {
+    for (Map.Entry<?, Aggregator[]> entry : getAggregatesStore().entrySet()) {
+      Aggregator[] aggs = entry.getValue();
+      if (aggs != null) {
+        Object[] vals = new Object[aggs.length];
+        for (int i = 0; i < aggs.length; i++) {
+          vals[i] = aggs[i].get();
+        }
+
+        final Comparable<?> key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
+        resultBuilder.addEntry(key, key, vals);
+      }
+    }
+
+    if (nullValueAggregates != null) {
+      Object[] nullVals = new Object[nullValueAggregates.length];
+      for (int i = 0; i < nullValueAggregates.length; i++) {
+        nullVals[i] = nullValueAggregates[i].get();
+      }
+
+      resultBuilder.addEntry(null, null, nullVals);
+    }
+  }
+
+  @Override
+  public void closeAggregators()
+  {
+    for (Aggregator[] aggregators : getAggregatesStore().values()) {
+      for (Aggregator agg : aggregators) {
+        agg.close();
+      }
+    }
+
+    if (nullValueAggregates != null) {
+      for (Aggregator nullAgg : nullValueAggregates) {
+        nullAgg.close();
+      }
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
deleted file mode 100644
index e27baf4..0000000
--- a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.topn.types;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.topn.BaseTopNAlgorithm;
-import org.apache.druid.query.topn.TopNParams;
-import org.apache.druid.query.topn.TopNQuery;
-import org.apache.druid.query.topn.TopNResultBuilder;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.BaseFloatColumnValueSelector;
-import org.apache.druid.segment.BaseLongColumnValueSelector;
-import org.apache.druid.segment.Cursor;
-import org.apache.druid.segment.DimensionHandlerUtils;
-import org.apache.druid.segment.StorageAdapter;
-import org.apache.druid.segment.column.ValueType;
-
-import java.util.Map;
-import java.util.function.Function;
-
-public abstract class NumericTopNColumnSelectorStrategy<
-    ValueSelectorType,
-    DimExtractionAggregateStoreType extends Map<?, Aggregator[]>>
-    implements TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType>
-{
-  public static TopNColumnSelectorStrategy ofType(final ValueType selectorType, final ValueType dimensionType)
-  {
-    final Function<Object, Comparable<?>> converter = DimensionHandlerUtils.converterFromTypeToType(
-        selectorType,
-        dimensionType
-    );
-
-    switch (selectorType) {
-      case LONG:
-        return new OfLong(converter);
-      case FLOAT:
-        return new OfFloat(converter);
-      case DOUBLE:
-        return new OfDouble(converter);
-      default:
-        throw new IAE("No strategy for type[%s]", selectorType);
-    }
-  }
-
-  @Override
-  public int getCardinality(ValueSelectorType selector)
-  {
-    return TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN;
-  }
-
-  @Override
-  public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
-  {
-    return null;
-  }
-
-  static long floatDimExtractionScanAndAggregate(
-      TopNQuery query,
-      BaseFloatColumnValueSelector selector,
-      Cursor cursor,
-      Int2ObjectMap<Aggregator[]> aggregatesStore
-  )
-  {
-    long processedRows = 0;
-    while (!cursor.isDone()) {
-      int key = Float.floatToIntBits(selector.getFloat());
-      Aggregator[] theAggregators = aggregatesStore.get(key);
-      if (theAggregators == null) {
-        theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
-        aggregatesStore.put(key, theAggregators);
-      }
-      for (Aggregator aggregator : theAggregators) {
-        aggregator.aggregate();
-      }
-      cursor.advance();
-      processedRows++;
-    }
-    return processedRows;
-  }
-
-  static long doubleDimExtractionScanAndAggregate(
-      TopNQuery query,
-      BaseDoubleColumnValueSelector selector,
-      Cursor cursor,
-      Long2ObjectMap<Aggregator[]> aggregatesStore
-  )
-  {
-    long processedRows = 0;
-    while (!cursor.isDone()) {
-      long key = Double.doubleToLongBits(selector.getDouble());
-      Aggregator[] theAggregators = aggregatesStore.get(key);
-      if (theAggregators == null) {
-        theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
-        aggregatesStore.put(key, theAggregators);
-      }
-      for (Aggregator aggregator : theAggregators) {
-        aggregator.aggregate();
-      }
-      cursor.advance();
-      processedRows++;
-    }
-    return processedRows;
-  }
-
-  static long longDimExtractionScanAndAggregate(
-      TopNQuery query,
-      BaseLongColumnValueSelector selector,
-      Cursor cursor,
-      Long2ObjectMap<Aggregator[]> aggregatesStore
-  )
-  {
-    long processedRows = 0;
-    while (!cursor.isDone()) {
-      long key = selector.getLong();
-      Aggregator[] theAggregators = aggregatesStore.get(key);
-      if (theAggregators == null) {
-        theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
-        aggregatesStore.put(key, theAggregators);
-      }
-      for (Aggregator aggregator : theAggregators) {
-        aggregator.aggregate();
-      }
-      cursor.advance();
-      processedRows++;
-    }
-    return processedRows;
-  }
-
-  @Override
-  public void updateDimExtractionResults(
-      final DimExtractionAggregateStoreType aggregatesStore,
-      final TopNResultBuilder resultBuilder
-  )
-  {
-    for (Map.Entry<?, Aggregator[]> entry : aggregatesStore.entrySet()) {
-      Aggregator[] aggs = entry.getValue();
-      if (aggs != null) {
-        Object[] vals = new Object[aggs.length];
-        for (int i = 0; i < aggs.length; i++) {
-          vals[i] = aggs[i].get();
-        }
-
-        final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
-        resultBuilder.addEntry(key, key, vals);
-      }
-    }
-  }
-
-  abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey);
-
-  static class OfFloat
-      extends NumericTopNColumnSelectorStrategy<BaseFloatColumnValueSelector, Int2ObjectMap<Aggregator[]>>
-  {
-    private final Function<Object, Comparable<?>> converter;
-
-    OfFloat(final Function<Object, Comparable<?>> converter)
-    {
-      this.converter = converter;
-    }
-
-    @Override
-    public Int2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
-    {
-      return new Int2ObjectOpenHashMap<>();
-    }
-
-    @Override
-    Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
-    {
-      return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
-    }
-
-    @Override
-    public long dimExtractionScanAndAggregate(
-        TopNQuery query,
-        BaseFloatColumnValueSelector selector,
-        Cursor cursor,
-        Aggregator[][] rowSelector,
-        Int2ObjectMap<Aggregator[]> aggregatesStore
-    )
-    {
-      return floatDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
-    }
-  }
-
-  static class OfLong
-      extends NumericTopNColumnSelectorStrategy<BaseLongColumnValueSelector, Long2ObjectMap<Aggregator[]>>
-  {
-    private final Function<Object, Comparable<?>> converter;
-
-    OfLong(final Function<Object, Comparable<?>> converter)
-    {
-      this.converter = converter;
-    }
-
-    @Override
-    public Long2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
-    {
-      return new Long2ObjectOpenHashMap<>();
-    }
-
-    @Override
-    Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
-    {
-      return converter.apply(aggregatorStoreKey);
-    }
-
-    @Override
-    public long dimExtractionScanAndAggregate(
-        TopNQuery query,
-        BaseLongColumnValueSelector selector,
-        Cursor cursor,
-        Aggregator[][] rowSelector,
-        Long2ObjectMap<Aggregator[]> aggregatesStore
-    )
-    {
-      return longDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
-    }
-  }
-
-  static class OfDouble
-      extends NumericTopNColumnSelectorStrategy<BaseDoubleColumnValueSelector, Long2ObjectMap<Aggregator[]>>
-  {
-    private final Function<Object, Comparable<?>> converter;
-
-    OfDouble(final Function<Object, Comparable<?>> converter)
-    {
-      this.converter = converter;
-    }
-
-    @Override
-    public Long2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
-    {
-      return new Long2ObjectOpenHashMap<>();
-    }
-
-    @Override
-    Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
-    {
-      return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
-    }
-
-    @Override
-    public long dimExtractionScanAndAggregate(
-        TopNQuery query,
-        BaseDoubleColumnValueSelector selector,
-        Cursor cursor,
-        Aggregator[][] rowSelector,
-        Long2ObjectMap<Aggregator[]> aggregatesStore
-    )
-    {
-      return doubleDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
-    }
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
similarity index 66%
rename from processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
rename to processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index f5e838d..cea7c77 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -36,12 +36,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 
-public class StringTopNColumnSelectorStrategy
-    implements TopNColumnSelectorStrategy<DimensionSelector, Map<Comparable<?>, Aggregator[]>>
+public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor<DimensionSelector>
 {
   private final Function<Object, Comparable<?>> dimensionValueConverter;
+  private HashMap<Comparable<?>, Aggregator[]> aggregatesStore;
 
-  public StringTopNColumnSelectorStrategy(final ValueType dimensionType)
+  public StringTopNColumnAggregatesProcessor(final ValueType dimensionType)
   {
     this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType);
   }
@@ -53,7 +53,7 @@ public class StringTopNColumnSelectorStrategy
   }
 
   @Override
-  public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
+  public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
   {
     if (params.getCardinality() < 0) {
       throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
@@ -74,53 +74,58 @@ public class StringTopNColumnSelectorStrategy
   }
 
   @Override
-  public Map<Comparable<?>, Aggregator[]> makeDimExtractionAggregateStore()
+  public void updateResults(TopNResultBuilder resultBuilder)
   {
-    return new HashMap<>();
+    for (Map.Entry<?, Aggregator[]> entry : aggregatesStore.entrySet()) {
+      Aggregator[] aggs = entry.getValue();
+      if (aggs != null) {
+        Object[] vals = new Object[aggs.length];
+        for (int i = 0; i < aggs.length; i++) {
+          vals[i] = aggs[i].get();
+        }
+
+        final Comparable<?> key = dimensionValueConverter.apply(entry.getKey());
+        resultBuilder.addEntry(key, key, vals);
+      }
+    }
+  }
+
+  @Override
+  public void closeAggregators()
+  {
+    for (Aggregator[] aggregators : aggregatesStore.values()) {
+      for (Aggregator agg : aggregators) {
+        agg.close();
+      }
+    }
   }
 
   @Override
-  public long dimExtractionScanAndAggregate(
+  public long scanAndAggregate(
       TopNQuery query,
       DimensionSelector selector,
       Cursor cursor,
-      Aggregator[][] rowSelector,
-      Map<Comparable<?>, Aggregator[]> aggregatesStore
+      Aggregator[][] rowSelector
   )
   {
     if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
-      return dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector, aggregatesStore);
+      return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector);
     } else {
-      return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector, aggregatesStore);
+      return scanAndAggregateWithCardinalityUnknown(query, cursor, selector);
     }
   }
 
   @Override
-  public void updateDimExtractionResults(
-      final Map<Comparable<?>, Aggregator[]> aggregatesStore,
-      final TopNResultBuilder resultBuilder
-  )
+  public void initAggregateStore()
   {
-    for (Map.Entry<Comparable<?>, Aggregator[]> entry : aggregatesStore.entrySet()) {
-      Aggregator[] aggs = entry.getValue();
-      if (aggs != null) {
-        Object[] vals = new Object[aggs.length];
-        for (int i = 0; i < aggs.length; i++) {
-          vals[i] = aggs[i].get();
-        }
-
-        final Comparable<?> key = dimensionValueConverter.apply(entry.getKey());
-        resultBuilder.addEntry(key, key, vals);
-      }
-    }
+    this.aggregatesStore = new HashMap<>();
   }
 
-  private long dimExtractionScanAndAggregateWithCardinalityKnown(
+  private long scanAndAggregateWithCardinalityKnown(
       TopNQuery query,
       Cursor cursor,
       DimensionSelector selector,
-      Aggregator[][] rowSelector,
-      Map<Comparable<?>, Aggregator[]> aggregatesStore
+      Aggregator[][] rowSelector
   )
   {
     long processedRows = 0;
@@ -128,18 +133,17 @@ public class StringTopNColumnSelectorStrategy
       final IndexedInts dimValues = selector.getRow();
       for (int i = 0, size = dimValues.size(); i < size; ++i) {
         final int dimIndex = dimValues.get(i);
-        Aggregator[] theAggregators = rowSelector[dimIndex];
-        if (theAggregators == null) {
+        Aggregator[] aggs = rowSelector[dimIndex];
+        if (aggs == null) {
           final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
-          theAggregators = aggregatesStore.get(key);
-          if (theAggregators == null) {
-            theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
-            aggregatesStore.put(key, theAggregators);
-          }
-          rowSelector[dimIndex] = theAggregators;
+          aggs = aggregatesStore.computeIfAbsent(
+              key,
+              k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+          );
+          rowSelector[dimIndex] = aggs;
         }
 
-        for (Aggregator aggregator : theAggregators) {
+        for (Aggregator aggregator : aggs) {
           aggregator.aggregate();
         }
       }
@@ -149,11 +153,10 @@ public class StringTopNColumnSelectorStrategy
     return processedRows;
   }
 
-  private long dimExtractionScanAndAggregateWithCardinalityUnknown(
+  private long scanAndAggregateWithCardinalityUnknown(
       TopNQuery query,
       Cursor cursor,
-      DimensionSelector selector,
-      Map<Comparable<?>, Aggregator[]> aggregatesStore
+      DimensionSelector selector
   )
   {
     long processedRows = 0;
@@ -162,13 +165,11 @@ public class StringTopNColumnSelectorStrategy
       for (int i = 0, size = dimValues.size(); i < size; ++i) {
         final int dimIndex = dimValues.get(i);
         final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
-
-        Aggregator[] theAggregators = aggregatesStore.get(key);
-        if (theAggregators == null) {
-          theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
-          aggregatesStore.put(key, theAggregators);
-        }
-        for (Aggregator aggregator : theAggregators) {
+        Aggregator[] aggs = aggregatesStore.computeIfAbsent(
+            key,
+            k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+        );
+        for (Aggregator aggregator : aggs) {
           aggregator.aggregate();
         }
       }
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
similarity index 51%
rename from processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java
rename to processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
index 464189d..ac5b21f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
@@ -21,28 +21,39 @@ package org.apache.druid.query.topn.types;
 
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.dimension.ColumnSelectorStrategy;
+import org.apache.druid.query.topn.HeapBasedTopNAlgorithm;
 import org.apache.druid.query.topn.TopNParams;
 import org.apache.druid.query.topn.TopNQuery;
 import org.apache.druid.query.topn.TopNResultBuilder;
+import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.StorageAdapter;
 
-import java.util.Map;
+import javax.annotation.Nullable;
 
-public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType extends Map>
-    extends ColumnSelectorStrategy
+/**
+ * This {@link ColumnSelectorStrategy} is used by all {@link org.apache.druid.query.topn.TopNAlgorithm} to provide
+ * selector value cardinality to {@link TopNParams} (perhaps unecessarily, but that is another matter), but is primarily
+ * used by {@link HeapBasedTopNAlgorithm} to serve as its value aggregates store.
+ *
+ * Given a query, column value selector, and cursor to process, the aggregates store is populated by calling
+ * {@link #scanAndAggregate} and can be applied to {@link TopNResultBuilder} through {@link #updateResults}.
+ */
+public interface TopNColumnAggregatesProcessor<ValueSelectorType> extends ColumnSelectorStrategy
 {
-  int CARDINALITY_UNKNOWN = -1;
-
+  /**
+   * Get value cardinality of underlying {@link ColumnValueSelector}
+   */
   int getCardinality(ValueSelectorType selector);
 
   /**
-   * Used by DimExtractionTopNAlgorithm.
+   * Used by {@link HeapBasedTopNAlgorithm}.
    *
-   * Create an Aggregator[][] using BaseTopNAlgorithm.AggregatorArrayProvider and the given parameters.
+   * Create an Aggregator[][] using {@link org.apache.druid.query.topn.BaseTopNAlgorithm.AggregatorArrayProvider} and
+   * the given parameters.
    *
    * As the Aggregator[][] is used as an integer-based lookup, this method is only applicable for dimension types
-   * that use integer row values.
+   * that use integer row values, e.g. string columns where the value cardinality is known.
    *
    * A dimension type that does not have integer values should return null.
    *
@@ -53,30 +64,24 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
    *
    * @return an Aggregator[][] for integer-valued dimensions, null otherwise
    */
-  Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter);
+  @Nullable
+  Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter);
 
   /**
-   * Used by DimExtractionTopNAlgorithm.
+   * Used by {@link HeapBasedTopNAlgorithm}. The contract of this method requires calling {@link #initAggregateStore()}
+   * prior to calling this method.
    *
-   * Creates an aggregate store map suitable for this strategy's type that will be
-   * passed to dimExtractionScanAndAggregate() and updateDimExtractionResults().
-   *
-   * @return Aggregate store map
-   */
-  DimExtractionAggregateStoreType makeDimExtractionAggregateStore();
-
-  /**
-   * Used by DimExtractionTopNAlgorithm.
+   * Iterate through the {@link Cursor}, reading the current row from a dimension value selector, and for each row
+   * value:
+   *  1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup), usable if value cardinality
+   *     is known, or from aggregatesStore (slower map).
    *
-   * Iterate through the cursor, reading the current row from a dimension value selector, and for each row value:
-   * 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup) or from
-   * aggregatesStore (slower map).
+   *  2. If the rowSelector/aggregatesStore did not have an entry for a particular row value, this function
+   *     should retrieve the current Aggregator[] using
+   *     {@link org.apache.druid.query.topn.BaseTopNAlgorithm#makeAggregators} and the provided cursor and query,
+   *     storing them in rowSelector/aggregatesStore
    *
-   * 2. If the rowSelector and/or aggregatesStore did not have an entry for a particular row value,
-   * this function should retrieve the current Aggregator[] using BaseTopNAlgorithm.makeAggregators() and the
-   * provided cursor and query, storing them in rowSelector and aggregatesStore
-   *
-   * 3. Call aggregate() on each of the aggregators.
+   * 3. Call {@link Aggregator#aggregate()} on each of the aggregators.
    *
    * If a dimension type doesn't have integer values, it should ignore rowSelector and use the aggregatesStore map only.
    *
@@ -84,29 +89,33 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
    * @param selector        Dimension value selector
    * @param cursor          Cursor for the segment being queried
    * @param rowSelector     Integer lookup containing aggregators
-   * @param aggregatesStore Map containing aggregators
    *
    * @return the number of processed rows (after postFilters are applied inside the cursor being processed)
    */
-  long dimExtractionScanAndAggregate(
+  long scanAndAggregate(
       TopNQuery query,
       ValueSelectorType selector,
       Cursor cursor,
-      Aggregator[][] rowSelector,
-      DimExtractionAggregateStoreType aggregatesStore
+      Aggregator[][] rowSelector
   );
 
   /**
-   * Used by DimExtractionTopNAlgorithm.
+   * Used by {@link HeapBasedTopNAlgorithm}.
    *
    * Read entries from the aggregates store, adding the keys and associated values to the resultBuilder, applying the
    * valueTransformer to the keys if present
    *
-   * @param aggregatesStore Map created by makeDimExtractionAggregateStore()
    * @param resultBuilder   TopN result builder
    */
-  void updateDimExtractionResults(
-      DimExtractionAggregateStoreType aggregatesStore,
-      TopNResultBuilder resultBuilder
-  );
+  void updateResults(TopNResultBuilder resultBuilder);
+
+  /**
+   * Initializes the underlying aggregates store to something nice and seleector type appropriate
+   */
+  void initAggregateStore();
+
+  /**
+   * Closes all on heap {@link Aggregator} associated withe the aggregates processor
+   */
+  void closeAggregators();
 }
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
new file mode 100644
index 0000000..56a2943
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+
+import java.util.function.Function;
+
+public class TopNColumnAggregatesProcessorFactory
+    implements ColumnSelectorStrategyFactory<TopNColumnAggregatesProcessor<?>>
+{
+  private final ValueType dimensionType;
+
+  public TopNColumnAggregatesProcessorFactory(final ValueType dimensionType)
+  {
+    this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
+  }
+
+  @Override
+  public TopNColumnAggregatesProcessor<?> makeColumnSelectorStrategy(
+      ColumnCapabilities capabilities,
+      ColumnValueSelector selector
+  )
+  {
+    final ValueType selectorType = capabilities.getType();
+
+    if (selectorType.equals(ValueType.STRING)) {
+      return new StringTopNColumnAggregatesProcessor(dimensionType);
+    } else if (selectorType.isNumeric()) {
+      final Function<Object, Comparable<?>> converter;
+      final ValueType strategyType;
+      // When the selector is numeric, we want to use NumericTopNColumnSelectorStrategy. It aggregates using
+      // a numeric type and then converts to the desired output type after aggregating. We must be careful not to
+      // convert to an output type that cannot represent all possible values of the input type.
+      if (ValueType.isNumeric(dimensionType)) {
+        // Return strategy that aggregates using the _output_ type, because this allows us to collapse values
+        // properly (numeric types cannot always represent all values of other numeric types).
+        converter = DimensionHandlerUtils.converterFromTypeToType(dimensionType, dimensionType);
+        strategyType = dimensionType;
+      } else {
+        // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
+        // represent all possible values of the input type. This will be true for STRING, which is the only
+        // non-numeric type currently supported.
+        converter = DimensionHandlerUtils.converterFromTypeToType(selectorType, dimensionType);
+        strategyType = selectorType;
+      }
+      switch (strategyType) {
+        case LONG:
+          return new LongTopNColumnAggregatesProcessor(converter);
+        case FLOAT:
+          return new FloatTopNColumnAggregatesProcessor(converter);
+        case DOUBLE:
+          return new DoubleTopNColumnAggregatesProcessor(converter);
+      }
+    }
+
+    throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
deleted file mode 100644
index 8cc820d..0000000
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.topn.types;
-
-import com.google.common.base.Preconditions;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ValueType;
-
-public class TopNColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory<TopNColumnSelectorStrategy>
-{
-  private final ValueType dimensionType;
-
-  public TopNColumnSelectorStrategyFactory(final ValueType dimensionType)
-  {
-    this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
-  }
-
-  @Override
-  public TopNColumnSelectorStrategy makeColumnSelectorStrategy(
-      ColumnCapabilities capabilities,
-      ColumnValueSelector selector
-  )
-  {
-    final ValueType selectorType = capabilities.getType();
-
-    switch (selectorType) {
-      case STRING:
-        // Return strategy that reads strings and outputs dimensionTypes.
-        return new StringTopNColumnSelectorStrategy(dimensionType);
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-        // When the selector is numeric, we want to use NumericTopNColumnSelectorStrategy. It aggregates using
-        // a numeric type and then converts to the desired output type after aggregating. We must be careful not to
-        // convert to an output type that cannot represent all possible values of the input type.
-
-        if (ValueType.isNumeric(dimensionType)) {
-          // Return strategy that aggregates using the _output_ type, because this allows us to collapse values
-          // properly (numeric types cannot always represent all values of other numeric types).
-          return NumericTopNColumnSelectorStrategy.ofType(dimensionType, dimensionType);
-        } else {
-          // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
-          // represent all possible values of the input type. This will be true for STRING, which is the only
-          // non-numeric type currently supported.
-          return NumericTopNColumnSelectorStrategy.ofType(selectorType, dimensionType);
-        }
-      default:
-        throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
-    }
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
index e9396c5..c8a49e7 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
@@ -64,9 +64,9 @@ public final class DimensionHandlerUtils
                                   .setDictionaryEncoded(true)
                                   .setHasBitmapIndexes(true);
 
-  public static DimensionHandler getHandlerFromCapabilities(
+  public static DimensionHandler<?, ?, ?> getHandlerFromCapabilities(
       String dimensionName,
-      ColumnCapabilities capabilities,
+      @Nullable ColumnCapabilities capabilities,
       @Nullable MultiValueHandling multiValueHandling
   )
   {
@@ -113,15 +113,15 @@ public final class DimensionHandlerUtils
    * {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton
    * list of dimensionSpecs and then retrieving the only element in the returned array.
    *
-   * @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
+   * @param <Strategy> The strategy type created by the provided strategy factory.
    * @param strategyFactory               A factory provided by query engines that generates type-handling strategies
    * @param dimensionSpec                 column to generate a ColumnSelectorPlus object for
    * @param cursor                        Used to create value selectors for columns.
    *
    * @return A ColumnSelectorPlus object
    */
-  public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorPlus<ColumnSelectorStrategyClass> createColumnSelectorPlus(
-      ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
+  public static <Strategy extends ColumnSelectorStrategy> ColumnSelectorPlus<Strategy> createColumnSelectorPlus(
+      ColumnSelectorStrategyFactory<Strategy> strategyFactory,
       DimensionSpec dimensionSpec,
       ColumnSelectorFactory cursor
   )
@@ -140,39 +140,36 @@ public final class DimensionHandlerUtils
    * A caller should define a strategy factory that provides an interface for type-specific operations
    * in a query engine. See GroupByStrategyFactory for a reference.
    *
-   * @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
+   * @param <Strategy>                    The strategy type created by the provided strategy factory.
    * @param strategyFactory               A factory provided by query engines that generates type-handling strategies
    * @param dimensionSpecs                The set of columns to generate ColumnSelectorPlus objects for
    * @param columnSelectorFactory         Used to create value selectors for columns.
    *
    * @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
    */
-  public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
-  //CHECKSTYLE.OFF: Indentation
-  ColumnSelectorPlus<ColumnSelectorStrategyClass>[] createColumnSelectorPluses(
-      //CHECKSTYLE.ON: Indentation
-      ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
+  public static <Strategy extends ColumnSelectorStrategy> ColumnSelectorPlus<Strategy>[] createColumnSelectorPluses(
+      ColumnSelectorStrategyFactory<Strategy> strategyFactory,
       List<DimensionSpec> dimensionSpecs,
       ColumnSelectorFactory columnSelectorFactory
   )
   {
     int dimCount = dimensionSpecs.size();
     @SuppressWarnings("unchecked")
-    ColumnSelectorPlus<ColumnSelectorStrategyClass>[] dims = new ColumnSelectorPlus[dimCount];
+    ColumnSelectorPlus<Strategy>[] dims = new ColumnSelectorPlus[dimCount];
     for (int i = 0; i < dimCount; i++) {
       final DimensionSpec dimSpec = dimensionSpecs.get(i);
       final String dimName = dimSpec.getDimension();
-      final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec(
+      final ColumnValueSelector<?> selector = getColumnValueSelectorFromDimensionSpec(
           dimSpec,
           columnSelectorFactory
       );
-      ColumnSelectorStrategyClass strategy = makeStrategy(
+      Strategy strategy = makeStrategy(
           strategyFactory,
           dimSpec,
           columnSelectorFactory.getColumnCapabilities(dimSpec.getDimension()),
           selector
       );
-      final ColumnSelectorPlus<ColumnSelectorStrategyClass> selectorPlus = new ColumnSelectorPlus<>(
+      final ColumnSelectorPlus<Strategy> selectorPlus = new ColumnSelectorPlus<>(
           dimName,
           dimSpec.getOutputName(),
           strategy,
@@ -183,7 +180,7 @@ public final class DimensionHandlerUtils
     return dims;
   }
 
-  private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec(
+  private static ColumnValueSelector<?> getColumnValueSelectorFromDimensionSpec(
       DimensionSpec dimSpec,
       ColumnSelectorFactory columnSelectorFactory
   )
@@ -191,12 +188,10 @@ public final class DimensionHandlerUtils
     String dimName = dimSpec.getDimension();
     ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName);
     capabilities = getEffectiveCapabilities(dimSpec, capabilities);
-    switch (capabilities.getType()) {
-      case STRING:
-        return columnSelectorFactory.makeDimensionSelector(dimSpec);
-      default:
-        return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension());
+    if (capabilities.getType() == ValueType.STRING) {
+      return columnSelectorFactory.makeDimensionSelector(dimSpec);
     }
+    return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension());
   }
 
   /**
@@ -235,11 +230,11 @@ public final class DimensionHandlerUtils
     return capabilities;
   }
 
-  private static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorStrategyClass makeStrategy(
-      ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
+  private static <Strategy extends ColumnSelectorStrategy> Strategy makeStrategy(
+      ColumnSelectorStrategyFactory<Strategy> strategyFactory,
       DimensionSpec dimSpec,
       @Nullable ColumnCapabilities capabilities,
-      ColumnValueSelector selector
+      ColumnValueSelector<?> selector
   )
   {
     capabilities = getEffectiveCapabilities(dimSpec, capabilities);
diff --git a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
index 9802aad..7a8d7d4 100644
--- a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
+++ b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
@@ -141,6 +141,7 @@ public class VirtualColumns implements Cacheable
     return getVirtualColumn(columnName) != null;
   }
 
+  @Nullable
   public VirtualColumn getVirtualColumn(String columnName)
   {
     final VirtualColumn vc = withoutDotSupport.get(columnName);
@@ -180,11 +181,19 @@ public class VirtualColumns implements Cacheable
     if (virtualColumn == null) {
       throw new IAE("No such virtual column[%s]", columnName);
     } else {
-      return virtualColumn.capabilities(columnName).hasBitmapIndexes() ? virtualColumn.getBitmapIndex(columnName, columnSelector) : null;
+      return virtualColumn.capabilities(columnName).hasBitmapIndexes() ? virtualColumn.getBitmapIndex(
+          columnName,
+          columnSelector
+      ) : null;
     }
   }
 
-  public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec, ColumnSelector columnSelector, ReadableOffset offset)
+  @Nullable
+  public DimensionSelector makeDimensionSelector(
+      DimensionSpec dimensionSpec,
+      ColumnSelector columnSelector,
+      ReadableOffset offset
+  )
   {
     final VirtualColumn virtualColumn = getVirtualColumn(dimensionSpec.getDimension());
     if (virtualColumn == null) {
@@ -194,7 +203,12 @@ public class VirtualColumns implements Cacheable
     }
   }
 
-  public ColumnValueSelector<?> makeColumnValueSelector(String columnName, ColumnSelector columnSelector, ReadableOffset offset)
+  @Nullable
+  public ColumnValueSelector<?> makeColumnValueSelector(
+      String columnName,
+      ColumnSelector columnSelector,
+      ReadableOffset offset
+  )
   {
     final VirtualColumn virtualColumn = getVirtualColumn(columnName);
     if (virtualColumn == null) {
@@ -226,6 +240,7 @@ public class VirtualColumns implements Cacheable
     }
   }
 
+  @Nullable
   public ColumnCapabilities getColumnCapabilities(String columnName)
   {
     final VirtualColumn virtualColumn = getVirtualColumn(columnName);
@@ -240,6 +255,7 @@ public class VirtualColumns implements Cacheable
     }
   }
 
+  @Nullable
   public ColumnCapabilities getColumnCapabilitiesWithFallback(StorageAdapter adapter, String columnName)
   {
     final ColumnCapabilities virtualColumnCapabilities = getColumnCapabilities(columnName);
@@ -264,7 +280,11 @@ public class VirtualColumns implements Cacheable
 
   public ColumnSelectorFactory wrap(final ColumnSelectorFactory baseFactory)
   {
-    return new VirtualizedColumnSelectorFactory(baseFactory, this);
+    if (virtualColumns.isEmpty()) {
+      return baseFactory;
+    } else {
+      return new VirtualizedColumnSelectorFactory(baseFactory, this);
+    }
   }
 
   @Override
diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
index 0ba5860..7340da5 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
@@ -100,6 +100,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -5832,9 +5833,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
                         .put("index_alias", 147L)
                         .put("longNumericNull", 10L)
                         .build(),
-                    makeNumericNullRowHelper("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()),
-                    makeNumericNullRowHelper("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()),
-                    makeNumericNullRowHelper("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue())
+                    makeRowWithNulls("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()),
+                    makeRowWithNulls("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()),
+                    makeRowWithNulls("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue())
                 )
             )
         )
@@ -5900,9 +5901,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
                         .put("index_alias", 147L)
                         .put("floatNumericNull", 10f)
                         .build(),
-                    makeNumericNullRowHelper("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()),
-                    makeNumericNullRowHelper("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()),
-                    makeNumericNullRowHelper("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue())
+                    makeRowWithNulls("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()),
+                    makeRowWithNulls("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()),
+                    makeRowWithNulls("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue())
                 )
             )
         )
@@ -5968,9 +5969,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
                         .put("index_alias", 147L)
                         .put("doubleNumericNull", 10d)
                         .build(),
-                    makeNumericNullRowHelper("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
-                    makeNumericNullRowHelper("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
-                    makeNumericNullRowHelper("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue())
+                    makeRowWithNulls("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
+                    makeRowWithNulls("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
+                    makeRowWithNulls("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue())
                 )
             )
         )
@@ -5978,16 +5979,113 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
     assertExpectedResults(expectedResults, query);
   }
 
-  private static Map<String, Object> makeNumericNullRowHelper(
+
+  @Test
+  public void testAggregateOnLongNumericNull()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(QueryRunnerTestHelper.ALL_GRAN)
+        .dimension(new DefaultDimensionSpec("longNumericNull", "dim", ValueType.LONG))
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+        .threshold(10000)
+        .aggregators(new CountAggregatorFactory("count"))
+        .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.asList(
+                    makeRowWithNulls("dim", NullHandling.defaultLongValue(), "count", 279L),
+                    makeRowWithNulls("dim", 10L, "count", 93L),
+                    makeRowWithNulls("dim", 20L, "count", 93L),
+                    makeRowWithNulls("dim", 40L, "count", 93L),
+                    makeRowWithNulls("dim", 50L, "count", 279L),
+                    makeRowWithNulls("dim", 70L, "count", 279L),
+                    makeRowWithNulls("dim", 80L, "count", 93L)
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testAggregateOnDoubleNumericNull()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(QueryRunnerTestHelper.ALL_GRAN)
+        .dimension(new DefaultDimensionSpec("doubleNumericNull", "dim", ValueType.DOUBLE))
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+        .threshold(10000)
+        .aggregators(new CountAggregatorFactory("count"))
+        .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.asList(
+                    makeRowWithNulls("dim", NullHandling.defaultDoubleValue(), "count", 279L),
+                    makeRowWithNulls("dim", 10.0, "count", 93L),
+                    makeRowWithNulls("dim", 20.0, "count", 93L),
+                    makeRowWithNulls("dim", 40.0, "count", 93L),
+                    makeRowWithNulls("dim", 50.0, "count", 279L),
+                    makeRowWithNulls("dim", 70.0, "count", 279L),
+                    makeRowWithNulls("dim", 80.0, "count", 93L)
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testAggregateOnFloatNumericNull()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(QueryRunnerTestHelper.ALL_GRAN)
+        .dimension(new DefaultDimensionSpec("floatNumericNull", "dim", ValueType.FLOAT))
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+        .threshold(10000)
+        .aggregators(new CountAggregatorFactory("count"))
+        .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.asList(
+                    makeRowWithNulls("dim", NullHandling.defaultFloatValue(), "count", 279L),
+                    makeRowWithNulls("dim", 10.0f, "count", 93L),
+                    makeRowWithNulls("dim", 20.0f, "count", 93L),
+                    makeRowWithNulls("dim", 40.0f, "count", 93L),
+                    makeRowWithNulls("dim", 50.0f, "count", 279L),
+                    makeRowWithNulls("dim", 70.0f, "count", 279L),
+                    makeRowWithNulls("dim", 80.0f, "count", 93L)
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  private static Map<String, Object> makeRowWithNulls(
       String dimName,
-      Object dimValue,
-      String nameOfColumnWithNull,
-      Object defaultNullValue
+      @Nullable Object dimValue,
+      String metric,
+      @Nullable Object metricVal
   )
   {
     Map<String, Object> nullRow = new HashMap<>();
     nullRow.put(dimName, dimValue);
-    nullRow.put(nameOfColumnWithNull, defaultNullValue);
+    nullRow.put(metric, metricVal);
     return nullRow;
   }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 8fbb1ad..1cc2161 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -2481,6 +2481,126 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   }
 
   @Test
+  public void testNullDoubleTopN() throws Exception
+  {
+    List<Object[]> expected;
+    if (useDefault) {
+      expected = ImmutableList.of(
+          new Object[]{1.7, 1L},
+          new Object[]{1.0, 1L},
+          new Object[]{0.0, 4L}
+      );
+    } else {
+      expected = ImmutableList.of(
+          new Object[]{null, 3L},
+          new Object[]{1.7, 1L},
+          new Object[]{1.0, 1L},
+          new Object[]{0.0, 1L}
+      );
+    }
+    testQuery(
+        "SELECT d1, COUNT(*) FROM druid.numfoo GROUP BY d1 ORDER BY d1 DESC LIMIT 10",
+        QUERY_CONTEXT_DEFAULT,
+        ImmutableList.of(
+            new TopNQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE3)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .dimension(new DefaultDimensionSpec("d1", "_d0", ValueType.DOUBLE))
+                .threshold(10)
+                .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                .metric(
+                    new InvertedTopNMetricSpec(
+                        new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
+                    )
+                )
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        expected
+    );
+  }
+
+  @Test
+  public void testNullFloatTopN() throws Exception
+  {
+    List<Object[]> expected;
+    if (useDefault) {
+      expected = ImmutableList.of(
+          new Object[]{1.0f, 1L},
+          new Object[]{0.1f, 1L},
+          new Object[]{0.0f, 4L}
+      );
+    } else {
+      expected = ImmutableList.of(
+          new Object[]{null, 3L},
+          new Object[]{1.0f, 1L},
+          new Object[]{0.1f, 1L},
+          new Object[]{0.0f, 1L}
+      );
+    }
+    testQuery(
+        "SELECT f1, COUNT(*) FROM druid.numfoo GROUP BY f1 ORDER BY f1 DESC LIMIT 10",
+        QUERY_CONTEXT_DEFAULT,
+        ImmutableList.of(
+            new TopNQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE3)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .dimension(new DefaultDimensionSpec("f1", "_d0", ValueType.FLOAT))
+                .threshold(10)
+                .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                .metric(
+                    new InvertedTopNMetricSpec(
+                        new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
+                    )
+                )
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        expected
+    );
+  }
+
+  @Test
+  public void testNullLongTopN() throws Exception
+  {
+    List<Object[]> expected;
+    if (useDefault) {
+      expected = ImmutableList.of(
+          new Object[]{325323L, 1L},
+          new Object[]{7L, 1L},
+          new Object[]{0L, 4L}
+      );
+    } else {
+      expected = ImmutableList.of(
+          new Object[]{null, 3L},
+          new Object[]{325323L, 1L},
+          new Object[]{7L, 1L},
+          new Object[]{0L, 1L}
+      );
+    }
+    testQuery(
+        "SELECT l1, COUNT(*) FROM druid.numfoo GROUP BY l1 ORDER BY l1 DESC LIMIT 10",
+        QUERY_CONTEXT_DEFAULT,
+        ImmutableList.of(
+            new TopNQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE3)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .dimension(new DefaultDimensionSpec("l1", "_d0", ValueType.LONG))
+                .threshold(10)
+                .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                .metric(
+                    new InvertedTopNMetricSpec(
+                        new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
+                    )
+                )
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        expected
+    );
+  }
+
+  @Test
   public void testEmptyStringEquality() throws Exception
   {
     if (NullHandling.replaceWithDefault()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org