You are viewing a plain-text version of this content. (The canonical link that originally appeared here was lost in the plain-text conversion.)
Posted to commits@druid.apache.org by fj...@apache.org on 2019/05/24 00:04:14 UTC

[incubator-druid] branch 0.15.0-incubating updated: allow quantiles merge aggregator to also accept doubles (#7718) (#7743)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
     new ea3de8c  allow quantiles merge aggregator to also accept doubles (#7718) (#7743)
ea3de8c is described below

commit ea3de8cac5c82173d0563416679fef2e3e7beeb2
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Thu May 23 17:04:08 2019 -0700

    allow quantiles merge aggregator to also accept doubles (#7718) (#7743)
    
    * allow quantiles merge aggregator to also accept doubles
    
    * consolidate duplicated union-update logic into a shared helper (DoublesSketchMergeAggregator.updateUnion)
    
    * remove unused DoublesSketch import
---
 .../quantiles/DoublesSketchBuildAggregator.java    |   3 -
 .../quantiles/DoublesSketchMergeAggregator.java    |  23 +++--
 .../DoublesSketchMergeBufferAggregator.java        |  11 +--
 .../sql/DoublesSketchSqlAggregatorTest.java        | 106 +++++++++++++++++----
 4 files changed, 106 insertions(+), 37 deletions(-)

diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
index bd46fc5..18f94a9 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
@@ -28,14 +28,12 @@ public class DoublesSketchBuildAggregator implements Aggregator
 {
 
   private final ColumnValueSelector<Double> valueSelector;
-  private final int size;
 
   private UpdateDoublesSketch sketch;
 
   public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size)
   {
     this.valueSelector = valueSelector;
-    this.size = size;
     sketch = DoublesSketch.builder().setK(size).build();
   }
 
@@ -68,5 +66,4 @@ public class DoublesSketchBuildAggregator implements Aggregator
   {
     sketch = null;
   }
-
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
index 325a6f2..4598048 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
@@ -27,10 +27,10 @@ import org.apache.druid.segment.ColumnValueSelector;
 public class DoublesSketchMergeAggregator implements Aggregator
 {
 
-  private final ColumnValueSelector<DoublesSketch> selector;
+  private final ColumnValueSelector selector;
   private DoublesUnion union;
 
-  public DoublesSketchMergeAggregator(final ColumnValueSelector<DoublesSketch> selector, final int k)
+  public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k)
   {
     this.selector = selector;
     union = DoublesUnion.builder().setMaxK(k).build();
@@ -39,13 +39,10 @@ public class DoublesSketchMergeAggregator implements Aggregator
   @Override
   public synchronized void aggregate()
   {
-    final DoublesSketch sketch = selector.getObject();
-    if (sketch == null) {
-      return;
-    }
-    union.update(sketch);
+    updateUnion(selector, union);
   }
 
+
   @Override
   public synchronized Object get()
   {
@@ -70,4 +67,16 @@ public class DoublesSketchMergeAggregator implements Aggregator
     union = null;
   }
 
+  static void updateUnion(ColumnValueSelector selector, DoublesUnion union)
+  {
+    final Object object = selector.getObject();
+    if (object == null) {
+      return;
+    }
+    if (object instanceof DoublesSketch) {
+      union.update((DoublesSketch) object);
+    } else {
+      union.update(selector.getDouble());
+    }
+  }
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
index ffe9009..f5a1e9d 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
@@ -20,7 +20,6 @@
 package org.apache.druid.query.aggregation.datasketches.quantiles;
 
 import com.yahoo.memory.WritableMemory;
-import com.yahoo.sketches.quantiles.DoublesSketch;
 import com.yahoo.sketches.quantiles.DoublesUnion;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -34,14 +33,14 @@ import java.util.IdentityHashMap;
 public class DoublesSketchMergeBufferAggregator implements BufferAggregator
 {
 
-  private final ColumnValueSelector<DoublesSketch> selector;
+  private final ColumnValueSelector selector;
   private final int k;
   private final int maxIntermediateSize;
   private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
   private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
 
   public DoublesSketchMergeBufferAggregator(
-      final ColumnValueSelector<DoublesSketch> selector,
+      final ColumnValueSelector selector,
       final int k,
       final int maxIntermediateSize)
   {
@@ -62,12 +61,8 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator
   @Override
   public synchronized void aggregate(final ByteBuffer buffer, final int position)
   {
-    final DoublesSketch sketch = selector.getObject();
-    if (sketch == null) {
-      return;
-    }
     final DoublesUnion union = unions.get(buffer).get(position);
-    union.update(sketch);
+    DoublesSketchMergeAggregator.updateUnion(selector, union);
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index e51c2de..a3382f4 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -125,25 +125,26 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
       CalciteTests.getJsonMapper().registerModule(mod);
     }
 
-    final QueryableIndex index = IndexBuilder.create()
-                                             .tmpDir(temporaryFolder.newFolder())
-                                             .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
-                                             .schema(
-                                                 new IncrementalIndexSchema.Builder()
-                                                     .withMetrics(
-                                                         new CountAggregatorFactory("cnt"),
-                                                         new DoubleSumAggregatorFactory("m1", "m1"),
-                                                         new DoublesSketchAggregatorFactory(
-                                                             "qsketch_m1",
-                                                             "m1",
-                                                             128
-                                                         )
-                                                     )
-                                                     .withRollup(false)
-                                                     .build()
-                                             )
-                                             .rows(CalciteTests.ROWS1)
-                                             .buildMMappedIndex();
+    final QueryableIndex index =
+        IndexBuilder.create()
+                    .tmpDir(temporaryFolder.newFolder())
+                    .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                    .schema(
+                        new IncrementalIndexSchema.Builder()
+                            .withMetrics(
+                                new CountAggregatorFactory("cnt"),
+                                new DoubleSumAggregatorFactory("m1", "m1"),
+                                new DoublesSketchAggregatorFactory(
+                                    "qsketch_m1",
+                                    "m1",
+                                    128
+                                )
+                            )
+                            .withRollup(false)
+                            .build()
+                    )
+                    .rows(CalciteTests.ROWS1)
+                    .buildMMappedIndex();
 
     walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
         DataSegment.builder()
@@ -401,6 +402,73 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
     );
   }
 
+  @Test
+  public void testQuantileOnInnerQuantileQuery() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
+                       + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1";
+
+
+    final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+    ImmutableList.Builder<Object[]> builder = ImmutableList.builder();
+    builder.add(new Object[]{"", 1.0});
+    builder.add(new Object[]{"1", 4.0});
+    builder.add(new Object[]{"10.1", 2.0});
+    builder.add(new Object[]{"2", 3.0});
+    builder.add(new Object[]{"abc", 6.0});
+    builder.add(new Object[]{"def", 5.0});
+    final List<Object[]> expectedResults = builder.build();
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    // Verify query
+    Assert.assertEquals(
+        GroupByQuery.builder()
+                    .setDataSource(
+                        new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource(CalciteTests.DATASOURCE1)
+                                        .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(
+                                            new DefaultDimensionSpec("dim1", "d0"),
+                                            new DefaultDimensionSpec("dim2", "d1")
+                                        )
+                                        .setAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
+                                            )
+                                        )
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.5f)
+                                            )
+                                        )
+                                        .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
+                                        .build()
+                        )
+                    )
+                    .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                    .setGranularity(Granularities.ALL)
+                    .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
+                    .setAggregatorSpecs(
+                        new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
+                    )
+                    .setPostAggregatorSpecs(
+                        ImmutableList.of(
+                            new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f)
+                        )
+                    )
+                    .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
+                    .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
   private static PostAggregator makeFieldAccessPostAgg(String name)
   {
     return new FieldAccessPostAggregator(name, name);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org