You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2020/08/01 22:32:31 UTC
[druid] branch master updated: Add vectorization support for the
longMin aggregator. (#10211)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 34a4113 Add vectorization support for the longMin aggregator. (#10211)
34a4113 is described below
commit 34a41137522fca463c6af1df5edb2f41297491f5
Author: Abhishek Radhakrishnan <ab...@gmail.com>
AuthorDate: Sat Aug 1 15:32:09 2020 -0700
Add vectorization support for the longMin aggregator. (#10211)
* Fix minor formatting in docs.
* Add NullHandling initialization for test to run from IDE.
* Vectorize longMin aggregator.
- A new vectorized class for the vectorized long min aggregator.
- Changes to AggregatorFactory to support vectorize functionality.
- A few changes to schema evolution test to add LongMinAggregatorFactory.
* Add longSum to the supported vectorized aggregator implementations.
* Add MIN() long min to calcite query test that can vectorize.
* Add simple long aggregations test.
* Fixup formatting per checkstyle guide.
* fixup and add more tests for long min aggregator.
* Override test for groupBy since timestamps are handled differently.
* Null compatibility check in test.
* Review comment: Add a test case to LongMinAggregationTest.
---
docs/querying/query-context.md | 2 +-
.../druid/query/aggregation/AggregatorFactory.java | 4 +-
.../aggregation/LongMinAggregatorFactory.java | 29 ++++++++
.../query/aggregation/LongMinVectorAggregator.java | 83 ++++++++++++++++++++++
.../apache/druid/query/QueryRunnerTestHelper.java | 3 +
.../apache/druid/query/SchemaEvolutionTest.java | 22 +++---
.../query/aggregation/LongMinAggregationTest.java | 52 +++++++++++++-
.../query/groupby/GroupByQueryRunnerTest.java | 20 +++---
.../groupby/GroupByTimeseriesQueryRunnerTest.java | 29 ++++++++
.../timeseries/TimeseriesQueryRunnerTest.java | 67 +++++++++++++----
.../apache/druid/sql/calcite/CalciteQueryTest.java | 31 +++++++-
11 files changed, 304 insertions(+), 38 deletions(-)
diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md
index d14eeb7..abb430b 100644
--- a/docs/querying/query-context.md
+++ b/docs/querying/query-context.md
@@ -89,7 +89,7 @@ requirements:
- All query-level filters must either be able to run on bitmap indexes or must offer vectorized row-matchers. These
include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not".
- All filters in filtered aggregators must offer vectorized row-matchers.
-- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum",
+- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
"hyperUnique", and "filtered".
- No virtual columns.
- For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
index 41e55a3..88d6f87 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
@@ -36,7 +36,7 @@ import java.util.List;
import java.util.Map;
/**
- * AggregatorFactory is a strategy (in the terms of Design Patterns) that represents column aggregation, e. g. min,
+ * AggregatorFactory is a strategy (in the terms of Design Patterns) that represents column aggregation, e.g. min,
* max, sum of metric columns, or cardinality of dimension columns (see {@link
* org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}).
* Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged
@@ -46,7 +46,7 @@ import java.util.Map;
* for them e.g. doubleSum aggregator tries to parse the string value as double and assumes it to be zero if parsing
* fails.
* If it is a multi value column then each individual value should be taken into account for aggregation e.g. if a row
- * had value ["1","1","1"] , doubleSum aggregation would take each of them and sum them to 3.
+ * had value ["1","1","1"], doubleSum aggregation would take each of them and sum them to 3.
*/
@ExtensionPoint
public abstract class AggregatorFactory implements Cacheable
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
index 24d82e3..ffa0374 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
@@ -25,6 +25,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -70,6 +74,31 @@ public class LongMinAggregatorFactory extends SimpleLongAggregatorFactory
}
@Override
+ protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory)
+ {
+ return columnSelectorFactory.makeValueSelector(fieldName);
+ }
+
+ @Override
+ protected VectorAggregator factorizeVector(
+ VectorColumnSelectorFactory columnSelectorFactory,
+ VectorValueSelector selector
+ )
+ {
+ return new LongMinVectorAggregator(selector);
+ }
+
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ if (fieldName != null) {
+ final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+ return expression == null && (capabilities == null || capabilities.getType().isNumeric());
+ }
+ return expression == null;
+ }
+
+ @Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinVectorAggregator.java
new file mode 100644
index 0000000..dee5377
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinVectorAggregator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class LongMinVectorAggregator implements VectorAggregator
+{
+ private final VectorValueSelector selector;
+
+ public LongMinVectorAggregator(final VectorValueSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+ {
+ final long[] vector = selector.getLongVector();
+
+ long min = buf.getLong(position);
+ for (int i = startRow; i < endRow; i++) {
+ min = Math.min(min, vector[i]);
+ }
+
+ buf.putLong(position, min);
+ }
+
+ @Override
+ public void aggregate(
+ final ByteBuffer buf,
+ final int numRows,
+ final int[] positions,
+ @Nullable final int[] rows,
+ final int positionOffset
+ )
+ {
+ final long[] vector = selector.getLongVector();
+
+ for (int i = 0; i < numRows; i++) {
+ final int position = positions[i] + positionOffset;
+ buf.putLong(position, Math.min(buf.getLong(position), vector[rows != null ? rows[i] : i]));
+ }
+ }
+
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ return buf.getLong(position);
+ }
+
+ @Override
+ public void close()
+ {
+ // Nothing to close.
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
index 474f796..bf1b028 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
@@ -37,6 +37,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
@@ -113,6 +114,7 @@ public class QueryRunnerTestHelper
public static final String INDEX_METRIC = "index";
public static final String UNIQUE_METRIC = "uniques";
public static final String ADD_ROWS_INDEX_CONSTANT_METRIC = "addRowsIndexConstant";
+ public static final String LONG_MIN_INDEX_METRIC = "longMinIndex";
public static String dependentPostAggMetric = "dependentPostAgg";
public static final CountAggregatorFactory ROWS_COUNT = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory INDEX_LONG_SUM = new LongSumAggregatorFactory("index", INDEX_METRIC);
@@ -121,6 +123,7 @@ public class QueryRunnerTestHelper
"index",
INDEX_METRIC
);
+ public static final LongMinAggregatorFactory INDEX_LONG_MIN = new LongMinAggregatorFactory(LONG_MIN_INDEX_METRIC, INDEX_METRIC);
public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }";
public static final String JS_RESET_0 = "function reset() { return 0; }";
public static final JavaScriptAggregatorFactory JS_INDEX_SUM_IF_PLACEMENTISH_A = new JavaScriptAggregatorFactory(
diff --git a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
index ce17b38..729923c 100644
--- a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
@@ -162,7 +163,7 @@ public class SchemaEvolutionTest
.rows(inputRowsWithDimensions(ImmutableList.of("c1")))
.buildMMappedIndex();
- // Index2: c1 is a long, c2 is a string, "uniques" is uniques on c2
+ // Index2: c1 is a long, c2 is a string, "uniques" is uniques on c2, "longmin" is min on c1
index2 = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.schema(
@@ -170,7 +171,8 @@ public class SchemaEvolutionTest
.withMetrics(
new CountAggregatorFactory("cnt"),
new LongSumAggregatorFactory("c1", "c1"),
- new HyperUniquesAggregatorFactory("uniques", "c2")
+ new HyperUniquesAggregatorFactory("uniques", "c2"),
+ new LongMinAggregatorFactory("longmin", "c1")
)
.withRollup(false)
.build()
@@ -347,6 +349,7 @@ public class SchemaEvolutionTest
new LongSumAggregatorFactory("a", "c1"),
new DoubleSumAggregatorFactory("b", "c1"),
new FloatSumAggregatorFactory("d", "c1"),
+ new LongMinAggregatorFactory("e", "c1"),
new CountAggregatorFactory("c")
)
)
@@ -355,19 +358,19 @@ public class SchemaEvolutionTest
// Only string(1) -- which we can filter but not aggregate
Assert.assertEquals(
- timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f)),
+ timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f, "e", 9L)),
runQuery(query, factory, ImmutableList.of(index1))
);
- // Only long(2) -- which we can filter and aggregate
+ // Only long(2) -- which we can filter and aggregate
Assert.assertEquals(
- timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.0, "c", 2L, "d", 19.0f)),
+ timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.0, "c", 2L, "d", 19.0f, "e", 9L)),
runQuery(query, factory, ImmutableList.of(index2))
);
// Only float(3) -- which we can't filter, but can aggregate
Assert.assertEquals(
- timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f)),
+ timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f, "e", 9L)),
runQuery(query, factory, ImmutableList.of(index3))
);
@@ -381,7 +384,9 @@ public class SchemaEvolutionTest
"c",
0L,
"d",
- NullHandling.defaultFloatValue()
+ NullHandling.defaultFloatValue(),
+ "e",
+ NullHandling.sqlCompatible() ? null : Long.MAX_VALUE
)),
runQuery(query, factory, ImmutableList.of(index4))
);
@@ -392,7 +397,8 @@ public class SchemaEvolutionTest
"a", 57L,
"b", 57.2,
"c", 6L,
- "d", 57.20000076293945
+ "d", 57.20000076293945,
+ "e", 9L
)),
runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4))
);
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java
index e840d57..00c9ab4 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java
@@ -19,8 +19,13 @@
package org.apache.druid.query.aggregation;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@@ -33,25 +38,43 @@ import java.nio.ByteBuffer;
public class LongMinAggregationTest
{
private LongMinAggregatorFactory longMinAggFactory;
+ private LongMinAggregatorFactory longMinVectorAggFactory;
private ColumnSelectorFactory colSelectorFactory;
+ private VectorColumnSelectorFactory vectorColumnSelectorFactory;
private TestLongColumnSelector selector;
- private long[] values = {-9223372036854775802L, -9223372036854775803L, -9223372036854775806L, -9223372036854775805L};
+ private final long[] values = {-9223372036854775802L, -9223372036854775803L, -9223372036854775806L, -9223372036854775805L};
+ private final long[] longValues1 = {5L, 2L, 4L, 100L, 1L, 5L, -2L, -3L, 0L, 55L};
public LongMinAggregationTest() throws Exception
{
String aggSpecJson = "{\"type\": \"longMin\", \"name\": \"billy\", \"fieldName\": \"nilly\"}";
longMinAggFactory = TestHelper.makeJsonMapper().readValue(aggSpecJson, LongMinAggregatorFactory.class);
+
+ String vectorAggSpecJson = "{\"type\": \"longMin\", \"name\": \"lng\", \"fieldName\": \"lngFld\"}";
+ longMinVectorAggFactory = TestHelper.makeJsonMapper().readValue(vectorAggSpecJson, LongMinAggregatorFactory.class);
}
@Before
public void setup()
{
+ NullHandling.initializeForTests();
selector = new TestLongColumnSelector(values);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector);
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null);
EasyMock.replay(colSelectorFactory);
+
+ VectorValueSelector vectorValueSelector = EasyMock.createMock(VectorValueSelector.class);
+ EasyMock.expect(vectorValueSelector.getLongVector()).andReturn(longValues1).anyTimes();
+ EasyMock.expect(vectorValueSelector.getNullVector()).andReturn(null).anyTimes();
+ EasyMock.replay(vectorValueSelector);
+
+ vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class);
+ EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("lngFld"))
+ .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.LONG).setDictionaryEncoded(true)).anyTimes();
+ EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("lngFld")).andReturn(vectorValueSelector).anyTimes();
+ EasyMock.replay(vectorColumnSelectorFactory);
}
@Test
@@ -88,6 +111,33 @@ public class LongMinAggregationTest
}
@Test
+ public void testLongMinVectorAggregator()
+ {
+ // Some sanity.
+ Assert.assertTrue(longMinVectorAggFactory.canVectorize(vectorColumnSelectorFactory));
+ VectorValueSelector vectorValueSelector = longMinVectorAggFactory.vectorSelector(vectorColumnSelectorFactory);
+ Assert.assertEquals(longValues1, vectorValueSelector.getLongVector());
+
+ VectorAggregator vectorAggregator = longMinVectorAggFactory.factorizeVector(vectorColumnSelectorFactory);
+
+ final ByteBuffer buf = ByteBuffer.allocate(longMinAggFactory.getMaxIntermediateSizeWithNulls() * 2);
+ vectorAggregator.init(buf, 0);
+
+ vectorAggregator.aggregate(buf, 0, 0, 3);
+ Assert.assertEquals(longValues1[1], (long) vectorAggregator.get(buf, 0));
+
+ vectorAggregator.aggregate(buf, 1, 0, 3);
+ Assert.assertEquals(longValues1[1], (long) vectorAggregator.get(buf, 1));
+
+ vectorAggregator.aggregate(buf, 2, 3, 7);
+ Assert.assertEquals(longValues1[6], (long) vectorAggregator.get(buf, 2));
+
+ vectorAggregator.aggregate(buf, 0, 0, 10);
+ Assert.assertEquals(longValues1[7], (long) vectorAggregator.get(buf, 0));
+ }
+
+
+ @Test
public void testCombine()
{
Assert.assertEquals(-9223372036854775803L, longMinAggFactory.combine(-9223372036854775800L, -9223372036854775803L));
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index e8ab616..3ab559a 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -4126,7 +4126,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval("2011-01-25/2011-01-28")
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
- .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new DoubleSumAggregatorFactory("index", "index"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new DoubleSumAggregatorFactory("index", "index"), QueryRunnerTestHelper.INDEX_LONG_MIN)
.setGranularity(Granularities.ALL)
.setHavingSpec(new GreaterThanHavingSpec("index", 310L))
.setLimitSpec(
@@ -4139,11 +4139,11 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
GroupByQuery fullQuery = builder.build();
List<ResultRow> expectedResults = Arrays.asList(
- makeRow(fullQuery, "2011-01-25", "alias", "business", "rows", 3L, "index", 312.38165283203125),
- makeRow(fullQuery, "2011-01-25", "alias", "news", "rows", 3L, "index", 312.7834167480469),
- makeRow(fullQuery, "2011-01-25", "alias", "technology", "rows", 3L, "index", 324.6412353515625),
- makeRow(fullQuery, "2011-01-25", "alias", "travel", "rows", 3L, "index", 393.36322021484375),
- makeRow(fullQuery, "2011-01-25", "alias", "health", "rows", 3L, "index", 511.2996826171875)
+ makeRow(fullQuery, "2011-01-25", "alias", "business", "rows", 3L, "index", 312.38165283203125, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 101L),
+ makeRow(fullQuery, "2011-01-25", "alias", "news", "rows", 3L, "index", 312.7834167480469, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L),
+ makeRow(fullQuery, "2011-01-25", "alias", "technology", "rows", 3L, "index", 324.6412353515625, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L),
+ makeRow(fullQuery, "2011-01-25", "alias", "travel", "rows", 3L, "index", 393.36322021484375, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L),
+ makeRow(fullQuery, "2011-01-25", "alias", "health", "rows", 3L, "index", 511.2996826171875, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 159L)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, fullQuery);
@@ -4264,16 +4264,16 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
- .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"), QueryRunnerTestHelper.INDEX_LONG_MIN)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setHavingSpec(havingSpec);
final GroupByQuery fullQuery = builder.build();
List<ResultRow> expectedResults = Arrays.asList(
- makeRow(fullQuery, "2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
- makeRow(fullQuery, "2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
- makeRow(fullQuery, "2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L)
+ makeRow(fullQuery, "2011-04-01", "alias", "business", "rows", 2L, "idx", 217L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 105L),
+ makeRow(fullQuery, "2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 107L),
+ makeRow(fullQuery, "2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L)
);
TestHelper.assertExpectedObjects(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index a189e6a..4e3f63b 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -215,6 +215,35 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
Assert.assertEquals(result.toString(), 59.021022, value.getDoubleMetric("minIndex"), 59.021022 * 1e-6);
}
+ // GroupBy handles timestamps differently when granularity is ALL
+ @Override
+ @Test
+ public void testFullOnTimeseriesLongMin()
+ {
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .granularity(Granularities.ALL)
+ .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+ .aggregators(
+ QueryRunnerTestHelper.INDEX_LONG_MIN
+ )
+ .descending(descending)
+ .build();
+
+ DateTime expectedEarliest = DateTimes.of("1970-01-01");
+ DateTime expectedLast = DateTimes.of("2011-04-15");
+
+
+ Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
+ Result<TimeseriesResultValue> result = results.iterator().next();
+
+ Assert.assertEquals(expectedEarliest, result.getTimestamp());
+ Assert.assertFalse(
+ StringUtils.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast),
+ result.getTimestamp().isAfter(expectedLast)
+ );
+ Assert.assertEquals(59L, (long) result.getValue().getLongMetric(QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC));
+ }
@Override
public void testEmptyTimeseries()
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index 4579028..ae4ab83 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -371,6 +371,34 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
}
@Test
+ public void testFullOnTimeseriesLongMin()
+ {
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .granularity(Granularities.ALL)
+ .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+ .aggregators(
+ QueryRunnerTestHelper.INDEX_LONG_MIN
+ )
+ .descending(descending)
+ .context(makeContext())
+ .build();
+
+ DateTime expectedEarliest = DateTimes.of("2011-01-12");
+ DateTime expectedLast = DateTimes.of("2011-04-15");
+
+ Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
+ Result<TimeseriesResultValue> result = results.iterator().next();
+ Assert.assertEquals(expectedEarliest, result.getTimestamp());
+ Assert.assertFalse(
+ StringUtils.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast),
+ result.getTimestamp().isAfter(expectedLast)
+ );
+
+ Assert.assertEquals(59L, (long) result.getValue().getLongMetric(QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC));
+ }
+
+ @Test
public void testFullOnTimeseriesWithFilter()
{
@@ -439,7 +467,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
"idx",
"index"
),
- QueryRunnerTestHelper.QUALITY_UNIQUES
+ QueryRunnerTestHelper.QUALITY_UNIQUES,
+ QueryRunnerTestHelper.INDEX_LONG_MIN
)
)
.descending(descending)
@@ -450,19 +479,18 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
new Result<>(
DateTimes.of("2011-04-01"),
new TimeseriesResultValue(
- ImmutableMap.of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
+ ImmutableMap.of("rows", 13L, "idx", 6619L, "uniques", QueryRunnerTestHelper.UNIQUES_9, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 78L)
)
),
new Result<>(
DateTimes.of("2011-04-02"),
new TimeseriesResultValue(
- ImmutableMap.of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
+ ImmutableMap.of("rows", 13L, "idx", 5827L, "uniques", QueryRunnerTestHelper.UNIQUES_9, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 97L)
)
)
);
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
-
assertExpectedResults(expectedResults, results);
}
@@ -477,7 +505,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
Arrays.asList(
QueryRunnerTestHelper.ROWS_COUNT,
QueryRunnerTestHelper.INDEX_LONG_SUM,
- QueryRunnerTestHelper.QUALITY_UNIQUES
+ QueryRunnerTestHelper.QUALITY_UNIQUES,
+ QueryRunnerTestHelper.INDEX_LONG_MIN
)
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
@@ -499,7 +528,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
"uniques",
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
- 6633.0
+ 6633.0,
+ QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+ 78L
)
)
)
@@ -517,7 +548,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
"uniques",
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
- 5841.0
+ 5841.0,
+ QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+ 97L
)
)
)
@@ -539,7 +572,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
"uniques",
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
- 12473.0
+ 12473.0,
+ QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+ 78L
)
)
)
@@ -569,7 +604,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.ROWS_COUNT,
- QueryRunnerTestHelper.INDEX_LONG_SUM
+ QueryRunnerTestHelper.INDEX_LONG_SUM,
+ QueryRunnerTestHelper.INDEX_LONG_MIN
)
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
@@ -588,7 +624,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
"index",
NullHandling.defaultLongValue(),
QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT_METRIC,
- NullHandling.sqlCompatible() ? null : 1.0
+ NullHandling.sqlCompatible() ? null : 1.0,
+ QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
+ NullHandling.sqlCompatible() ? null : Long.MAX_VALUE
)
)
)
@@ -1141,7 +1179,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
.aggregators(
QueryRunnerTestHelper.ROWS_COUNT,
QueryRunnerTestHelper.INDEX_LONG_SUM,
- QueryRunnerTestHelper.QUALITY_UNIQUES
+ QueryRunnerTestHelper.QUALITY_UNIQUES,
+ QueryRunnerTestHelper.INDEX_LONG_MIN
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.descending(descending)
@@ -1156,7 +1195,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
"rows", 9L,
"index", 1102L,
"addRowsIndexConstant", 1112.0,
- "uniques", QueryRunnerTestHelper.UNIQUES_9
+ "uniques", QueryRunnerTestHelper.UNIQUES_9,
+ QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 78L
)
)
),
@@ -1167,7 +1207,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
"rows", 9L,
"index", 1120L,
"addRowsIndexConstant", 1130.0,
- "uniques", QueryRunnerTestHelper.UNIQUES_9
+ "uniques", QueryRunnerTestHelper.UNIQUES_9,
+ QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 97L
)
)
)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index e8fda2a..74901df 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -4382,7 +4382,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testGroupByWithGroupByEmpty() throws Exception
{
testQuery(
- "SELECT COUNT(*), SUM(cnt) FROM druid.foo GROUP BY ()",
+ "SELECT COUNT(*), SUM(cnt), MIN(cnt) FROM druid.foo GROUP BY ()",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@@ -4390,12 +4390,13 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.granularity(Granularities.ALL)
.aggregators(aggregators(
new CountAggregatorFactory("a0"),
- new LongSumAggregatorFactory("a1", "cnt")
+ new LongSumAggregatorFactory("a1", "cnt"),
+ new LongMinAggregatorFactory("a2", "cnt")
))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
- ImmutableList.of(new Object[]{6L, 6L})
+ ImmutableList.of(new Object[]{6L, 6L, 1L})
);
}
@@ -4797,6 +4798,30 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@Test
+ public void testSimpleLongAggregations() throws Exception
+ {
+ testQuery(
+ "SELECT MIN(l1), MIN(cnt) FROM druid.numfoo",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .aggregators(aggregators(
+ new LongMinAggregatorFactory("a0", "l1"),
+ new LongMinAggregatorFactory("a1", "cnt")
+ ))
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{0L, 1L}
+ )
+ );
+ }
+
+
+ @Test
public void testSimpleAggregations() throws Exception
{
// Cannot vectorize due to "longMax" aggregator.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org