You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by hi...@apache.org on 2019/09/26 15:04:53 UTC
[incubator-druid] branch master updated: doubleMean aggregator to
be used at query time (#8459)
This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9f1f5e1 doubleMean aggregator to be used at query time (#8459)
9f1f5e1 is described below
commit 9f1f5e115c0595e208c0f59e12bf2a5e26ed0c29
Author: Himanshu <g....@gmail.com>
AuthorDate: Thu Sep 26 08:04:33 2019 -0700
doubleMean aggregator to be used at query time (#8459)
* doubleMean aggregator for computing mean
* make docs
* build fixes
* address review comment: handle null args
---
.../druid/java/util/common/collect/Utils.java | 9 +
docs/querying/aggregations.md | 8 +
.../apache/druid/jackson/AggregatorsModule.java | 5 +
.../druid/query/aggregation/AggregatorUtil.java | 2 +
.../aggregation/SimpleDoubleBufferAggregator.java | 2 +-
.../aggregation/mean/DoubleMeanAggregator.java | 86 ++++++++++
.../mean/DoubleMeanAggregatorFactory.java | 181 +++++++++++++++++++++
.../mean/DoubleMeanBufferAggregator.java | 101 ++++++++++++
.../query/aggregation/mean/DoubleMeanHolder.java | 136 ++++++++++++++++
.../mean/DoubleMeanVectorAggregator.java | 81 +++++++++
.../druid/query/groupby/GroupByQueryConfig.java | 2 +-
.../query/aggregation/AggregationTestHelper.java | 2 +-
.../mean/DoubleMeanAggregationTest.java | 159 ++++++++++++++++++
.../query/aggregation/mean/SimpleTestIndex.java | 113 +++++++++++++
14 files changed, 884 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
index de7a239..429660f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
@@ -81,4 +81,13 @@ public class Utils
return true;
}
+
+  /**
+   * Returns the fully qualified class name of {@code o}, or the literal {@code "NULL"} when {@code o} is null.
+   * Handy when formatting error messages about values that may be null.
+   */
+  public static String safeObjectClassGetName(@Nullable Object o)
+  {
+    return o == null ? "NULL" : o.getClass().getName();
+  }
}
diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md
index 3f63095..4bc7605 100644
--- a/docs/querying/aggregations.md
+++ b/docs/querying/aggregations.md
@@ -120,6 +120,14 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to
{ "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> }
```
+### `doubleMean` aggregator
+
+Computes and returns the arithmetic mean of a column's values as a 64-bit floating point value. This is a query-time-only aggregator and should not be used during indexing.
+
+```json
+{ "type" : "doubleMean", "name" : <output_name>, "fieldName" : <metric_name> }
+```
+
### First / Last aggregator
(Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
index 6689d27..aba52a0 100644
--- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
@@ -53,6 +53,8 @@ import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
+import org.apache.druid.query.aggregation.mean.DoubleMeanAggregatorFactory;
+import org.apache.druid.query.aggregation.mean.DoubleMeanHolder;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -79,6 +81,8 @@ public class AggregatorsModule extends SimpleModule
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
+
+ addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -105,6 +109,7 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class),
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
+ @JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class)
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index e37f87b..8be5e53 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -119,6 +119,8 @@ public class AggregatorUtil
// TDigest sketch aggregators
public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38;
+ public static final byte MEAN_CACHE_TYPE_ID = 0x41;
+
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java
index 29db642..01f3901 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java
@@ -29,7 +29,7 @@ public abstract class SimpleDoubleBufferAggregator implements BufferAggregator
{
final BaseDoubleColumnValueSelector selector;
- SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector)
+ public SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector)
{
this.selector = selector;
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java
new file mode 100644
index 0000000..a4f7b3d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import java.util.List;
+
+/**
+ * On-heap {@link Aggregator} for {@code doubleMean}: accumulates a running sum and count of the values read from a
+ * column so that the mean can be computed when the result is finalized.
+ *
+ * A row's value may be a {@link DoubleMeanHolder} (a partial result, merged as-is), a {@link List} (every element
+ * is counted as a separate value), or anything else, which is converted with
+ * {@link Numbers#tryParseDouble(Object, double)} using a fallback of 0.
+ */
+public class DoubleMeanAggregator implements Aggregator
+{
+  private final ColumnValueSelector selector;
+
+  // Running sum/count of this aggregator. Never handed out directly; get() returns a copy.
+  private final DoubleMeanHolder value = new DoubleMeanHolder(0, 0);
+
+  public DoubleMeanAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    Object update = selector.getObject();
+
+    if (update instanceof DoubleMeanHolder) {
+      value.update((DoubleMeanHolder) update);
+    } else if (update instanceof List) {
+      // multi-value row: each element contributes to both the sum and the count
+      for (Object o : (List) update) {
+        value.update(Numbers.tryParseDouble(o, 0));
+      }
+    } else {
+      value.update(Numbers.tryParseDouble(update, 0));
+    }
+  }
+
+  /**
+   * Returns a snapshot of the current sum and count.
+   *
+   * A copy is returned instead of the internal holder because {@link DoubleMeanAggregatorFactory#combine} updates
+   * its left-hand argument in place. Handing out the internal holder would let such a merge silently change this
+   * aggregator's state, and let later aggregate() calls change an already-returned result.
+   */
+  @Override
+  public Object get()
+  {
+    return new DoubleMeanHolder(0, 0).update(value);
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java
new file mode 100644
index 0000000..5ed87be
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.collect.Utils;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Factory for the query-time {@code doubleMean} aggregator, which computes the arithmetic mean of
+ * {@code fieldName}. Intermediate values are {@link DoubleMeanHolder}s (running sum and count); they are turned
+ * into a double by {@link #finalizeComputation}.
+ */
+public class DoubleMeanAggregatorFactory extends AggregatorFactory
+{
+  private final String name;
+  private final String fieldName;
+
+  @JsonCreator
+  public DoubleMeanAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName
+  )
+  {
+    this.name = Preconditions.checkNotNull(name, "null name");
+    this.fieldName = Preconditions.checkNotNull(fieldName, "null fieldName");
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Collections.singletonList(fieldName);
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return "doubleMean";
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return DoubleMeanHolder.MAX_INTERMEDIATE_SIZE;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    return new DoubleMeanAggregator(metricFactory.makeColumnValueSelector(fieldName));
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    return new DoubleMeanBufferAggregator(metricFactory.makeColumnValueSelector(fieldName));
+  }
+
+  @Override
+  public VectorAggregator factorizeVector(final VectorColumnSelectorFactory selectorFactory)
+  {
+    return new DoubleMeanVectorAggregator(selectorFactory.makeValueSelector(fieldName));
+  }
+
+  @Override
+  public boolean canVectorize()
+  {
+    return true;
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return DoubleMeanHolder.COMPARATOR;
+  }
+
+  /**
+   * Merges two intermediate values. {@code lhs} is updated in place and returned.
+   *
+   * @throws IAE if either argument is not a {@link DoubleMeanHolder} (including when it is null)
+   */
+  @Nullable
+  @Override
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)
+  {
+    if (lhs instanceof DoubleMeanHolder && rhs instanceof DoubleMeanHolder) {
+      return ((DoubleMeanHolder) lhs).update((DoubleMeanHolder) rhs);
+    } else {
+      throw new IAE(
+          "lhs[%s] or rhs[%s] not of type [%s]",
+          Utils.safeObjectClassGetName(lhs),
+          Utils.safeObjectClassGetName(rhs),
+          DoubleMeanHolder.class.getName()
+      );
+    }
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    // The combining aggregator reads this aggregator's own output column, whose values are DoubleMeanHolders.
+    return new DoubleMeanAggregatorFactory(name, name);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(new DoubleMeanAggregatorFactory(fieldName, fieldName));
+  }
+
+  /**
+   * Accepts either a {@link DoubleMeanHolder} or a base64 string holding the bytes of
+   * {@link DoubleMeanHolder#toBytes()}.
+   *
+   * @throws IAE for any other input, including null
+   */
+  @Override
+  public Object deserialize(Object object)
+  {
+    if (object instanceof String) {
+      return DoubleMeanHolder.fromBytes(StringUtils.decodeBase64(StringUtils.toUtf8((String) object)));
+    } else if (object instanceof DoubleMeanHolder) {
+      return object;
+    } else {
+      throw new IAE("Unknown object type [%s]", Utils.safeObjectClassGetName(object));
+    }
+  }
+
+  /**
+   * Returns the mean held by a {@link DoubleMeanHolder}, or null for a null input.
+   *
+   * @throws IAE for any other input type
+   */
+  @Nullable
+  @Override
+  public Object finalizeComputation(@Nullable Object object)
+  {
+    if (object instanceof DoubleMeanHolder) {
+      return ((DoubleMeanHolder) object).mean();
+    } else if (object == null) {
+      return null;
+    } else {
+      throw new IAE("Unknown object type [%s]", Utils.safeObjectClassGetName(object));
+    }
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(AggregatorUtil.MEAN_CACHE_TYPE_ID)
+        .appendString(name)
+        .appendString(fieldName)
+        .build();
+  }
+
+  // Two factories are interchangeable exactly when their output name and input field match; this keeps
+  // query/spec equality (e.g. after a JSON round trip) consistent with getCacheKey().
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DoubleMeanAggregatorFactory that = (DoubleMeanAggregatorFactory) o;
+    return name.equals(that.name) && fieldName.equals(that.fieldName);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // both fields are non-null (checked in the constructor)
+    return 31 * name.hashCode() + fieldName.hashCode();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DoubleMeanAggregatorFactory{" +
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           '}';
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java
new file mode 100644
index 0000000..5ff7f81
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * {@link BufferAggregator} for {@code doubleMean}. The running sum and count live in the buffer slot at
+ * {@code position} and are read and written through the static helpers of {@link DoubleMeanHolder}.
+ */
+public class DoubleMeanBufferAggregator implements BufferAggregator
+{
+  private final ColumnValueSelector selector;
+
+  public DoubleMeanBufferAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    DoubleMeanHolder.init(buf, position);
+  }
+
+  /**
+   * Folds the selector's current value into the slot. A {@link DoubleMeanHolder} is merged, a {@link List}
+   * contributes one value per element, and anything else is converted to a double with a fallback of 0.
+   */
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    final Object current = selector.getObject();
+
+    if (current instanceof DoubleMeanHolder) {
+      DoubleMeanHolder.update(buf, position, (DoubleMeanHolder) current);
+      return;
+    }
+
+    if (!(current instanceof List)) {
+      DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(current, 0));
+      return;
+    }
+
+    for (Object element : (List) current) {
+      DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(element, 0));
+    }
+  }
+
+  /**
+   * Returns a new {@link DoubleMeanHolder} built from the sum and count stored in the slot.
+   */
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return DoubleMeanHolder.get(buf, position);
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void close()
+  {
+    // nothing to release
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("selector", selector);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java
new file mode 100644
index 0000000..f42c993
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.google.common.primitives.Doubles;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * Mutable running sum and count: the intermediate value of the {@code doubleMean} aggregator. Static helpers keep
+ * the same pair inside a {@link ByteBuffer} slot for the buffer and vector aggregators.
+ *
+ * The layout is the same in aggregation buffers and in {@link #toBytes()}: the sum as a double at offset 0,
+ * followed by the count as a long at offset {@link Double#BYTES}.
+ */
+public class DoubleMeanHolder
+{
+  // Double.SIZE / Long.SIZE are sizes in *bits*; buffer slots and serialized arrays are measured in bytes.
+  public static final int MAX_INTERMEDIATE_SIZE = Double.BYTES + Long.BYTES;
+  public static final Comparator<DoubleMeanHolder> COMPARATOR = (o1, o2) -> Doubles.compare(o1.mean(), o2.mean());
+
+  private static final int SUM_OFFSET = 0;
+  private static final int COUNT_OFFSET = Double.BYTES;
+
+  // The original version of this class sized its layout in bits: serialized arrays were 128 bytes long and the
+  // count was stored at offset 64. fromBytes() still understands that form.
+  private static final int LEGACY_SERIALIZED_SIZE = Double.SIZE + Long.SIZE;
+  private static final int LEGACY_COUNT_OFFSET = Double.SIZE;
+
+  private double sum;
+  private long count;
+
+  public DoubleMeanHolder(double sum, long count)
+  {
+    this.sum = sum;
+    this.count = count;
+  }
+
+  /**
+   * Adds one value: the sum grows by {@code sum} and the count by one.
+   */
+  public void update(double sum)
+  {
+    this.sum += sum;
+    count++;
+  }
+
+  /**
+   * Merges {@code other} into this holder (sums and counts are added) and returns this holder.
+   */
+  public DoubleMeanHolder update(DoubleMeanHolder other)
+  {
+    sum += other.sum;
+    count += other.count;
+    return this;
+  }
+
+  /**
+   * Returns sum / count, or 0 when nothing has been added.
+   */
+  public double mean()
+  {
+    return count == 0 ? 0 : sum / count;
+  }
+
+  public byte[] toBytes()
+  {
+    ByteBuffer buf = ByteBuffer.allocate(MAX_INTERMEDIATE_SIZE);
+    buf.putDouble(SUM_OFFSET, sum);
+    buf.putLong(COUNT_OFFSET, count);
+    return buf.array();
+  }
+
+  /**
+   * Inverse of {@link #toBytes()}. Also accepts 128-byte arrays in the original bit-sized layout.
+   */
+  public static DoubleMeanHolder fromBytes(byte[] data)
+  {
+    ByteBuffer buf = ByteBuffer.wrap(data);
+    int countOffset = data.length == LEGACY_SERIALIZED_SIZE ? LEGACY_COUNT_OFFSET : COUNT_OFFSET;
+    return new DoubleMeanHolder(buf.getDouble(SUM_OFFSET), buf.getLong(countOffset));
+  }
+
+  /**
+   * Resets the slot at {@code position} to sum 0 and count 0.
+   */
+  public static void init(ByteBuffer buf, int position)
+  {
+    writeSum(buf, position, 0d);
+    writeCount(buf, position, 0);
+  }
+
+  /**
+   * Adds one value to the slot at {@code position}.
+   */
+  public static void update(ByteBuffer buf, int position, double sum)
+  {
+    writeSum(buf, position, getSum(buf, position) + sum);
+    writeCount(buf, position, getCount(buf, position) + 1);
+  }
+
+  /**
+   * Merges {@code other} into the slot at {@code position}.
+   */
+  public static void update(ByteBuffer buf, int position, DoubleMeanHolder other)
+  {
+    writeSum(buf, position, getSum(buf, position) + other.sum);
+    writeCount(buf, position, getCount(buf, position) + other.count);
+  }
+
+  /**
+   * Returns a new holder with the sum and count stored in the slot at {@code position}.
+   */
+  public static DoubleMeanHolder get(ByteBuffer buf, int position)
+  {
+    return new DoubleMeanHolder(getSum(buf, position), getCount(buf, position));
+  }
+
+  private static void writeSum(ByteBuffer buf, int position, double sum)
+  {
+    buf.putDouble(position + SUM_OFFSET, sum);
+  }
+
+  private static double getSum(ByteBuffer buf, int position)
+  {
+    return buf.getDouble(position + SUM_OFFSET);
+  }
+
+  private static void writeCount(ByteBuffer buf, int position, long count)
+  {
+    buf.putLong(position + COUNT_OFFSET, count);
+  }
+
+  private static long getCount(ByteBuffer buf, int position)
+  {
+    return buf.getLong(position + COUNT_OFFSET);
+  }
+
+  /**
+   * Writes a holder as JSON binary data containing {@link #toBytes()}.
+   */
+  public static class Serializer extends JsonSerializer<DoubleMeanHolder>
+  {
+    public static final Serializer INSTANCE = new Serializer();
+
+    private Serializer()
+    {
+      // singleton; use INSTANCE
+    }
+
+    @Override
+    public void serialize(DoubleMeanHolder obj, JsonGenerator jgen, SerializerProvider provider)
+        throws IOException
+    {
+      jgen.writeBinary(obj.toBytes());
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java
new file mode 100644
index 0000000..0a1a93c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Vectorized {@code doubleMean} aggregator: reads each batch as a double vector and adds every selected row's value
+ * to a sum/count slot managed by {@link DoubleMeanHolder}.
+ */
+public class DoubleMeanVectorAggregator implements VectorAggregator
+{
+  private final VectorValueSelector selector;
+
+  public DoubleMeanVectorAggregator(final VectorValueSelector selector)
+  {
+    this.selector = Preconditions.checkNotNull(selector, "selector");
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    DoubleMeanHolder.init(buf, position);
+  }
+
+  /**
+   * Adds rows [startRow, endRow) of the current vector to the single slot at {@code position}.
+   */
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final double[] values = selector.getDoubleVector();
+    int row = startRow;
+    while (row < endRow) {
+      DoubleMeanHolder.update(buf, position, values[row]);
+      row++;
+    }
+  }
+
+  /**
+   * Adds {@code numRows} rows, the i-th one into the slot at {@code positions[i] + positionOffset}. The i-th row is
+   * vector row {@code i} when {@code rows} is null, otherwise {@code rows[i]}.
+   */
+  @Override
+  public void aggregate(
+      final ByteBuffer buf,
+      final int numRows,
+      final int[] positions,
+      @Nullable final int[] rows,
+      final int positionOffset
+  )
+  {
+    final double[] values = selector.getDoubleVector();
+
+    for (int i = 0; i < numRows; i++) {
+      final int row;
+      if (rows == null) {
+        row = i;
+      } else {
+        row = rows[i];
+      }
+      DoubleMeanHolder.update(buf, positions[i] + positionOffset, values[row]);
+    }
+  }
+
+  /**
+   * Returns a new {@link DoubleMeanHolder} built from the slot at {@code position}.
+   */
+  @Override
+  public Object get(final ByteBuffer buf, final int position)
+  {
+    return DoubleMeanHolder.get(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    // nothing to release
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index b272ab1..92d9e9e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -43,7 +43,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
- private static final String CTX_KEY_VECTORIZE = "vectorize";
+ public static final String CTX_KEY_VECTORIZE = "vectorize";
@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 6121281..af936d1 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -571,7 +571,7 @@ public class AggregationTestHelper implements Closeable
}
}
- public IncrementalIndex createIncrementalIndex(
+ public static IncrementalIndex createIncrementalIndex(
Iterator rows,
InputRowParser parser,
final AggregatorFactory[] metrics,
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java
new file mode 100644
index 0000000..a71c6f0
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Runs the {@code doubleMean} aggregator through groupBy and timeseries queries over two segments built from
+ * {@link SimpleTestIndex}: one incremental and one memory-mapped.
+ */
+public class DoubleMeanAggregationTest
+{
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private final AggregationTestHelper groupByQueryTestHelper;
+  private final AggregationTestHelper timeseriesQueryTestHelper;
+
+  private final List<Segment> segments;
+
+  public DoubleMeanAggregationTest() throws Exception
+  {
+    groupByQueryTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+        Collections.emptyList(),
+        new GroupByQueryConfig(),
+        tempFolder
+    );
+
+    timeseriesQueryTestHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
+        Collections.emptyList(),
+        tempFolder
+    );
+
+    segments = ImmutableList.of(
+        new IncrementalIndexSegment(SimpleTestIndex.getIncrementalTestIndex(), SegmentId.dummy("test1")),
+        new QueryableIndexSegment(SimpleTestIndex.getMMappedTestIndex(), SegmentId.dummy("test2"))
+    );
+  }
+
+  @Test
+  public void testBufferAggregatorUsingGroupByQuery() throws Exception
+  {
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource("test")
+        .setGranularity(Granularities.ALL)
+        .setInterval("1970/2050")
+        .setAggregatorSpecs(
+            new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL),
+            new DoubleMeanAggregatorFactory("meanOnString", SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM),
+            new DoubleMeanAggregatorFactory("meanOnMultiValue", SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM)
+        )
+        .build();
+
+    query = serdeRoundTrip(groupByQueryTestHelper.getObjectMapper(), query);
+
+    Sequence<ResultRow> seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
+    Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);
+
+    Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d);
+    Assert.assertEquals(6.2d, result.getMetric("meanOnString").doubleValue(), 0.0001d);
+    Assert.assertEquals(4.1333d, result.getMetric("meanOnMultiValue").doubleValue(), 0.0001d);
+  }
+
+  @Test
+  public void testVectorAggregatorUsingGroupByQueryOnDoubleColumn() throws Exception
+  {
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource("test")
+        .setGranularity(Granularities.ALL)
+        .setInterval("1970/2050")
+        .setAggregatorSpecs(
+            new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL)
+        )
+        .setContext(Collections.singletonMap(GroupByQueryConfig.CTX_KEY_VECTORIZE, true))
+        .build();
+
+    query = serdeRoundTrip(groupByQueryTestHelper.getObjectMapper(), query);
+
+    Sequence<ResultRow> seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
+    Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);
+
+    Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d);
+  }
+
+  @Test
+  public void testAggregatorUsingTimeseriesQuery() throws Exception
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("test")
+                                  .granularity(Granularities.ALL)
+                                  .intervals("1970/2050")
+                                  .aggregators(
+                                      new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL),
+                                      new DoubleMeanAggregatorFactory(
+                                          "meanOnString",
+                                          SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM
+                                      ),
+                                      new DoubleMeanAggregatorFactory(
+                                          "meanOnMultiValue",
+                                          SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM
+                                      )
+                                  )
+                                  .build();
+
+    query = serdeRoundTrip(timeseriesQueryTestHelper.getObjectMapper(), query);
+
+    Sequence<Result<TimeseriesResultValue>> seq = timeseriesQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
+    TimeseriesResultValue result = Iterables.getOnlyElement(seq.toList()).getValue();
+
+    Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnDouble").doubleValue(), 0.0001d);
+    Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnString").doubleValue(), 0.0001d);
+    Assert.assertEquals(4.1333d, result.getDoubleMetric("meanOnMultiValue").doubleValue(), 0.0001d);
+  }
+
+  /**
+   * Serializes {@code query} to JSON and reads it back with {@code jsonMapper}, so every test also checks that the
+   * query, including its doubleMean aggregator specs, survives serde.
+   */
+  @SuppressWarnings("unchecked")
+  private static <T extends Query<?>> T serdeRoundTrip(ObjectMapper jsonMapper, T query) throws Exception
+  {
+    return (T) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
new file mode 100644
index 0000000..786a6fc
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.NoopInputRowParser;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.util.List;
+
+/**
+ * Small test dataset for the doubleMean aggregator tests.
+ *
+ * <p>Holds {@link #NUM_ROWS} rows; row {@code i} (1-based) carries the value {@code i + 0.7} as a double
+ * metric ({@link #DOUBLE_COL}), as a single-value string dimension and as a multi-value string dimension
+ * holding {@code [v, null, v]}. Both the incremental index and its persisted, memory-mapped form are built
+ * lazily, once, on first access.
+ */
+public class SimpleTestIndex
+{
+  public static final int NUM_ROWS = 10;
+
+  public static final String SINGLE_VALUE_DOUBLE_AS_STRING_DIM = "singleValueDoubleAsStringDim";
+  public static final String MULTI_VALUE_DOUBLE_AS_STRING_DIM = "multiValueDoubleAsStringDim";
+
+  public static final String DOUBLE_COL = "doubleCol";
+
+  public static final List<String> DIMENSIONS = ImmutableList.of(
+      SINGLE_VALUE_DOUBLE_AS_STRING_DIM,
+      MULTI_VALUE_DOUBLE_AS_STRING_DIM
+  );
+
+  // A fixed timestamp keeps the fixture deterministic. DateTime.now() would also drift outside the
+  // "1970/2050" query interval used by the tests once the wall clock passes 2050.
+  private static final DateTime ROW_TIMESTAMP = new DateTime(2019, 1, 1, 0, 0, DateTimeZone.UTC);
+
+  private static final Supplier<IncrementalIndex> REALTIME_INDEX = Suppliers.memoize(
+      SimpleTestIndex::makeRealtimeIndex
+  );
+
+  private static final Supplier<QueryableIndex> MMAPPED_INDEX = Suppliers.memoize(
+      () -> TestIndex.persistRealtimeAndLoadMMapped(REALTIME_INDEX.get())
+  );
+
+  /**
+   * @return the lazily-built in-memory index holding the test rows
+   */
+  public static IncrementalIndex getIncrementalTestIndex()
+  {
+    return REALTIME_INDEX.get();
+  }
+
+  /**
+   * @return the incremental index persisted and reloaded through {@code TestIndex.persistRealtimeAndLoadMMapped}
+   */
+  public static QueryableIndex getMMappedTestIndex()
+  {
+    return MMAPPED_INDEX.get();
+  }
+
+  /**
+   * Builds the incremental index from {@link #NUM_ROWS} generated rows.
+   *
+   * @throws RuntimeException wrapping any exception raised while building the index
+   */
+  private static IncrementalIndex makeRealtimeIndex()
+  {
+    try {
+      List<InputRow> inputRows = Lists.newArrayListWithExpectedSize(NUM_ROWS);
+      for (int i = 1; i <= NUM_ROWS; i++) {
+        double doubleVal = i + 0.7d;
+        String stringVal = String.valueOf(doubleVal);
+
+        inputRows.add(new MapBasedInputRow(
+            ROW_TIMESTAMP,
+            DIMENSIONS,
+            ImmutableMap.of(
+                DOUBLE_COL, doubleVal,
+                SINGLE_VALUE_DOUBLE_AS_STRING_DIM, stringVal,
+                // the null element is deliberate; presumably it exercises null handling in multi-value dims
+                MULTI_VALUE_DOUBLE_AS_STRING_DIM, Lists.newArrayList(stringVal, null, stringVal)
+            )
+        ));
+      }
+
+      return AggregationTestHelper.createIncrementalIndex(
+          inputRows.iterator(),
+          new NoopInputRowParser(null),
+          new AggregatorFactory[]{
+              new CountAggregatorFactory("count"),
+              new DoubleSumAggregatorFactory(DOUBLE_COL, DOUBLE_COL)
+          },
+          0,
+          Granularities.NONE,
+          false,
+          100,
+          false
+      );
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org