You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/05/24 00:04:14 UTC
[incubator-druid] branch 0.15.0-incubating updated: allow quantiles
merge aggregator to also accept doubles (#7718) (#7743)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
new ea3de8c allow quantiles merge aggregator to also accept doubles (#7718) (#7743)
ea3de8c is described below
commit ea3de8cac5c82173d0563416679fef2e3e7beeb2
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Thu May 23 17:04:08 2019 -0700
allow quantiles merge aggregator to also accept doubles (#7718) (#7743)
* allow quantiles merge aggregator to also accept doubles
* consolidate duplicated union-update logic into a shared helper
* remove unused import
---
.../quantiles/DoublesSketchBuildAggregator.java | 3 -
.../quantiles/DoublesSketchMergeAggregator.java | 23 +++--
.../DoublesSketchMergeBufferAggregator.java | 11 +--
.../sql/DoublesSketchSqlAggregatorTest.java | 106 +++++++++++++++++----
4 files changed, 106 insertions(+), 37 deletions(-)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
index bd46fc5..18f94a9 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java
@@ -28,14 +28,12 @@ public class DoublesSketchBuildAggregator implements Aggregator
{
private final ColumnValueSelector<Double> valueSelector;
- private final int size;
private UpdateDoublesSketch sketch;
public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size)
{
this.valueSelector = valueSelector;
- this.size = size;
sketch = DoublesSketch.builder().setK(size).build();
}
@@ -68,5 +66,4 @@ public class DoublesSketchBuildAggregator implements Aggregator
{
sketch = null;
}
-
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
index 325a6f2..4598048 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
@@ -27,10 +27,10 @@ import org.apache.druid.segment.ColumnValueSelector;
public class DoublesSketchMergeAggregator implements Aggregator
{
- private final ColumnValueSelector<DoublesSketch> selector;
+ private final ColumnValueSelector selector;
private DoublesUnion union;
- public DoublesSketchMergeAggregator(final ColumnValueSelector<DoublesSketch> selector, final int k)
+ public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k)
{
this.selector = selector;
union = DoublesUnion.builder().setMaxK(k).build();
@@ -39,13 +39,10 @@ public class DoublesSketchMergeAggregator implements Aggregator
@Override
public synchronized void aggregate()
{
- final DoublesSketch sketch = selector.getObject();
- if (sketch == null) {
- return;
- }
- union.update(sketch);
+ updateUnion(selector, union);
}
+
@Override
public synchronized Object get()
{
@@ -70,4 +67,16 @@ public class DoublesSketchMergeAggregator implements Aggregator
union = null;
}
+ static void updateUnion(ColumnValueSelector selector, DoublesUnion union)
+ {
+ final Object object = selector.getObject();
+ if (object == null) {
+ return;
+ }
+ if (object instanceof DoublesSketch) {
+ union.update((DoublesSketch) object);
+ } else {
+ union.update(selector.getDouble());
+ }
+ }
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
index ffe9009..f5a1e9d 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
@@ -20,7 +20,6 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.yahoo.memory.WritableMemory;
-import com.yahoo.sketches.quantiles.DoublesSketch;
import com.yahoo.sketches.quantiles.DoublesUnion;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -34,14 +33,14 @@ import java.util.IdentityHashMap;
public class DoublesSketchMergeBufferAggregator implements BufferAggregator
{
- private final ColumnValueSelector<DoublesSketch> selector;
+ private final ColumnValueSelector selector;
private final int k;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
public DoublesSketchMergeBufferAggregator(
- final ColumnValueSelector<DoublesSketch> selector,
+ final ColumnValueSelector selector,
final int k,
final int maxIntermediateSize)
{
@@ -62,12 +61,8 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator
@Override
public synchronized void aggregate(final ByteBuffer buffer, final int position)
{
- final DoublesSketch sketch = selector.getObject();
- if (sketch == null) {
- return;
- }
final DoublesUnion union = unions.get(buffer).get(position);
- union.update(sketch);
+ DoublesSketchMergeAggregator.updateUnion(selector, union);
}
@Override
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index e51c2de..a3382f4 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -125,25 +125,26 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
CalciteTests.getJsonMapper().registerModule(mod);
}
- final QueryableIndex index = IndexBuilder.create()
- .tmpDir(temporaryFolder.newFolder())
- .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(
- new IncrementalIndexSchema.Builder()
- .withMetrics(
- new CountAggregatorFactory("cnt"),
- new DoubleSumAggregatorFactory("m1", "m1"),
- new DoublesSketchAggregatorFactory(
- "qsketch_m1",
- "m1",
- 128
- )
- )
- .withRollup(false)
- .build()
- )
- .rows(CalciteTests.ROWS1)
- .buildMMappedIndex();
+ final QueryableIndex index =
+ IndexBuilder.create()
+ .tmpDir(temporaryFolder.newFolder())
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new DoubleSumAggregatorFactory("m1", "m1"),
+ new DoublesSketchAggregatorFactory(
+ "qsketch_m1",
+ "m1",
+ 128
+ )
+ )
+ .withRollup(false)
+ .build()
+ )
+ .rows(CalciteTests.ROWS1)
+ .buildMMappedIndex();
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
@@ -401,6 +402,73 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
);
}
+ @Test
+ public void testQuantileOnInnerQuantileQuery() throws Exception
+ {
+ SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+ final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
+ + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1";
+
+
+ final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+ ImmutableList.Builder<Object[]> builder = ImmutableList.builder();
+ builder.add(new Object[]{"", 1.0});
+ builder.add(new Object[]{"1", 4.0});
+ builder.add(new Object[]{"10.1", 2.0});
+ builder.add(new Object[]{"2", 3.0});
+ builder.add(new Object[]{"abc", 6.0});
+ builder.add(new Object[]{"def", 5.0});
+ final List<Object[]> expectedResults = builder.build();
+ Assert.assertEquals(expectedResults.size(), results.size());
+ for (int i = 0; i < expectedResults.size(); i++) {
+ Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+ }
+
+ // Verify query
+ Assert.assertEquals(
+ GroupByQuery.builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(
+ new DefaultDimensionSpec("dim1", "d0"),
+ new DefaultDimensionSpec("dim2", "d1")
+ )
+ .setAggregatorSpecs(
+ ImmutableList.of(
+ new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
+ )
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.5f)
+ )
+ )
+ .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
+ .build()
+ )
+ )
+ .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
+ .setAggregatorSpecs(
+ new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f)
+ )
+ )
+ .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
+ .build(),
+ Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+ );
+ }
+
private static PostAggregator makeFieldAccessPostAgg(String name)
{
return new FieldAccessPostAggregator(name, name);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org