You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/08/26 23:15:03 UTC

[incubator-druid] branch 0.12.3 updated: Fix four bugs with numeric dimension output types. (#6220) (#6230)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.3 by this push:
     new 0b79e76  Fix four bugs with numeric dimension output types. (#6220) (#6230)
0b79e76 is described below

commit 0b79e76720bb41dfc316632d1709898c8019fce8
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Sun Aug 26 16:14:59 2018 -0700

    Fix four bugs with numeric dimension output types. (#6220) (#6230)
    
    * Fix four bugs with numeric dimension output types.
    
    This patch includes the following bug fixes:
    
    - TopNColumnSelectorStrategyFactory: Cast dimension values to the output type
      during dimExtractionScanAndAggregate instead of updateDimExtractionResults.
      This fixes a bug where, for example, grouping on doubles-cast-to-longs would
      fail to merge two doubles that should have been combined into the same long value.
    - TopNQueryEngine: Use DimExtractionTopNAlgorithm when treating string columns
      as numeric dimensions. This fixes a similar bug: grouping on string-cast-to-long
      would fail to merge two strings that should have been combined.
    - GroupByQuery: Cast numeric types to the expected output type before comparing them
      in compareDimsForLimitPushDown. This fixes #6123.
    - GroupByQueryQueryToolChest: Convert Jackson-deserialized dimension values into
      the proper output type. This fixes an inconsistency between results that came
      from cache vs. not-cache: for example, Jackson sometimes deserializes integers
      as Integers and sometimes as Longs.
    
    And the following code-cleanup changes, related to the fixes above:
    
    - DimensionHandlerUtils: Introduce convertObjectToType, compareObjectsAsType,
      and converterFromTypeToType to make it easier to handle casting operations.
    - TopN in general: Rename various "dimName" variables to "dimValue" where they
      actually represent dimension values. The old names were confusing.
    
    * Remove unused imports.
---
 .../java/io/druid/query/groupby/GroupByQuery.java  |  51 +++-----
 .../query/groupby/GroupByQueryQueryToolChest.java  |  10 +-
 .../epinephelinae/GroupByQueryEngineV2.java        |  23 +---
 .../epinephelinae/RowBasedGrouperHelper.java       |  32 +----
 .../query/topn/DimExtractionTopNAlgorithm.java     |  14 ---
 .../java/io/druid/query/topn/DimValHolder.java     |  20 ++--
 .../io/druid/query/topn/PooledTopNAlgorithm.java   |  27 +----
 .../query/topn/TopNLexicographicResultBuilder.java |  52 ++------
 .../main/java/io/druid/query/topn/TopNMapFn.java   |  40 +------
 .../druid/query/topn/TopNNumericResultBuilder.java |  71 ++++-------
 .../java/io/druid/query/topn/TopNQueryEngine.java  |   4 +
 .../druid/query/topn/TopNQueryQueryToolChest.java  |  12 +-
 .../io/druid/query/topn/TopNResultBuilder.java     |   2 +-
 .../types/NumericTopNColumnSelectorStrategy.java   |  57 ++++++---
 .../types/StringTopNColumnSelectorStrategy.java    |  60 +++++-----
 .../topn/types/TopNColumnSelectorStrategy.java     |  11 +-
 .../types/TopNColumnSelectorStrategyFactory.java   |  31 +++--
 .../io/druid/segment/DimensionHandlerUtils.java    | 127 +++++++++++++++++++-
 .../io/druid/query/topn/TopNQueryRunnerTest.java   | 132 +++++++++++++++++++++
 19 files changed, 441 insertions(+), 335 deletions(-)

diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index 1f9b45e..7d3dbc0 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -58,6 +58,7 @@ import io.druid.query.ordering.StringComparator;
 import io.druid.query.ordering.StringComparators;
 import io.druid.query.spec.LegacySegmentSpec;
 import io.druid.query.spec.QuerySegmentSpec;
+import io.druid.segment.DimensionHandlerUtils;
 import io.druid.segment.VirtualColumn;
 import io.druid.segment.VirtualColumns;
 import io.druid.segment.column.Column;
@@ -377,7 +378,7 @@ public class GroupByQuery extends BaseQuery<Row>
     final List<String> orderedFieldNames = new ArrayList<>();
     final Set<Integer> dimsInOrderBy = new HashSet<>();
     final List<Boolean> needsReverseList = new ArrayList<>();
-    final List<Boolean> isNumericField = new ArrayList<>();
+    final List<ValueType> dimensionTypes = new ArrayList<>();
     final List<StringComparator> comparators = new ArrayList<>();
 
     for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
@@ -389,7 +390,7 @@ public class GroupByQuery extends BaseQuery<Row>
         dimsInOrderBy.add(dimIndex);
         needsReverseList.add(needsReverse);
         final ValueType type = dimensions.get(dimIndex).getOutputType();
-        isNumericField.add(ValueType.isNumeric(type));
+        dimensionTypes.add(type);
         comparators.add(orderSpec.getDimensionComparator());
       }
     }
@@ -399,7 +400,7 @@ public class GroupByQuery extends BaseQuery<Row>
         orderedFieldNames.add(dimensions.get(i).getOutputName());
         needsReverseList.add(false);
         final ValueType type = dimensions.get(i).getOutputType();
-        isNumericField.add(ValueType.isNumeric(type));
+        dimensionTypes.add(type);
         comparators.add(StringComparators.LEXICOGRAPHIC);
       }
     }
@@ -416,7 +417,7 @@ public class GroupByQuery extends BaseQuery<Row>
               return compareDimsForLimitPushDown(
                   orderedFieldNames,
                   needsReverseList,
-                  isNumericField,
+                  dimensionTypes,
                   comparators,
                   lhs,
                   rhs
@@ -434,7 +435,7 @@ public class GroupByQuery extends BaseQuery<Row>
               final int cmp = compareDimsForLimitPushDown(
                   orderedFieldNames,
                   needsReverseList,
-                  isNumericField,
+                  dimensionTypes,
                   comparators,
                   lhs,
                   rhs
@@ -463,7 +464,7 @@ public class GroupByQuery extends BaseQuery<Row>
               return compareDimsForLimitPushDown(
                   orderedFieldNames,
                   needsReverseList,
-                  isNumericField,
+                  dimensionTypes,
                   comparators,
                   lhs,
                   rhs
@@ -530,28 +531,12 @@ public class GroupByQuery extends BaseQuery<Row>
   private static int compareDims(List<DimensionSpec> dimensions, Row lhs, Row rhs)
   {
     for (DimensionSpec dimension : dimensions) {
-      final int dimCompare;
-      if (dimension.getOutputType() == ValueType.LONG) {
-        dimCompare = Long.compare(
-            ((Number) lhs.getRaw(dimension.getOutputName())).longValue(),
-            ((Number) rhs.getRaw(dimension.getOutputName())).longValue()
-        );
-      } else if (dimension.getOutputType() == ValueType.FLOAT) {
-        dimCompare = Float.compare(
-            ((Number) lhs.getRaw(dimension.getOutputName())).floatValue(),
-            ((Number) rhs.getRaw(dimension.getOutputName())).floatValue()
-        );
-      } else if (dimension.getOutputType() == ValueType.DOUBLE) {
-        dimCompare = Double.compare(
-            ((Number) lhs.getRaw(dimension.getOutputName())).doubleValue(),
-            ((Number) rhs.getRaw(dimension.getOutputName())).doubleValue()
-        );
-      } else {
-        dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
-            lhs.getRaw(dimension.getOutputName()),
-            rhs.getRaw(dimension.getOutputName())
-        );
-      }
+      //noinspection unchecked
+      final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
+          lhs.getRaw(dimension.getOutputName()),
+          rhs.getRaw(dimension.getOutputName()),
+          dimension.getOutputType()
+      );
       if (dimCompare != 0) {
         return dimCompare;
       }
@@ -563,7 +548,7 @@ public class GroupByQuery extends BaseQuery<Row>
   private static int compareDimsForLimitPushDown(
       final List<String> fields,
       final List<Boolean> needsReverseList,
-      final List<Boolean> isNumericField,
+      final List<ValueType> dimensionTypes,
       final List<StringComparator> comparators,
       Row lhs,
       Row rhs
@@ -572,17 +557,15 @@ public class GroupByQuery extends BaseQuery<Row>
     for (int i = 0; i < fields.size(); i++) {
       final String fieldName = fields.get(i);
       final StringComparator comparator = comparators.get(i);
+      final ValueType dimensionType = dimensionTypes.get(i);
 
       final int dimCompare;
       final Object lhsObj = lhs.getRaw(fieldName);
       final Object rhsObj = rhs.getRaw(fieldName);
 
-      if (isNumericField.get(i)) {
+      if (ValueType.isNumeric(dimensionType)) {
         if (comparator.equals(StringComparators.NUMERIC)) {
-          dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
-              lhsObj,
-              rhsObj
-          );
+          dimCompare = DimensionHandlerUtils.compareObjectsAsType(lhsObj, rhsObj, dimensionType);
         } else {
           dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj));
         }
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index 28d5acb..0460781 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -57,6 +57,7 @@ import io.druid.query.extraction.ExtractionFn;
 import io.druid.query.groupby.resource.GroupByQueryResource;
 import io.druid.query.groupby.strategy.GroupByStrategy;
 import io.druid.query.groupby.strategy.GroupByStrategySelector;
+import io.druid.segment.DimensionHandlerUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -451,8 +452,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
             Map<String, Object> event = Maps.newLinkedHashMap();
             Iterator<DimensionSpec> dimsIter = dims.iterator();
             while (dimsIter.hasNext() && results.hasNext()) {
-              final DimensionSpec factory = dimsIter.next();
-              event.put(factory.getOutputName(), results.next());
+              final DimensionSpec dimensionSpec = dimsIter.next();
+
+              // Must convert generic Jackson-deserialized type into the proper type.
+              event.put(
+                  dimensionSpec.getOutputName(),
+                  DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType())
+              );
             }
 
             Iterator<AggregatorFactory> aggsIter = aggs.iterator();
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index dffc4f6..d570335 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -682,28 +682,7 @@ public class GroupByQueryEngineV2
       final ValueType outputType = dimSpec.getOutputType();
       rowMap.compute(
           dimSpec.getOutputName(),
-          (dimName, baseVal) -> {
-            switch (outputType) {
-              case STRING:
-                baseVal = baseVal == null ? "" : baseVal.toString();
-                break;
-              case LONG:
-                baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal);
-                baseVal = baseVal == null ? 0L : baseVal;
-                break;
-              case FLOAT:
-                baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal);
-                baseVal = baseVal == null ? 0.f : baseVal;
-                break;
-              case DOUBLE:
-                baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal);
-                baseVal = baseVal == null ? 0.d : baseVal;
-                break;
-              default:
-                throw new IAE("Unsupported type: " + outputType);
-            }
-            return baseVal;
-          }
+          (dimName, baseVal) -> DimensionHandlerUtils.convertObjectToTypeNonNull(baseVal, outputType)
       );
     }
   }
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index 290cf13..fa4f2a9 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -602,38 +602,10 @@ public class RowBasedGrouperHelper
   {
     final Function<Comparable, Comparable>[] functions = new Function[valueTypes.size()];
     for (int i = 0; i < functions.length; i++) {
-      ValueType type = valueTypes.get(i);
       // Subquery post-aggs aren't added to the rowSignature (see rowSignatureFor() in GroupByQueryHelper) because
       // their types aren't known, so default to String handling.
-      type = type == null ? ValueType.STRING : type;
-      switch (type) {
-        case STRING:
-          functions[i] = input -> input == null ? "" : input.toString();
-          break;
-
-        case LONG:
-          functions[i] = input -> {
-            final Long val = DimensionHandlerUtils.convertObjectToLong(input);
-            return val == null ? 0L : val;
-          };
-          break;
-
-        case FLOAT:
-          functions[i] = input -> {
-            final Float val = DimensionHandlerUtils.convertObjectToFloat(input);
-            return val == null ? 0.f : val;
-          };
-          break;
-
-        case DOUBLE:
-          functions[i] = input -> {
-            Double val = DimensionHandlerUtils.convertObjectToDouble(input);
-            return val == null ? 0.0 : val;
-          };
-          break;
-        default:
-          throw new IAE("invalid type: [%s]", type);
-      }
+      final ValueType type = valueTypes.get(i) == null ? ValueType.STRING : valueTypes.get(i);
+      functions[i] = input -> DimensionHandlerUtils.convertObjectToTypeNonNull(input, type);
     }
     return functions;
   }
diff --git a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java
index 4737372..77ba5de 100644
--- a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
 import io.druid.query.ColumnSelectorPlus;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.topn.types.TopNColumnSelectorStrategy;
@@ -110,14 +109,8 @@ public class DimExtractionTopNAlgorithm
   )
   {
     final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
-    final boolean needsResultTypeConversion = needsResultTypeConversion(params);
-    final Function<Object, Object> valueTransformer = TopNMapFn.getValueTransformer(
-        query.getDimensionSpec().getOutputType()
-    );
-
     selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
         aggregatesStore,
-        needsResultTypeConversion ? valueTransformer : null,
         resultBuilder
     );
   }
@@ -136,11 +129,4 @@ public class DimExtractionTopNAlgorithm
   public void cleanup(TopNParams params)
   {
   }
-
-  private boolean needsResultTypeConversion(TopNParams params)
-  {
-    ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
-    TopNColumnSelectorStrategy strategy = selectorPlus.getColumnSelectorStrategy();
-    return query.getDimensionSpec().getOutputType() != strategy.getValueType();
-  }
 }
diff --git a/processing/src/main/java/io/druid/query/topn/DimValHolder.java b/processing/src/main/java/io/druid/query/topn/DimValHolder.java
index f983d89..cc96767 100644
--- a/processing/src/main/java/io/druid/query/topn/DimValHolder.java
+++ b/processing/src/main/java/io/druid/query/topn/DimValHolder.java
@@ -26,19 +26,19 @@ import java.util.Map;
 public class DimValHolder
 {
   private final Object topNMetricVal;
-  private final Comparable dimName;
+  private final Comparable dimValue;
   private final Object dimValIndex;
   private final Map<String, Object> metricValues;
 
   public DimValHolder(
       Object topNMetricVal,
-      Comparable dimName,
+      Comparable dimValue,
       Object dimValIndex,
       Map<String, Object> metricValues
   )
   {
     this.topNMetricVal = topNMetricVal;
-    this.dimName = dimName;
+    this.dimValue = dimValue;
     this.dimValIndex = dimValIndex;
     this.metricValues = metricValues;
   }
@@ -48,9 +48,9 @@ public class DimValHolder
     return topNMetricVal;
   }
 
-  public Comparable getDimName()
+  public Comparable getDimValue()
   {
-    return dimName;
+    return dimValue;
   }
 
   public Object getDimValIndex()
@@ -66,14 +66,14 @@ public class DimValHolder
   public static class Builder
   {
     private Object topNMetricVal;
-    private Comparable dimName;
+    private Comparable dimValue;
     private Object dimValIndex;
     private Map<String, Object> metricValues;
 
     public Builder()
     {
       topNMetricVal = null;
-      dimName = null;
+      dimValue = null;
       dimValIndex = null;
       metricValues = null;
     }
@@ -84,9 +84,9 @@ public class DimValHolder
       return this;
     }
 
-    public Builder withDimName(Comparable dimName)
+    public Builder withDimValue(Comparable dimValue)
     {
-      this.dimName = dimName;
+      this.dimValue = dimValue;
       return this;
     }
 
@@ -104,7 +104,7 @@ public class DimValHolder
 
     public DimValHolder build()
     {
-      return new DimValHolder(topNMetricVal, dimName, dimValIndex, metricValues);
+      return new DimValHolder(topNMetricVal, dimValue, dimValIndex, metricValues);
     }
   }
 }
diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java
index c191f4a..b1e2d92 100644
--- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java
+++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java
@@ -20,7 +20,6 @@
 package io.druid.query.topn;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import io.druid.collections.NonBlockingPool;
 import io.druid.collections.ResourceHolder;
@@ -37,7 +36,6 @@ import io.druid.segment.Cursor;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.FilteredOffset;
 import io.druid.segment.StorageAdapter;
-import io.druid.segment.column.ValueType;
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.data.Offset;
 import io.druid.segment.historical.HistoricalColumnSelector;
@@ -736,10 +734,6 @@ public class PooledTopNAlgorithm
     final int[] aggregatorSizes = params.getAggregatorSizes();
     final DimensionSelector dimSelector = params.getDimSelector();
 
-    final ValueType outType = query.getDimensionSpec().getOutputType();
-    final boolean needsResultConversion = outType != ValueType.STRING;
-    final Function<Object, Object> valueTransformer = TopNMapFn.getValueTransformer(outType);
-
     for (int i = 0; i < positions.length; i++) {
       int position = positions[i];
       if (position >= 0) {
@@ -749,14 +743,9 @@ public class PooledTopNAlgorithm
           position += aggregatorSizes[j];
         }
 
-        Object retVal = dimSelector.lookupName(i);
-        if (needsResultConversion) {
-          retVal = valueTransformer.apply(retVal);
-        }
-
-
+        // Output type must be STRING for PooledTopNAlgorithm to make sense, so there is no need to convert the value.
         resultBuilder.addEntry(
-            (Comparable) retVal,
+            dimSelector.lookupName(i),
             i,
             vals
         );
@@ -854,18 +843,6 @@ public class PooledTopNAlgorithm
       private int numValuesPerPass;
       private TopNMetricSpecBuilder<int[]> arrayProvider;
 
-      public Builder()
-      {
-        selectorPlus = null;
-        cursor = null;
-        resultsBufHolder = null;
-        resultsBuf = null;
-        aggregatorSizes = null;
-        numBytesPerRecord = 0;
-        numValuesPerPass = 0;
-        arrayProvider = null;
-      }
-
       public Builder withSelectorPlus(ColumnSelectorPlus selectorPlus)
       {
         this.selectorPlus = selectorPlus;
diff --git a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java
index 4cfc39e..b402c59 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.druid.query.Result;
@@ -66,32 +65,22 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
     this.threshold = threshold;
     this.pQueue = new PriorityQueue<>(
         threshold + 1,
-        new Comparator<DimValHolder>()
-        {
-          @Override
-          public int compare(
-              DimValHolder o1,
-              DimValHolder o2
-          )
-          {
-            return comparator.compare(o2.getDimName(), o1.getDimName());
-          }
-        }
+        (o1, o2) -> comparator.compare(o2.getDimValue(), o1.getDimValue())
     );
   }
 
   @Override
   public TopNResultBuilder addEntry(
-      Comparable dimNameObj,
+      Comparable dimValueObj,
       Object dimValIndex,
       Object[] metricVals
   )
   {
-    final String dimName = Objects.toString(dimNameObj, null);
+    final String dimValue = Objects.toString(dimValueObj, null);
     final Map<String, Object> metricValues = Maps.newHashMapWithExpectedSize(metricVals.length + 1);
 
-    if (shouldAdd(dimName)) {
-      metricValues.put(dimSpec.getOutputName(), dimName);
+    if (shouldAdd(dimValue)) {
+      metricValues.put(dimSpec.getOutputName(), dimValueObj);
       final int extra = metricVals.length % LOOP_UNROLL_COUNT;
       switch (extra) {
         case 7:
@@ -126,7 +115,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
         metricValues.put(aggFactoryNames[i + 7], metricVals[i + 7]);
       }
 
-      pQueue.add(new DimValHolder.Builder().withDimName(dimName).withMetricValues(metricValues).build());
+      pQueue.add(new DimValHolder.Builder().withDimValue(dimValue).withMetricValues(metricValues).build());
       if (pQueue.size() > threshold) {
         pQueue.poll();
       }
@@ -143,7 +132,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
 
     if (shouldAdd(dimensionValue)) {
       pQueue.add(
-          new DimValHolder.Builder().withDimName(dimensionValue)
+          new DimValHolder.Builder().withDimValue(dimensionValue)
                                     .withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
                                     .build()
       );
@@ -167,30 +156,11 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
     final DimValHolder[] holderValueArray = pQueue.toArray(new DimValHolder[0]);
     Arrays.sort(
         holderValueArray,
-        new Comparator<DimValHolder>()
-        {
-          @Override
-          public int compare(DimValHolder o1, DimValHolder o2)
-          {
-            return comparator.compare(o1.getDimName(), o2.getDimName());
-          }
-        }
-
+        (o1, o2) -> comparator.compare(o1.getDimValue(), o2.getDimValue())
     );
-    return new Result(
-        timestamp, new TopNResultValue(
-        Lists.transform(
-            Arrays.asList(holderValueArray),
-            new Function<DimValHolder, Object>()
-            {
-              @Override
-              public Object apply(DimValHolder dimValHolder)
-              {
-                return dimValHolder.getMetricValues();
-              }
-            }
-        )
-    )
+    return new Result<>(
+        timestamp,
+        new TopNResultValue(Lists.transform(Arrays.asList(holderValueArray), DimValHolder::getMetricValues))
     );
   }
 
diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java
index 24d05fa..7bdb086 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java
@@ -19,54 +19,16 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
-import io.druid.java.util.common.IAE;
 import io.druid.query.ColumnSelectorPlus;
 import io.druid.query.Result;
 import io.druid.query.topn.types.TopNColumnSelectorStrategyFactory;
 import io.druid.segment.Cursor;
 import io.druid.segment.DimensionHandlerUtils;
-import io.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
-import java.util.Objects;
 
 public class TopNMapFn
 {
-  public static Function<Object, Object> getValueTransformer(ValueType outputType)
-  {
-    switch (outputType) {
-      case STRING:
-        return STRING_TRANSFORMER;
-      case LONG:
-        return LONG_TRANSFORMER;
-      case FLOAT:
-        return FLOAT_TRANSFORMER;
-      case DOUBLE:
-        return DOUBLE_TRANSFORMER;
-      default:
-        throw new IAE("invalid type: %s", outputType);
-    }
-  }
-
-  private static Function<Object, Object> STRING_TRANSFORMER = input -> Objects.toString(input, null);
-
-  private static Function<Object, Object> LONG_TRANSFORMER = input -> {
-    final Long longVal = DimensionHandlerUtils.convertObjectToLong(input);
-    return longVal == null ? DimensionHandlerUtils.ZERO_LONG : longVal;
-  };
-
-  private static Function<Object, Object> FLOAT_TRANSFORMER = input -> {
-    final Float floatVal = DimensionHandlerUtils.convertObjectToFloat(input);
-    return floatVal == null ? DimensionHandlerUtils.ZERO_FLOAT : floatVal;
-  };
-  private static Function<Object, Object> DOUBLE_TRANSFORMER = input -> {
-    final Double doubleValue = DimensionHandlerUtils.convertObjectToDouble(input);
-    return doubleValue == null ? DimensionHandlerUtils.ZERO_DOUBLE : doubleValue;
-  };
-
-  private static final TopNColumnSelectorStrategyFactory STRATEGY_FACTORY = new TopNColumnSelectorStrategyFactory();
-
   private final TopNQuery query;
   private final TopNAlgorithm topNAlgorithm;
 
@@ -83,7 +45,7 @@ public class TopNMapFn
   public Result<TopNResultValue> apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics)
   {
     final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
-        STRATEGY_FACTORY,
+        new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()),
         query.getDimensionSpec(),
         cursor.getColumnSelectorFactory()
     );
diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java
index fd5aa3f..9fecef4 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -48,9 +47,9 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
   private final String metricName;
   private final List<PostAggregator> postAggs;
   private final PriorityQueue<DimValHolder> pQueue;
-  private final Comparator<DimValHolder> dimValComparator;
+  private final Comparator<DimValHolder> dimValHolderComparator;
   private final String[] aggFactoryNames;
-  private static final Comparator<Comparable> dimNameComparator = new Comparator<Comparable>()
+  private static final Comparator<Comparable> dimValueComparator = new Comparator<Comparable>()
   {
     @Override
     public int compare(Comparable o1, Comparable o2)
@@ -65,6 +64,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
       } else if (null == o2) {
         retval = 1;
       } else {
+        //noinspection unchecked
         retval = o1.compareTo(o2);
       }
       return retval;
@@ -91,30 +91,26 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
     this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
     this.threshold = threshold;
     this.metricComparator = comparator;
-    this.dimValComparator = new Comparator<DimValHolder>()
-    {
-      @Override
-      public int compare(DimValHolder d1, DimValHolder d2)
-      {
-        int retVal = metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal());
-
-        if (retVal == 0) {
-          retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());
-        }
+    this.dimValHolderComparator = (d1, d2) -> {
+      //noinspection unchecked
+      int retVal = metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal());
 
-        return retVal;
+      if (retVal == 0) {
+        retVal = dimValueComparator.compare(d1.getDimValue(), d2.getDimValue());
       }
+
+      return retVal;
     };
 
     // The logic in addEntry first adds, then removes if needed. So it can at any point have up to threshold + 1 entries.
-    pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator);
+    pQueue = new PriorityQueue<>(this.threshold + 1, dimValHolderComparator);
   }
 
   private static final int LOOP_UNROLL_COUNT = 8;
 
   @Override
   public TopNNumericResultBuilder addEntry(
-      Comparable dimName,
+      Comparable dimValueObj,
       Object dimValIndex,
       Object[] metricVals
   )
@@ -126,7 +122,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
 
     final Map<String, Object> metricValues = Maps.newHashMapWithExpectedSize(metricVals.length + postAggs.size() + 1);
 
-    metricValues.put(dimSpec.getOutputName(), dimName);
+    metricValues.put(dimSpec.getOutputName(), dimValueObj);
 
     final int extra = metricVals.length % LOOP_UNROLL_COUNT;
 
@@ -173,7 +169,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
     if (shouldAdd(topNMetricVal)) {
       DimValHolder dimValHolder = new DimValHolder.Builder()
           .withTopNMetricVal(topNMetricVal)
-          .withDimName(dimName)
+          .withDimValue(dimValueObj)
           .withDimValIndex(dimValIndex)
           .withMetricValues(metricValues)
           .build();
@@ -202,7 +198,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
     if (shouldAdd(dimValue)) {
       final DimValHolder valHolder = new DimValHolder.Builder()
           .withTopNMetricVal(dimValue)
-          .withDimName((Comparable) dimensionAndMetricValueExtractor.getDimensionValue(dimSpec.getOutputName()))
+          .withDimValue((Comparable) dimensionAndMetricValueExtractor.getDimensionValue(dimSpec.getOutputName()))
           .withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
           .build();
       pQueue.add(valHolder);
@@ -224,39 +220,24 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
   {
     final DimValHolder[] holderValueArray = pQueue.toArray(new DimValHolder[0]);
     Arrays.sort(
-        holderValueArray, new Comparator<DimValHolder>()
-        {
-          @Override
-          public int compare(DimValHolder d1, DimValHolder d2)
-          {
-            // Values flipped compared to earlier
-            int retVal = metricComparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal());
+        holderValueArray,
+        (d1, d2) -> {
+          // Metric values flipped compared to dimValHolderComparator.
 
-            if (retVal == 0) {
-              retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());
-            }
+          //noinspection unchecked
+          int retVal = metricComparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal());
 
-            return retVal;
+          if (retVal == 0) {
+            retVal = dimValueComparator.compare(d1.getDimValue(), d2.getDimValue());
           }
+
+          return retVal;
         }
     );
     List<DimValHolder> holderValues = Arrays.asList(holderValueArray);
 
     // Pull out top aggregated values
-    final List<Map<String, Object>> values = Lists.transform(
-        holderValues,
-        new Function<DimValHolder, Map<String, Object>>()
-        {
-          @Override
-          public Map<String, Object> apply(DimValHolder valHolder)
-          {
-            return valHolder.getMetricValues();
-          }
-        }
-    );
-    return new Result<TopNResultValue>(
-        timestamp,
-        new TopNResultValue(values)
-    );
+    final List<Map<String, Object>> values = Lists.transform(holderValues, DimValHolder::getMetricValues);
+    return new Result<>(timestamp, new TopNResultValue(values));
   }
 }
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
index bc573b9..cf2468f 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
@@ -142,6 +142,10 @@ public class TopNQueryEngine
                                                && columnCapabilities.isDictionaryEncoded())) {
       // Use DimExtraction for non-Strings and for non-dictionary-encoded Strings.
       topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+    } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
+      // Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be
+      // a many-to-one mapping, since numeric types can't represent all possible values of other types.)
+      topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
     } else if (selector.isAggregateAllMetrics()) {
       topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
     } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
index 774543d..53bd2ba 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
@@ -50,6 +50,7 @@ import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.cache.CacheKeyBuilder;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
+import io.druid.segment.DimensionHandlerUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -384,11 +385,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
             Iterator<Object> inputIter = results.iterator();
             DateTime timestamp = granularity.toDateTime(((Number) inputIter.next()).longValue());
 
-            // Need a value transformer to convert generic Jackson-deserialized type into the proper type.
-            final Function<Object, Object> dimValueTransformer = TopNMapFn.getValueTransformer(
-                query.getDimensionSpec().getOutputType()
-            );
-
             while (inputIter.hasNext()) {
               List<Object> result = (List<Object>) inputIter.next();
               Map<String, Object> vals = Maps.newLinkedHashMap();
@@ -396,7 +392,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
               Iterator<AggregatorFactory> aggIter = aggs.iterator();
               Iterator<Object> resultIter = result.iterator();
 
-              vals.put(query.getDimensionSpec().getOutputName(), dimValueTransformer.apply(resultIter.next()));
+              // Must convert generic Jackson-deserialized type into the proper type.
+              vals.put(
+                  query.getDimensionSpec().getOutputName(),
+                  DimensionHandlerUtils.convertObjectToType(resultIter.next(), query.getDimensionSpec().getOutputType())
+              );
 
               while (aggIter.hasNext() && resultIter.hasNext()) {
                 final AggregatorFactory factory = aggIter.next();
diff --git a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java
index 5ccc8d4..4b2d617 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
 public interface TopNResultBuilder
 {
   TopNResultBuilder addEntry(
-      Comparable dimNameObj,
+      Comparable dimValueObj,
       Object dimValIndex,
       Object[] metricVals
   );
diff --git a/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
index 24084c1..0ae56a7 100644
--- a/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
+++ b/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
@@ -19,7 +19,7 @@
 
 package io.druid.query.topn.types;
 
-import com.google.common.base.Function;
+import io.druid.java.util.common.IAE;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.topn.BaseTopNAlgorithm;
 import io.druid.query.topn.TopNParams;
@@ -29,6 +29,7 @@ import io.druid.segment.BaseDoubleColumnValueSelector;
 import io.druid.segment.BaseFloatColumnValueSelector;
 import io.druid.segment.BaseLongColumnValueSelector;
 import io.druid.segment.Cursor;
+import io.druid.segment.DimensionHandlerUtils;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.column.ValueType;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -37,12 +38,32 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 
 import java.util.Map;
+import java.util.function.Function;
 
 public abstract class NumericTopNColumnSelectorStrategy<
     ValueSelectorType,
     DimExtractionAggregateStoreType extends Map<?, Aggregator[]>>
     implements TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType>
 {
+  public static TopNColumnSelectorStrategy ofType(final ValueType selectorType, final ValueType dimensionType)
+  {
+    final Function<Object, Comparable<?>> converter = DimensionHandlerUtils.converterFromTypeToTypeNonNull(
+        selectorType,
+        dimensionType
+    );
+
+    switch (selectorType) {
+      case LONG:
+        return new OfLong(converter);
+      case FLOAT:
+        return new OfFloat(converter);
+      case DOUBLE:
+        return new OfDouble(converter);
+      default:
+        throw new IAE("No strategy for type[%s]", selectorType);
+    }
+  }
+
   @Override
   public int getCardinality(ValueSelectorType selector)
   {
@@ -132,7 +153,6 @@ public abstract class NumericTopNColumnSelectorStrategy<
   @Override
   public void updateDimExtractionResults(
       final DimExtractionAggregateStoreType aggregatesStore,
-      final Function<Object, Object> valueTransformer,
       final TopNResultBuilder resultBuilder
   )
   {
@@ -144,11 +164,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
           vals[i] = aggs[i].get();
         }
 
-        Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
-        if (valueTransformer != null) {
-          key = (Comparable) valueTransformer.apply(key);
-        }
-
+        final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
         resultBuilder.addEntry(key, key, vals);
       }
     }
@@ -159,10 +175,11 @@ public abstract class NumericTopNColumnSelectorStrategy<
   static class OfFloat
       extends NumericTopNColumnSelectorStrategy<BaseFloatColumnValueSelector, Int2ObjectMap<Aggregator[]>>
   {
-    @Override
-    public ValueType getValueType()
+    private final Function<Object, Comparable<?>> converter;
+
+    OfFloat(final Function<Object, Comparable<?>> converter)
     {
-      return ValueType.FLOAT;
+      this.converter = converter;
     }
 
     @Override
@@ -174,7 +191,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
     @Override
     Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
     {
-      return Float.intBitsToFloat((Integer) aggregatorStoreKey);
+      return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
     }
 
     @Override
@@ -193,10 +210,11 @@ public abstract class NumericTopNColumnSelectorStrategy<
   static class OfLong
       extends NumericTopNColumnSelectorStrategy<BaseLongColumnValueSelector, Long2ObjectMap<Aggregator[]>>
   {
-    @Override
-    public ValueType getValueType()
+    private final Function<Object, Comparable<?>> converter;
+
+    OfLong(final Function<Object, Comparable<?>> converter)
     {
-      return ValueType.LONG;
+      this.converter = converter;
     }
 
     @Override
@@ -208,7 +226,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
     @Override
     Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
     {
-      return (Long) aggregatorStoreKey;
+      return converter.apply(aggregatorStoreKey);
     }
 
     @Override
@@ -227,10 +245,11 @@ public abstract class NumericTopNColumnSelectorStrategy<
   static class OfDouble
       extends NumericTopNColumnSelectorStrategy<BaseDoubleColumnValueSelector, Long2ObjectMap<Aggregator[]>>
   {
-    @Override
-    public ValueType getValueType()
+    private final Function<Object, Comparable<?>> converter;
+
+    OfDouble(final Function<Object, Comparable<?>> converter)
     {
-      return ValueType.DOUBLE;
+      this.converter = converter;
     }
 
     @Override
@@ -242,7 +261,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
     @Override
     Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
     {
-      return Double.longBitsToDouble((Long) aggregatorStoreKey);
+      return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
     }
 
     @Override
diff --git a/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
index 724c4f0..17b7298 100644
--- a/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
+++ b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
@@ -19,34 +19,47 @@
 
 package io.druid.query.topn.types;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.topn.BaseTopNAlgorithm;
 import io.druid.query.topn.TopNParams;
 import io.druid.query.topn.TopNQuery;
 import io.druid.query.topn.TopNResultBuilder;
 import io.druid.segment.Cursor;
+import io.druid.segment.DimensionHandlerUtils;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.column.ValueType;
 import io.druid.segment.data.IndexedInts;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 
 public class StringTopNColumnSelectorStrategy
-    implements TopNColumnSelectorStrategy<DimensionSelector, Map<String, Aggregator[]>>
+    implements TopNColumnSelectorStrategy<DimensionSelector, Map<Comparable, Aggregator[]>>
 {
-  @Override
-  public int getCardinality(DimensionSelector selector)
+  private final Function<Object, Comparable<?>> dimensionValueConverter;
+
+  public StringTopNColumnSelectorStrategy(final ValueType dimensionType)
   {
-    return selector.getValueCardinality();
+    // We can handle null strings, but not null numbers.
+    if (dimensionType == ValueType.STRING) {
+      this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(
+          ValueType.STRING,
+          dimensionType
+      );
+    } else {
+      this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToTypeNonNull(
+          ValueType.STRING,
+          dimensionType
+      );
+    }
   }
 
   @Override
-  public ValueType getValueType()
+  public int getCardinality(DimensionSelector selector)
   {
-    return ValueType.STRING;
+    return selector.getValueCardinality();
   }
 
   @Override
@@ -71,9 +84,9 @@ public class StringTopNColumnSelectorStrategy
   }
 
   @Override
-  public Map<String, Aggregator[]> makeDimExtractionAggregateStore()
+  public Map<Comparable, Aggregator[]> makeDimExtractionAggregateStore()
   {
-    return Maps.newHashMap();
+    return new HashMap<>();
   }
 
   @Override
@@ -82,7 +95,7 @@ public class StringTopNColumnSelectorStrategy
       DimensionSelector selector,
       Cursor cursor,
       Aggregator[][] rowSelector,
-      Map<String, Aggregator[]> aggregatesStore
+      Map<Comparable, Aggregator[]> aggregatesStore
   )
   {
     if (selector.getValueCardinality() != DimensionSelector.CARDINALITY_UNKNOWN) {
@@ -94,12 +107,11 @@ public class StringTopNColumnSelectorStrategy
 
   @Override
   public void updateDimExtractionResults(
-      final Map<String, Aggregator[]> aggregatesStore,
-      final Function<Object, Object> valueTransformer,
+      final Map<Comparable, Aggregator[]> aggregatesStore,
       final TopNResultBuilder resultBuilder
   )
   {
-    for (Map.Entry<String, Aggregator[]> entry : aggregatesStore.entrySet()) {
+    for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
       Aggregator[] aggs = entry.getValue();
       if (aggs != null) {
         Object[] vals = new Object[aggs.length];
@@ -107,16 +119,8 @@ public class StringTopNColumnSelectorStrategy
           vals[i] = aggs[i].get();
         }
 
-        Comparable key = entry.getKey();
-        if (valueTransformer != null) {
-          key = (Comparable) valueTransformer.apply(key);
-        }
-
-        resultBuilder.addEntry(
-            key,
-            key,
-            vals
-        );
+        final Comparable key = dimensionValueConverter.apply(entry.getKey());
+        resultBuilder.addEntry(key, key, vals);
       }
     }
   }
@@ -126,7 +130,7 @@ public class StringTopNColumnSelectorStrategy
       Cursor cursor,
       DimensionSelector selector,
       Aggregator[][] rowSelector,
-      Map<String, Aggregator[]> aggregatesStore
+      Map<Comparable, Aggregator[]> aggregatesStore
   )
   {
     long processedRows = 0;
@@ -136,7 +140,7 @@ public class StringTopNColumnSelectorStrategy
         final int dimIndex = dimValues.get(i);
         Aggregator[] theAggregators = rowSelector[dimIndex];
         if (theAggregators == null) {
-          final String key = selector.lookupName(dimIndex);
+          final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
           theAggregators = aggregatesStore.get(key);
           if (theAggregators == null) {
             theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
@@ -159,7 +163,7 @@ public class StringTopNColumnSelectorStrategy
       TopNQuery query,
       Cursor cursor,
       DimensionSelector selector,
-      Map<String, Aggregator[]> aggregatesStore
+      Map<Comparable, Aggregator[]> aggregatesStore
   )
   {
     long processedRows = 0;
@@ -167,7 +171,7 @@ public class StringTopNColumnSelectorStrategy
       final IndexedInts dimValues = selector.getRow();
       for (int i = 0; i < dimValues.size(); ++i) {
         final int dimIndex = dimValues.get(i);
-        final String key = selector.lookupName(dimIndex);
+        final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
 
         Aggregator[] theAggregators = aggregatesStore.get(key);
         if (theAggregators == null) {
diff --git a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java
index a8f5d32..5b8a411 100644
--- a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java
+++ b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn.types;
 
-import com.google.common.base.Function;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.dimension.ColumnSelectorStrategy;
 import io.druid.query.topn.TopNParams;
@@ -27,9 +26,7 @@ import io.druid.query.topn.TopNQuery;
 import io.druid.query.topn.TopNResultBuilder;
 import io.druid.segment.Cursor;
 import io.druid.segment.StorageAdapter;
-import io.druid.segment.column.ValueType;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType extends Map>
@@ -39,8 +36,6 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
 
   int getCardinality(ValueSelectorType selector);
 
-  ValueType getValueType();
-
   /**
    * Used by DimExtractionTopNAlgorithm.
    *
@@ -107,13 +102,11 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
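-  * Read entries from the aggregates store, adding the keys and associated values to the resultBuilder, applying the
-  * valueTransformer to the keys if present
+   * Read entries from the aggregates store, adding the keys and associated values to the resultBuilder. Any
+   * conversion of keys to the dimension output type is done by the strategy itself.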
    *
-   * @param aggregatesStore  Map created by makeDimExtractionAggregateStore()
-   * @param valueTransformer Converts keys to different types, if null no conversion is needed
-   * @param resultBuilder    TopN result builder
+   * @param aggregatesStore Map created by makeDimExtractionAggregateStore()
+   * @param resultBuilder   TopN result builder
    */
   void updateDimExtractionResults(
       DimExtractionAggregateStoreType aggregatesStore,
-      @Nullable Function<Object, Object> valueTransformer,
       TopNResultBuilder resultBuilder
   );
 }
diff --git a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
index 1d252f6..7dd77bb 100644
--- a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
+++ b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
@@ -19,6 +19,7 @@
 
 package io.druid.query.topn.types;
 
+import com.google.common.base.Preconditions;
 import io.druid.java.util.common.IAE;
 import io.druid.query.dimension.ColumnSelectorStrategyFactory;
 import io.druid.segment.ColumnValueSelector;
@@ -27,23 +28,39 @@ import io.druid.segment.column.ValueType;
 
 public class TopNColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory<TopNColumnSelectorStrategy>
 {
+  private final ValueType dimensionType;
+
+  public TopNColumnSelectorStrategyFactory(final ValueType dimensionType)
+  {
+    this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
+  }
+
   @Override
   public TopNColumnSelectorStrategy makeColumnSelectorStrategy(
       ColumnCapabilities capabilities, ColumnValueSelector selector
   )
   {
-    ValueType type = capabilities.getType();
-    switch (type) {
+    final ValueType selectorType = capabilities.getType();
+
+    switch (selectorType) {
       case STRING:
-        return new StringTopNColumnSelectorStrategy();
+        // Return strategy that reads strings and outputs dimensionTypes.
+        return new StringTopNColumnSelectorStrategy(dimensionType);
       case LONG:
-        return new NumericTopNColumnSelectorStrategy.OfLong();
       case FLOAT:
-        return new NumericTopNColumnSelectorStrategy.OfFloat();
       case DOUBLE:
-        return new NumericTopNColumnSelectorStrategy.OfDouble();
+        if (ValueType.isNumeric(dimensionType)) {
+          // Return strategy that aggregates using the _output_ type, because this allows us to collapse values
+          // properly (numeric types cannot represent all values of other numeric types).
+          return NumericTopNColumnSelectorStrategy.ofType(dimensionType, dimensionType);
+        } else {
+          // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
+          // represent all possible values of the input type. This will be true for STRING, which is the only
+          // non-numeric type currently supported.
+          return NumericTopNColumnSelectorStrategy.ofType(selectorType, dimensionType);
+        }
       default:
-        throw new IAE("Cannot create query type helper from invalid type [%s]", type);
+        throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
     }
   }
 }
diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java
index 5c846b6..375f27c 100644
--- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java
+++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java
@@ -19,12 +19,14 @@
 
 package io.druid.segment;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Doubles;
 import com.google.common.primitives.Floats;
 import io.druid.common.guava.GuavaUtils;
 import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.guava.Comparators;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.ColumnSelectorPlus;
 import io.druid.query.dimension.ColumnSelectorStrategy;
@@ -38,6 +40,7 @@ import javax.annotation.Nullable;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 public final class DimensionHandlerUtils
 {
@@ -132,9 +135,10 @@ public final class DimensionHandlerUtils
    * in a query engine. See GroupByStrategyFactory for a reference.
    *
    * @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
-   * @param strategyFactory A factory provided by query engines that generates type-handling strategies
-   * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
-   * @param columnSelectorFactory Used to create value selectors for columns.
+   * @param strategyFactory               A factory provided by query engines that generates type-handling strategies
+   * @param dimensionSpecs                The set of columns to generate ColumnSelectorPlus objects for
+   * @param columnSelectorFactory         Used to create value selectors for columns.
+   *
    * @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
    */
   public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
@@ -238,6 +242,15 @@ public final class DimensionHandlerUtils
   }
 
   @Nullable
+  public static String convertObjectToString(@Nullable Object valObj)
+  {
+    if (valObj == null) {
+      return null;
+    }
+    return valObj.toString();
+  }
+
+  @Nullable
   public static Long convertObjectToLong(@Nullable Object valObj)
   {
     return convertObjectToLong(valObj, false);
@@ -294,6 +307,114 @@ public final class DimensionHandlerUtils
   }
 
   @Nullable
+  public static Comparable<?> convertObjectToType(
+      @Nullable final Object obj,
+      final ValueType type,
+      final boolean reportParseExceptions
+  )
+  {
+    Preconditions.checkNotNull(type, "type");
+
+    switch (type) {
+      case LONG:
+        return convertObjectToLong(obj, reportParseExceptions);
+      case FLOAT:
+        return convertObjectToFloat(obj, reportParseExceptions);
+      case DOUBLE:
+        return convertObjectToDouble(obj, reportParseExceptions);
+      case STRING:
+        return convertObjectToString(obj);
+      default:
+        throw new IAE("Type[%s] is not supported for dimensions!", type);
+    }
+  }
+
+  public static int compareObjectsAsType(
+      @Nullable final Object lhs,
+      @Nullable final Object rhs,
+      final ValueType type
+  )
+  {
+    //noinspection unchecked
+    return Comparators.<Comparable>naturalNullsFirst().compare(
+        convertObjectToType(lhs, type),
+        convertObjectToType(rhs, type)
+    );
+  }
+
+  @Nullable
+  public static Comparable<?> convertObjectToType(
+      @Nullable final Object obj,
+      final ValueType type
+  )
+  {
+    return convertObjectToType(obj, Preconditions.checkNotNull(type, "type"), false);
+  }
+
+  /**
+   * This function only exists in the backport of #6220 to 0.12.x. It won't return nulls.
+   */
+  public static Comparable<?> convertObjectToTypeNonNull(
+      @Nullable final Object obj,
+      final ValueType type
+  )
+  {
+    return nonNullify(convertObjectToType(obj, Preconditions.checkNotNull(type, "type"), false), type);
+  }
+
+  /**
+   * This function only exists in the backport of #6220 to 0.12.x. It maps null to 0L, 0.0d, 0.0f, or "" by type.
+   */
+  private static Comparable<?> nonNullify(@Nullable final Comparable<?> obj, final ValueType type)
+  {
+    if (obj == null) {
+      switch (type) {
+        case LONG:
+          return 0L;
+        case DOUBLE:
+          return 0.0d;
+        case FLOAT:
+          return 0.0f;
+        case STRING:
+          return "";
+        default:
+          throw new IAE("Cannot handle type[%s]", type);
+      }
+    } else {
+      return obj;
+    }
+  }
+
+  public static Function<Object, Comparable<?>> converterFromTypeToType(
+      final ValueType fromType,
+      final ValueType toType
+  )
+  {
+    if (fromType == toType) {
+      //noinspection unchecked
+      return (Function) Function.identity();
+    } else {
+      return obj -> convertObjectToType(obj, toType);
+    }
+  }
+
+  /**
+   * This function only exists in the backport of #6220 to 0.12.x. It won't return nulls.
+   */
+  public static Function<Object, Comparable<?>> converterFromTypeToTypeNonNull(
+      final ValueType fromType,
+      final ValueType toType
+  )
+  {
+    if (fromType == toType) {
+      //noinspection unchecked
+      return obj -> nonNullify((Comparable<?>) obj, toType);
+    } else {
+      return obj -> convertObjectToTypeNonNull(obj, toType);
+    }
+  }
+
+  @Nullable
   public static Double convertObjectToDouble(@Nullable Object valObj)
   {
     return convertObjectToDouble(valObj, false);
diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
index 4896910..34c341e 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
@@ -5133,6 +5133,138 @@ public class TopNQueryRunnerTest
   }
 
   @Test
+  public void testSortOnDoubleAsLong()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("index", "index_alias", ValueType.LONG)) // "index" with LONG output
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) // NUMERIC ordering on the dimension
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 59L) // the two 59.x values of testSortOnDoubleAsDouble merge into one row
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 67L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 68L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 69L)
+                        .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testSortOnTimeAsLong()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("__time", "__time_alias", ValueType.LONG)) // "__time" with LONG output
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) // NUMERIC ordering on the dimension
+        .threshold(4) // matches the four expected rows below
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList( // four consecutive days, each expected as its getMillis() value
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", DateTimes.of("2011-01-13T00:00:00.000Z").getMillis())
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", DateTimes.of("2011-01-14T00:00:00.000Z").getMillis())
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", DateTimes.of("2011-01-15T00:00:00.000Z").getMillis())
+                        .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testSortOnStringAsDouble()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("market", "alias", ValueType.DOUBLE)) // "market" with DOUBLE output
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) // NUMERIC ordering on the dimension
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    final Map<String, Object> nullAliasMap = new HashMap<>();
+    nullAliasMap.put("alias", 0.0d); // presumably the non-null default for market strings that are not numbers
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(Collections.singletonList(nullAliasMap)) // exactly one row is expected
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testSortOnDoubleAsDouble()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("index", "index_alias", ValueType.DOUBLE)) // "index" with DOUBLE output
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) // NUMERIC ordering on the dimension
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList( // expected rows are listed in ascending numeric order
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 59.021022d)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 59.266595d) // stays a separate row here, unlike in testSortOnDoubleAsLong
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 67.73117d)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 68.573162d)
+                        .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
   public void testFullOnTopNLongTimeColumnWithExFn()
   {
     String jsFn = "function(str) { return 'super-' + str; }";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org