You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2020/08/01 22:32:31 UTC

[druid] branch master updated: Add vectorization support for the longMin aggregator. (#10211)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a4113  Add vectorization support for the longMin aggregator. (#10211)
34a4113 is described below

commit 34a41137522fca463c6af1df5edb2f41297491f5
Author: Abhishek Radhakrishnan <ab...@gmail.com>
AuthorDate: Sat Aug 1 15:32:09 2020 -0700

    Add vectorization support for the longMin aggregator. (#10211)
    
    * Fix minor formatting in docs.
    
    * Add Nullhandling initialization for test to run from IDE.
    
    * Vectorize longMin aggregator.
    
    - A new vectorized class for the vectorized long min aggregator.
    - Changes to AggregatorFactory to support vectorize functionality.
    - Few changes to schema evolution test to add LongMinAggregatorFactory.
    
    * Add longSum to the supported vectorized aggregator implementations.
    
    * Add MIN() long min to calcite query test that can vectorize.
    
    * Add simple long aggregations test.
    
    * Fixup formatting per checkstyle guide.
    
    * fixup and add more tests for long min aggregator.
    
    * Override test for groupBy since timestamps are handled differently.
    
    * Null compatibility check in test.
    
    * Review comment: Add a test case to LongMinAggregationTest.
---
 docs/querying/query-context.md                     |  2 +-
 .../druid/query/aggregation/AggregatorFactory.java |  4 +-
 .../aggregation/LongMinAggregatorFactory.java      | 29 ++++++++
 .../query/aggregation/LongMinVectorAggregator.java | 83 ++++++++++++++++++++++
 .../apache/druid/query/QueryRunnerTestHelper.java  |  3 +
 .../apache/druid/query/SchemaEvolutionTest.java    | 22 +++---
 .../query/aggregation/LongMinAggregationTest.java  | 52 +++++++++++++-
 .../query/groupby/GroupByQueryRunnerTest.java      | 20 +++---
 .../groupby/GroupByTimeseriesQueryRunnerTest.java  | 29 ++++++++
 .../timeseries/TimeseriesQueryRunnerTest.java      | 67 +++++++++++++----
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 31 +++++++-
 11 files changed, 304 insertions(+), 38 deletions(-)

diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md
index d14eeb7..abb430b 100644
--- a/docs/querying/query-context.md
+++ b/docs/querying/query-context.md
@@ -89,7 +89,7 @@ requirements:
 - All query-level filters must either be able to run on bitmap indexes or must offer vectorized row-matchers. These
 include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not".
 - All filters in filtered aggregators must offer vectorized row-matchers.
-- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum",
+- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
 "hyperUnique", and "filtered".
 - No virtual columns.
 - For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
index 41e55a3..88d6f87 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
@@ -36,7 +36,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * AggregatorFactory is a strategy (in the terms of Design Patterns) that represents column aggregation, e. g. min,
+ * AggregatorFactory is a strategy (in the terms of Design Patterns) that represents column aggregation, e.g. min,
  * max, sum of metric columns, or cardinality of dimension columns (see {@link
  * org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}).
  * Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged
@@ -46,7 +46,7 @@ import java.util.Map;
  * for them e.g. doubleSum aggregator tries to parse the string value as double and assumes it to be zero if parsing
  * fails.
  * If it is a multi value column then each individual value should be taken into account for aggregation e.g. if a row
- * had value ["1","1","1"] , doubleSum aggregation would take each of them and sum them to 3.
+ * had value ["1","1","1"], doubleSum aggregation would take each of them and sum them to 3.
  */
 @ExtensionPoint
 public abstract class AggregatorFactory implements Cacheable
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
index 24d82e3..ffa0374 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
@@ -25,6 +25,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -70,6 +74,31 @@ public class LongMinAggregatorFactory extends SimpleLongAggregatorFactory
   }
 
   @Override
+  protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory)
+  {
+    return columnSelectorFactory.makeValueSelector(fieldName);
+  }
+
+  @Override
+  protected VectorAggregator factorizeVector(
+          VectorColumnSelectorFactory columnSelectorFactory,
+          VectorValueSelector selector
+  )
+  {
+    return new LongMinVectorAggregator(selector);
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    if (fieldName != null) {
+      final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+      return expression == null && (capabilities == null || capabilities.getType().isNumeric());
+    }
+    return expression == null;
+  }
+
+  @Override
   @Nullable
   public Object combine(@Nullable Object lhs, @Nullable Object rhs)
   {
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinVectorAggregator.java
new file mode 100644
index 0000000..dee5377
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinVectorAggregator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class LongMinVectorAggregator implements VectorAggregator
+{
+  private final VectorValueSelector selector;
+
+  public LongMinVectorAggregator(final VectorValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    buf.putLong(position, Long.MAX_VALUE);
+  }
+
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final long[] vector = selector.getLongVector();
+
+    long min = buf.getLong(position);
+    for (int i = startRow; i < endRow; i++) {
+      min = Math.min(min, vector[i]);
+    }
+
+    buf.putLong(position, min);
+  }
+
+  @Override
+  public void aggregate(
+      final ByteBuffer buf,
+      final int numRows,
+      final int[] positions,
+      @Nullable final int[] rows,
+      final int positionOffset
+  )
+  {
+    final long[] vector = selector.getLongVector();
+
+    for (int i = 0; i < numRows; i++) {
+      final int position = positions[i] + positionOffset;
+      buf.putLong(position, Math.min(buf.getLong(position), vector[rows != null ? rows[i] : i]));
+    }
+  }
+
+  @Override
+  public Object get(final ByteBuffer buf, final int position)
+  {
+    return buf.getLong(position);
+  }
+
+  @Override
+  public void close()
+  {
+    // Nothing to close.
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
index 474f796..bf1b028 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
@@ -37,6 +37,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
 import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
@@ -113,6 +114,7 @@ public class QueryRunnerTestHelper
   public static final String INDEX_METRIC = "index";
   public static final String UNIQUE_METRIC = "uniques";
   public static final String ADD_ROWS_INDEX_CONSTANT_METRIC = "addRowsIndexConstant";
+  public static final String LONG_MIN_INDEX_METRIC = "longMinIndex";
   public static String dependentPostAggMetric = "dependentPostAgg";
   public static final CountAggregatorFactory ROWS_COUNT = new CountAggregatorFactory("rows");
   public static final LongSumAggregatorFactory INDEX_LONG_SUM = new LongSumAggregatorFactory("index", INDEX_METRIC);
@@ -121,6 +123,7 @@ public class QueryRunnerTestHelper
       "index",
       INDEX_METRIC
   );
+  public static final LongMinAggregatorFactory INDEX_LONG_MIN = new LongMinAggregatorFactory(LONG_MIN_INDEX_METRIC, INDEX_METRIC);
   public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }";
   public static final String JS_RESET_0 = "function reset() { return 0; }";
   public static final JavaScriptAggregatorFactory JS_INDEX_SUM_IF_PLACEMENTISH_A = new JavaScriptAggregatorFactory(
diff --git a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
index ce17b38..729923c 100644
--- a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
@@ -162,7 +163,7 @@ public class SchemaEvolutionTest
                          .rows(inputRowsWithDimensions(ImmutableList.of("c1")))
                          .buildMMappedIndex();
 
-    // Index2: c1 is a long, c2 is a string, "uniques" is uniques on c2
+    // Index2: c1 is a long, c2 is a string, "uniques" is uniques on c2, "longmin" is min on c1
     index2 = IndexBuilder.create()
                          .tmpDir(temporaryFolder.newFolder())
                          .schema(
@@ -170,7 +171,8 @@ public class SchemaEvolutionTest
                                  .withMetrics(
                                      new CountAggregatorFactory("cnt"),
                                      new LongSumAggregatorFactory("c1", "c1"),
-                                     new HyperUniquesAggregatorFactory("uniques", "c2")
+                                     new HyperUniquesAggregatorFactory("uniques", "c2"),
+                                     new LongMinAggregatorFactory("longmin", "c1")
                                  )
                                  .withRollup(false)
                                  .build()
@@ -347,6 +349,7 @@ public class SchemaEvolutionTest
                 new LongSumAggregatorFactory("a", "c1"),
                 new DoubleSumAggregatorFactory("b", "c1"),
                 new FloatSumAggregatorFactory("d", "c1"),
+                new LongMinAggregatorFactory("e", "c1"),
                 new CountAggregatorFactory("c")
             )
         )
@@ -355,19 +358,19 @@ public class SchemaEvolutionTest
 
     // Only string(1) -- which we can filter but not aggregate
     Assert.assertEquals(
-        timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f)),
+        timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f, "e", 9L)),
         runQuery(query, factory, ImmutableList.of(index1))
     );
 
-    // Only long(2) -- which we can filter and aggregate
+     // Only long(2) -- which we can filter and aggregate
     Assert.assertEquals(
-        timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.0, "c", 2L, "d", 19.0f)),
+        timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.0, "c", 2L, "d", 19.0f, "e", 9L)),
         runQuery(query, factory, ImmutableList.of(index2))
     );
 
     // Only float(3) -- which we can't filter, but can aggregate
     Assert.assertEquals(
-        timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f)),
+        timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f, "e", 9L)),
         runQuery(query, factory, ImmutableList.of(index3))
     );
 
@@ -381,7 +384,9 @@ public class SchemaEvolutionTest
             "c",
             0L,
             "d",
-            NullHandling.defaultFloatValue()
+            NullHandling.defaultFloatValue(),
+            "e",
+            NullHandling.sqlCompatible() ? null : Long.MAX_VALUE
         )),
         runQuery(query, factory, ImmutableList.of(index4))
     );
@@ -392,7 +397,8 @@ public class SchemaEvolutionTest
             "a", 57L,
             "b", 57.2,
             "c", 6L,
-            "d", 57.20000076293945
+            "d", 57.20000076293945,
+            "e", 9L
         )),
         runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4))
     );
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java
index e840d57..00c9ab4 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java
@@ -19,8 +19,13 @@
 
 package org.apache.druid.query.aggregation;
 
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,25 +38,43 @@ import java.nio.ByteBuffer;
 public class LongMinAggregationTest
 {
   private LongMinAggregatorFactory longMinAggFactory;
+  private LongMinAggregatorFactory longMinVectorAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
+  private VectorColumnSelectorFactory vectorColumnSelectorFactory;
   private TestLongColumnSelector selector;
 
-  private long[] values = {-9223372036854775802L, -9223372036854775803L, -9223372036854775806L, -9223372036854775805L};
+  private final long[] values = {-9223372036854775802L, -9223372036854775803L, -9223372036854775806L, -9223372036854775805L};
+  private final long[] longValues1 = {5L, 2L, 4L, 100L, 1L, 5L, -2L, -3L, 0L, 55L};
 
   public LongMinAggregationTest() throws Exception
   {
     String aggSpecJson = "{\"type\": \"longMin\", \"name\": \"billy\", \"fieldName\": \"nilly\"}";
     longMinAggFactory = TestHelper.makeJsonMapper().readValue(aggSpecJson, LongMinAggregatorFactory.class);
+
+    String vectorAggSpecJson = "{\"type\": \"longMin\", \"name\": \"lng\", \"fieldName\": \"lngFld\"}";
+    longMinVectorAggFactory = TestHelper.makeJsonMapper().readValue(vectorAggSpecJson, LongMinAggregatorFactory.class);
   }
 
   @Before
   public void setup()
   {
+    NullHandling.initializeForTests();
     selector = new TestLongColumnSelector(values);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector);
     EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null);
     EasyMock.replay(colSelectorFactory);
+
+    VectorValueSelector vectorValueSelector = EasyMock.createMock(VectorValueSelector.class);
+    EasyMock.expect(vectorValueSelector.getLongVector()).andReturn(longValues1).anyTimes();
+    EasyMock.expect(vectorValueSelector.getNullVector()).andReturn(null).anyTimes();
+    EasyMock.replay(vectorValueSelector);
+
+    vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class);
+    EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("lngFld"))
+            .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.LONG).setDictionaryEncoded(true)).anyTimes();
+    EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("lngFld")).andReturn(vectorValueSelector).anyTimes();
+    EasyMock.replay(vectorColumnSelectorFactory);
   }
 
   @Test
@@ -88,6 +111,33 @@ public class LongMinAggregationTest
   }
 
   @Test
+  public void testLongMinVectorAggregator()
+  {
+    // Some sanity.
+    Assert.assertTrue(longMinVectorAggFactory.canVectorize(vectorColumnSelectorFactory));
+    VectorValueSelector vectorValueSelector = longMinVectorAggFactory.vectorSelector(vectorColumnSelectorFactory);
+    Assert.assertEquals(longValues1, vectorValueSelector.getLongVector());
+
+    VectorAggregator vectorAggregator = longMinVectorAggFactory.factorizeVector(vectorColumnSelectorFactory);
+
+    final ByteBuffer buf = ByteBuffer.allocate(longMinAggFactory.getMaxIntermediateSizeWithNulls() * 2);
+    vectorAggregator.init(buf, 0);
+
+    vectorAggregator.aggregate(buf, 0, 0, 3);
+    Assert.assertEquals(longValues1[1], (long) vectorAggregator.get(buf, 0));
+
+    vectorAggregator.aggregate(buf, 1, 0, 3);
+    Assert.assertEquals(longValues1[1], (long) vectorAggregator.get(buf, 1));
+
+    vectorAggregator.aggregate(buf, 2, 3, 7);
+    Assert.assertEquals(longValues1[6], (long) vectorAggregator.get(buf, 2));
+
+    vectorAggregator.aggregate(buf, 0, 0, 10);
+    Assert.assertEquals(longValues1[7], (long) vectorAggregator.get(buf, 0));
+  }
+
+
+  @Test
   public void testCombine()
   {
     Assert.assertEquals(-9223372036854775803L, longMinAggFactory.combine(-9223372036854775800L, -9223372036854775803L));
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index e8ab616..3ab559a 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -4126,7 +4126,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
         .setInterval("2011-01-25/2011-01-28")
         .setDimensions(new DefaultDimensionSpec("quality", "alias"))
-        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new DoubleSumAggregatorFactory("index", "index"))
+        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new DoubleSumAggregatorFactory("index", "index"), QueryRunnerTestHelper.INDEX_LONG_MIN)
         .setGranularity(Granularities.ALL)
         .setHavingSpec(new GreaterThanHavingSpec("index", 310L))
         .setLimitSpec(
@@ -4139,11 +4139,11 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
     GroupByQuery fullQuery = builder.build();
 
     List<ResultRow> expectedResults = Arrays.asList(
-        makeRow(fullQuery, "2011-01-25", "alias", "business", "rows", 3L, "index", 312.38165283203125),
-        makeRow(fullQuery, "2011-01-25", "alias", "news", "rows", 3L, "index", 312.7834167480469),
-        makeRow(fullQuery, "2011-01-25", "alias", "technology", "rows", 3L, "index", 324.6412353515625),
-        makeRow(fullQuery, "2011-01-25", "alias", "travel", "rows", 3L, "index", 393.36322021484375),
-        makeRow(fullQuery, "2011-01-25", "alias", "health", "rows", 3L, "index", 511.2996826171875)
+        makeRow(fullQuery, "2011-01-25", "alias", "business", "rows", 3L, "index", 312.38165283203125, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 101L),
+        makeRow(fullQuery, "2011-01-25", "alias", "news", "rows", 3L, "index", 312.7834167480469, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L),
+        makeRow(fullQuery, "2011-01-25", "alias", "technology", "rows", 3L, "index", 324.6412353515625, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L),
+        makeRow(fullQuery, "2011-01-25", "alias", "travel", "rows", 3L, "index", 393.36322021484375, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L),
+        makeRow(fullQuery, "2011-01-25", "alias", "health", "rows", 3L, "index", 511.2996826171875, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 159L)
     );
 
     Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, fullQuery);
@@ -4264,16 +4264,16 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
         .setInterval("2011-04-02/2011-04-04")
         .setDimensions(new DefaultDimensionSpec("quality", "alias"))
-        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
+        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"), QueryRunnerTestHelper.INDEX_LONG_MIN)
         .setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
         .setHavingSpec(havingSpec);
 
     final GroupByQuery fullQuery = builder.build();
 
     List<ResultRow> expectedResults = Arrays.asList(
-        makeRow(fullQuery, "2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
-        makeRow(fullQuery, "2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
-        makeRow(fullQuery, "2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L)
+        makeRow(fullQuery, "2011-04-01", "alias", "business", "rows", 2L, "idx", 217L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 105L),
+        makeRow(fullQuery, "2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 107L),
+        makeRow(fullQuery, "2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L)
     );
 
     TestHelper.assertExpectedObjects(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index a189e6a..4e3f63b 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -215,6 +215,35 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
     Assert.assertEquals(result.toString(), 59.021022, value.getDoubleMetric("minIndex"), 59.021022 * 1e-6);
   }
 
+  // GroupBy handles timestamps differently when granularity is ALL
+  @Override
+  @Test
+  public void testFullOnTimeseriesLongMin()
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+                                  .granularity(Granularities.ALL)
+                                  .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+                                  .aggregators(
+                                      QueryRunnerTestHelper.INDEX_LONG_MIN
+                                  )
+                                  .descending(descending)
+                                  .build();
+
+    DateTime expectedEarliest = DateTimes.of("1970-01-01");
+    DateTime expectedLast = DateTimes.of("2011-04-15");
+
+
+    Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
+    Result<TimeseriesResultValue> result = results.iterator().next();
+
+    Assert.assertEquals(expectedEarliest, result.getTimestamp());
+    Assert.assertFalse(
+        StringUtils.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast),
+        result.getTimestamp().isAfter(expectedLast)
+    );
+    Assert.assertEquals(59L, (long) result.getValue().getLongMetric(QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC));
+  }
 
   @Override
   public void testEmptyTimeseries()
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index 4579028..ae4ab83 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -371,6 +371,34 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testFullOnTimeseriesLongMin()
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+                                  .granularity(Granularities.ALL)
+                                  .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+                                  .aggregators(
+                                      QueryRunnerTestHelper.INDEX_LONG_MIN
+                                  )
+                                  .descending(descending)
+                                  .context(makeContext())
+                                  .build();
+
+    DateTime expectedEarliest = DateTimes.of("2011-01-12");
+    DateTime expectedLast = DateTimes.of("2011-04-15");
+
+    Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
+    Result<TimeseriesResultValue> result = results.iterator().next();
+    Assert.assertEquals(expectedEarliest, result.getTimestamp());
+    Assert.assertFalse(
+        StringUtils.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast),
+        result.getTimestamp().isAfter(expectedLast)
+    );
+
+    Assert.assertEquals(59L, (long) result.getValue().getLongMetric(QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC));
+  }
+
+  @Test
   public void testFullOnTimeseriesWithFilter()
   {
 
@@ -439,7 +467,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                                               "idx",
                                               "index"
                                           ),
-                                          QueryRunnerTestHelper.QUALITY_UNIQUES
+                                          QueryRunnerTestHelper.QUALITY_UNIQUES,
+                                          QueryRunnerTestHelper.INDEX_LONG_MIN
                                       )
                                   )
                                   .descending(descending)
@@ -450,19 +479,18 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
         new Result<>(
             DateTimes.of("2011-04-01"),
             new TimeseriesResultValue(
-                ImmutableMap.of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
+                ImmutableMap.of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 78L)
             )
         ),
         new Result<>(
             DateTimes.of("2011-04-02"),
             new TimeseriesResultValue(
-                ImmutableMap.of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
+                ImmutableMap.of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 97L)
             )
         )
     );
 
     Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
-
     assertExpectedResults(expectedResults, results);
   }
 
@@ -477,7 +505,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                                       Arrays.asList(
                                           QueryRunnerTestHelper.ROWS_COUNT,
                                           QueryRunnerTestHelper.INDEX_LONG_SUM,
-                                          QueryRunnerTestHelper.QUALITY_UNIQUES
+                                          QueryRunnerTestHelper.QUALITY_UNIQUES,
+                                          QueryRunnerTestHelper.INDEX_LONG_MIN
                                       )
                                   )
                                   .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
@@ -499,7 +528,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                     "uniques",
                     QueryRunnerTestHelper.UNIQUES_9,
                     QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
-                    6633.0
+                    6633.0,
+                    QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+                    78L
                 )
             )
         )
@@ -517,7 +548,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                     "uniques",
                     QueryRunnerTestHelper.UNIQUES_9,
                     QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
-                    5841.0
+                    5841.0,
+                    QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+                    97L
                 )
             )
         )
@@ -539,7 +572,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                     "uniques",
                     QueryRunnerTestHelper.UNIQUES_9,
                     QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
-                    12473.0
+                    12473.0,
+                    QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+                    78L
                 )
             )
         )
@@ -569,7 +604,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                                   .aggregators(
                                       Arrays.asList(
                                           QueryRunnerTestHelper.ROWS_COUNT,
-                                          QueryRunnerTestHelper.INDEX_LONG_SUM
+                                          QueryRunnerTestHelper.INDEX_LONG_SUM,
+                                          QueryRunnerTestHelper.INDEX_LONG_MIN
                                       )
                                   )
                                   .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
@@ -588,7 +624,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                     "index",
                     NullHandling.defaultLongValue(),
                     QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
-                    NullHandling.sqlCompatible() ? null : 1.0
+                    NullHandling.sqlCompatible() ? null : 1.0,
+                    QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+                    NullHandling.sqlCompatible() ? null : Long.MAX_VALUE
                 )
             )
         )
@@ -1141,7 +1179,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                                   .aggregators(
                                       QueryRunnerTestHelper.ROWS_COUNT,
                                       QueryRunnerTestHelper.INDEX_LONG_SUM,
-                                      QueryRunnerTestHelper.QUALITY_UNIQUES
+                                      QueryRunnerTestHelper.QUALITY_UNIQUES,
+                                      QueryRunnerTestHelper.INDEX_LONG_MIN
                                   )
                                   .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
                                   .descending(descending)
@@ -1156,7 +1195,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                     "rows", 9L,
                     "index", 1102L,
                     "addRowsIndexConstant", 1112.0,
-                    "uniques", QueryRunnerTestHelper.UNIQUES_9
+                    "uniques", QueryRunnerTestHelper.UNIQUES_9,
+                    QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 78L
                 )
             )
         ),
@@ -1167,7 +1207,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                     "rows", 9L,
                     "index", 1120L,
                     "addRowsIndexConstant", 1130.0,
-                    "uniques", QueryRunnerTestHelper.UNIQUES_9
+                    "uniques", QueryRunnerTestHelper.UNIQUES_9,
+                    QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 97L
                 )
             )
         )
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index e8fda2a..74901df 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -4382,7 +4382,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   public void testGroupByWithGroupByEmpty() throws Exception
   {
     testQuery(
-        "SELECT COUNT(*), SUM(cnt) FROM druid.foo GROUP BY ()",
+        "SELECT COUNT(*), SUM(cnt), MIN(cnt) FROM druid.foo GROUP BY ()",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
                   .dataSource(CalciteTests.DATASOURCE1)
@@ -4390,12 +4390,13 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                   .granularity(Granularities.ALL)
                   .aggregators(aggregators(
                       new CountAggregatorFactory("a0"),
-                      new LongSumAggregatorFactory("a1", "cnt")
+                      new LongSumAggregatorFactory("a1", "cnt"),
+                      new LongMinAggregatorFactory("a2", "cnt")
                   ))
                   .context(TIMESERIES_CONTEXT_DEFAULT)
                   .build()
         ),
-        ImmutableList.of(new Object[]{6L, 6L})
+        ImmutableList.of(new Object[]{6L, 6L, 1L})
     );
   }
 
@@ -4797,6 +4798,30 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   }
 
   @Test
+  public void testSimpleLongAggregations() throws Exception
+  {
+    testQuery(
+        "SELECT  MIN(l1), MIN(cnt) FROM druid.numfoo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE3)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .granularity(Granularities.ALL)
+                .aggregators(aggregators(
+                                new LongMinAggregatorFactory("a0", "l1"),
+                                new LongMinAggregatorFactory("a1", "cnt")
+                            ))
+                .context(TIMESERIES_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{0L, 1L}
+        )
+    );
+  }
+
+
+  @Test
   public void testSimpleAggregations() throws Exception
   {
     // Cannot vectorize due to "longMax" aggregator.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org