You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/03/13 20:53:12 UTC

[incubator-druid] branch master updated: Fix time-extraction topN with non-STRING outputType. (#7257)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 98a1b55  Fix time-extraction topN with non-STRING outputType. (#7257)
98a1b55 is described below

commit 98a1b5537f993b050131e0270e80cebc8767d6d8
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Wed Mar 13 16:53:07 2019 -0400

    Fix time-extraction topN with non-STRING outputType. (#7257)
    
    Similar to other bugs fixed in #6220, but this one was missed. This bug would
    cause "extraction" dimensionSpecs on the "__time" column with non-STRING
    outputTypes to potentially be output as STRING sometimes instead of LONG,
    causing incompletely merged results.
---
 .../query/topn/TimeExtractionTopNAlgorithm.java    | 38 ++++++++++++-----
 .../druid/query/topn/TopNQueryRunnerTest.java      | 47 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index 3fd8795..7d4c23c 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -22,25 +22,36 @@ package org.apache.druid.query.topn;
 import org.apache.druid.query.ColumnSelectorPlus;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ValueType;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 
-public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<String, Aggregator[]>, TopNParams>
+public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable, Aggregator[]>, TopNParams>
 {
-  public static final int[] EMPTY_INTS = new int[]{};
+  private static final int[] EMPTY_INTS = new int[]{};
+
   private final TopNQuery query;
+  private final Function<Object, Comparable<?>> dimensionValueConverter;
 
   public TimeExtractionTopNAlgorithm(StorageAdapter storageAdapter, TopNQuery query)
   {
     super(storageAdapter);
     this.query = query;
-  }
 
+    // This strategy is used for ExtractionFns on the __time column. They always return STRING, so we need to convert
+    // from STRING to the desired output type.
+    this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(
+        ValueType.STRING,
+        query.getDimensionSpec().getOutputType()
+    );
+  }
 
   @Override
+  @SuppressWarnings("unchecked")
   public TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor)
   {
     return new TopNParams(
@@ -63,13 +74,18 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
   }
 
   @Override
-  protected Map<String, Aggregator[]> makeDimValAggregateStore(TopNParams params)
+  @SuppressWarnings("unchecked")
+  protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
   {
-    return new HashMap<>();
+    return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore();
   }
 
   @Override
-  protected long scanAndAggregate(TopNParams params, int[] dimValSelector, Map<String, Aggregator[]> aggregatesStore)
+  protected long scanAndAggregate(
+      TopNParams params,
+      int[] dimValSelector,
+      Map<Comparable, Aggregator[]> aggregatesStore
+  )
   {
     if (params.getCardinality() < 0) {
       throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
@@ -80,7 +96,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
 
     long processedRows = 0;
     while (!cursor.isDone()) {
-      final String key = dimSelector.lookupName(dimSelector.getRow().get(0));
+      final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
 
       Aggregator[] theAggregators = aggregatesStore.get(key);
       if (theAggregators == null) {
@@ -102,11 +118,11 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
   protected void updateResults(
       TopNParams params,
       int[] dimValSelector,
-      Map<String, Aggregator[]> aggregatesStore,
+      Map<Comparable, Aggregator[]> aggregatesStore,
       TopNResultBuilder resultBuilder
   )
   {
-    for (Map.Entry<String, Aggregator[]> entry : aggregatesStore.entrySet()) {
+    for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
       Aggregator[] aggs = entry.getValue();
       if (aggs != null) {
         Object[] vals = new Object[aggs.length];
@@ -124,7 +140,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
   }
 
   @Override
-  protected void closeAggregators(Map<String, Aggregator[]> stringMap)
+  protected void closeAggregators(Map<Comparable, Aggregator[]> stringMap)
   {
     for (Aggregator[] aggregators : stringMap.values()) {
       for (Aggregator agg : aggregators) {
diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
index 0244372..cf636ec 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
@@ -2365,6 +2365,53 @@ public class TopNQueryRunnerTest
   }
 
   @Test
+  public void testTopNDimExtractionTimeToOneLong()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(
+            new ExtractionDimensionSpec(
+                ColumnHolder.TIME_COLUMN_NAME,
+                "t",
+                ValueType.LONG,
+                new JavaScriptExtractionFn(
+                    "function(f) { return \"42\"; }",
+                    false,
+                    JavaScriptConfig.getEnabledInstance()
+                )
+            )
+        )
+        .metric("rows")
+        .threshold(10)
+        .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
+        .aggregators(commonAggregators)
+        .postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Collections.<Map<String, Object>>singletonList(
+                    ImmutableMap.of(
+                        "addRowsIndexConstant", 504542.5071372986D,
+                        "index", 503332.5071372986D,
+                        "t", 42L,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_9,
+                        "rows", 1209L
+                    )
+                )
+            )
+        )
+    );
+    List<Result<TopNResultValue>> list = runWithMerge(query).toList();
+    Assert.assertEquals(list.size(), 1);
+    Assert.assertEquals("Didn't merge results", list.get(0).getValue().getValue().size(), 1);
+    TestHelper.assertExpectedResults(expectedResults, list, "Failed to match");
+  }
+
+  @Test
   public void testTopNCollapsingDimExtraction()
   {
     TopNQuery query = new TopNQueryBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org