You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2020/01/18 02:42:32 UTC
[druid] branch 0.17.0 updated: Fix LATEST / EARLIEST Buffer Aggregator does not work on String column (#9197) (#9210)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.17.0 by this push:
new e92e109 Fix LATEST / EARLIEST Buffer Aggregator does not work on String column (#9197) (#9210)
e92e109 is described below
commit e92e1094d0db78a41e9d27402664e0d85b6aa180
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Fri Jan 17 18:42:22 2020 -0800
Fix LATEST / EARLIEST Buffer Aggregator does not work on String column (#9197) (#9210)
* fix buff limit bug
* add tests
* add test
* add tests
* fix checkstyle
---
.../aggregation/first/StringFirstLastUtils.java | 2 +-
.../first/StringFirstLastUtilsTest.java | 59 +++++++++
.../apache/druid/sql/calcite/CalciteQueryTest.java | 146 ++++++++++++++++++++-
3 files changed, 201 insertions(+), 6 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 133c4ba..630f70c 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -87,7 +87,7 @@ public class StringFirstLastUtils
if (pair.rhs != null) {
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
- mutationBuffer.limit(maxStringBytes);
+ mutationBuffer.limit(position + Long.BYTES + Integer.BYTES + maxStringBytes);
final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer);
mutationBuffer.putInt(position + Long.BYTES, len);
} else {
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
new file mode 100644
index 0000000..b4e4088
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstLastUtilsTest
+{
+ private static final SerializablePairLongString PAIR_TO_WRITE = new SerializablePairLongString(
+ DateTimes.MAX.getMillis(),
+ "asdasddsaasd"
+ );
+
+ private static final int BUFFER_CAPACITY = 100;
+ // PAIR_TO_WRITE Size is 12 so MAX_BYTE_TO_WRITE is set to 15 which is more than enough
+ private static final int MAX_BYTE_TO_WRITE = 15;
+
+ @Test
+ public void testWritePairThenReadPairAtBeginningBuffer()
+ {
+ int positionAtBeginning = 0;
+ ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+ StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE);
+ SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtBeginning);
+ Assert.assertEquals(PAIR_TO_WRITE, actual);
+ }
+
+ @Test
+ public void testWritePairThenReadPairAtMiddleBuffer()
+ {
+ int positionAtMiddle = 60;
+ ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+ StringFirstLastUtils.writePair(buf, positionAtMiddle, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE);
+ SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtMiddle);
+ Assert.assertEquals(PAIR_TO_WRITE, actual);
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 895b452..8fbb1ad 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -44,11 +44,13 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
@@ -1298,13 +1300,13 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@Test
- public void testLatestInSubquery() throws Exception
+ public void testPrimitiveLatestInSubquery() throws Exception
{
// Cannot vectorize LATEST aggregator.
skipVectorize();
testQuery(
- "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)",
+ "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, LATEST(m1) AS val1, LATEST(cnt) AS val2, LATEST(m2) AS val3 FROM foo GROUP BY dim2)",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
@@ -1313,7 +1315,141 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
- .setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1")))
+ .setAggregatorSpecs(aggregators(
+ new FloatLastAggregatorFactory("a0:a", "m1"),
+ new LongLastAggregatorFactory("a1:a", "cnt"),
+ new DoubleLastAggregatorFactory("a2:a", "m2"))
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+ new FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+ new FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(aggregators(
+ new DoubleSumAggregatorFactory("_a0", "a0"),
+ new LongSumAggregatorFactory("_a1", "a1"),
+ new DoubleSumAggregatorFactory("_a2", "a2")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{18.0, 4L, 18.0}) : ImmutableList.of(new Object[]{15.0, 3L, 15.0})
+ );
+ }
+
+ // This tests the off-heap (buffer) version of the EarliestAggregator (Double/Float/Long)
+ @Test
+ public void testPrimitiveEarliestInSubquery() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
+ .setAggregatorSpecs(aggregators(
+ new FloatFirstAggregatorFactory("a0:a", "m1"),
+ new LongFirstAggregatorFactory("a1:a", "cnt"),
+ new DoubleFirstAggregatorFactory("a2:a", "m2"))
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+ new FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+ new FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(aggregators(
+ new DoubleSumAggregatorFactory("_a0", "a0"),
+ new LongSumAggregatorFactory("_a1", "a1"),
+ new DoubleSumAggregatorFactory("_a2", "a2")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L, 11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0})
+ );
+ }
+
+ // This tests the off-heap (buffer) version of the LatestAggregator (String)
+ @Test
+ public void testStringLatestInSubquery() throws Exception
+ {
+ // Cannot vectorize LATEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT SUM(val) FROM (SELECT dim2, LATEST(dim1, 10) AS val FROM foo GROUP BY dim2)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
+ .setAggregatorSpecs(aggregators(new StringLastAggregatorFactory("a0:a", "dim1", 10)))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new FinalizingFieldAccessPostAggregator("a0", "a0:a")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil())))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{NullHandling.sqlCompatible() ? 3 : 1.0}
+ )
+ );
+ }
+
+ // This tests the off-heap (buffer) version of the EarliestAggregator (String)
+ @Test
+ public void testStringEarliestInSubquery() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
+ .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory("a0:a", "dim1", 10)))
.setPostAggregatorSpecs(
ImmutableList.of(
new FinalizingFieldAccessPostAggregator("a0", "a0:a")
@@ -1324,12 +1460,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
- .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0")))
+ .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil())))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
- new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0}
+ new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1}
)
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org