You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2020/01/18 02:42:32 UTC

[druid] branch 0.17.0 updated: Fix LATEST / EARLIEST Buffer Aggregator does not work on String column (#9197) (#9210)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.17.0 by this push:
     new e92e109  Fix LATEST / EARLIEST Buffer Aggregator does not work on String column  (#9197) (#9210)
e92e109 is described below

commit e92e1094d0db78a41e9d27402664e0d85b6aa180
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Fri Jan 17 18:42:22 2020 -0800

    Fix LATEST / EARLIEST Buffer Aggregator does not work on String column  (#9197) (#9210)
    
    * fix buff limit bug
    
    * add tests
    
    * add test
    
    * add tests
    
    * fix checkstyle
---
 .../aggregation/first/StringFirstLastUtils.java    |   2 +-
 .../first/StringFirstLastUtilsTest.java            |  59 +++++++++
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 146 ++++++++++++++++++++-
 3 files changed, 201 insertions(+), 6 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 133c4ba..630f70c 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -87,7 +87,7 @@ public class StringFirstLastUtils
 
     if (pair.rhs != null) {
       mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
-      mutationBuffer.limit(maxStringBytes);
+      mutationBuffer.limit(position + Long.BYTES + Integer.BYTES + maxStringBytes);
       final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer);
       mutationBuffer.putInt(position + Long.BYTES, len);
     } else {
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
new file mode 100644
index 0000000..b4e4088
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstLastUtilsTest
+{
+  private static final SerializablePairLongString PAIR_TO_WRITE = new SerializablePairLongString(
+      DateTimes.MAX.getMillis(),
+      "asdasddsaasd"
+  );
+
+  private static final int BUFFER_CAPACITY = 100;
+  // The string in PAIR_TO_WRITE is 12 bytes long, so a MAX_BYTE_TO_WRITE of 15 is more than enough to hold it
+  private static final int MAX_BYTE_TO_WRITE = 15;
+
+  @Test
+  public void testWritePairThenReadPairAtBeginningBuffer()
+  {
+    int positionAtBeginning = 0;
+    ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+    StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE);
+    SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtBeginning);
+    Assert.assertEquals(PAIR_TO_WRITE, actual);
+  }
+
+  @Test
+  public void testWritePairThenReadPairAtMiddleBuffer()
+  {
+    int positionAtMiddle = 60;
+    ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+    StringFirstLastUtils.writePair(buf, positionAtMiddle, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE);
+    SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtMiddle);
+    Assert.assertEquals(PAIR_TO_WRITE, actual);
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 895b452..8fbb1ad 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -44,11 +44,13 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
@@ -1298,13 +1300,13 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   }
 
   @Test
-  public void testLatestInSubquery() throws Exception
+  public void testPrimitiveLatestInSubquery() throws Exception
   {
     // Cannot vectorize LATEST aggregator.
     skipVectorize();
 
     testQuery(
-        "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)",
+        "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, LATEST(m1) AS val1, LATEST(cnt) AS val2, LATEST(m2) AS val3 FROM foo GROUP BY dim2)",
         ImmutableList.of(
             GroupByQuery.builder()
                         .setDataSource(
@@ -1313,7 +1315,141 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                                         .setInterval(querySegmentSpec(Filtration.eternity()))
                                         .setGranularity(Granularities.ALL)
                                         .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
-                                        .setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1")))
+                                        .setAggregatorSpecs(aggregators(
+                                            new FloatLastAggregatorFactory("a0:a", "m1"),
+                                            new LongLastAggregatorFactory("a1:a", "cnt"),
+                                            new DoubleLastAggregatorFactory("a2:a", "m2"))
+                                        )
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+                                                new FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+                                                new FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+                                            )
+                                        )
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(aggregators(
+                            new DoubleSumAggregatorFactory("_a0", "a0"),
+                            new LongSumAggregatorFactory("_a1", "a1"),
+                            new DoubleSumAggregatorFactory("_a2", "a2")
+                                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{18.0, 4L, 18.0}) : ImmutableList.of(new Object[]{15.0, 3L, 15.0})
+    );
+  }
+
+  // This tests the off-heap (buffer) version of the EarliestAggregator (Double/Float/Long)
+  @Test
+  public void testPrimitiveEarliestInSubquery() throws Exception
+  {
+    // Cannot vectorize EARLIEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource(CalciteTests.DATASOURCE1)
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
+                                        .setAggregatorSpecs(aggregators(
+                                            new FloatFirstAggregatorFactory("a0:a", "m1"),
+                                            new LongFirstAggregatorFactory("a1:a", "cnt"),
+                                            new DoubleFirstAggregatorFactory("a2:a", "m2"))
+                                        )
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+                                                new FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+                                                new FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+                                            )
+                                        )
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(aggregators(
+                            new DoubleSumAggregatorFactory("_a0", "a0"),
+                            new LongSumAggregatorFactory("_a1", "a1"),
+                            new DoubleSumAggregatorFactory("_a2", "a2")
+                                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L, 11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0})
+    );
+  }
+
+  // This tests the off-heap (buffer) version of the LatestAggregator (String)
+  @Test
+  public void testStringLatestInSubquery() throws Exception
+  {
+    // Cannot vectorize LATEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT SUM(val) FROM (SELECT dim2, LATEST(dim1, 10) AS val FROM foo GROUP BY dim2)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource(CalciteTests.DATASOURCE1)
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
+                                        .setAggregatorSpecs(aggregators(new StringLastAggregatorFactory("a0:a", "dim1", 10)))
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new FinalizingFieldAccessPostAggregator("a0", "a0:a")
+                                            )
+                                        )
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil())))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{NullHandling.sqlCompatible() ? 3 : 1.0}
+        )
+    );
+  }
+
+  // This tests the off-heap (buffer) version of the EarliestAggregator (String)
+  @Test
+  public void testStringEarliestInSubquery() throws Exception
+  {
+    // Cannot vectorize EARLIEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource(CalciteTests.DATASOURCE1)
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
+                                        .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory("a0:a", "dim1", 10)))
                                         .setPostAggregatorSpecs(
                                             ImmutableList.of(
                                                 new FinalizingFieldAccessPostAggregator("a0", "a0:a")
@@ -1324,12 +1460,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         )
                         .setInterval(querySegmentSpec(Filtration.eternity()))
                         .setGranularity(Granularities.ALL)
-                        .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0")))
+                        .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil())))
                         .setContext(QUERY_CONTEXT_DEFAULT)
                         .build()
         ),
         ImmutableList.of(
-            new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0}
+            new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1}
         )
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org