Posted to commits@druid.apache.org by hi...@apache.org on 2019/09/26 15:04:53 UTC

[incubator-druid] branch master updated: doubleMean aggregator to be used at query time (#8459)

This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f1f5e1  doubleMean aggregator to be used at query time (#8459)
9f1f5e1 is described below

commit 9f1f5e115c0595e208c0f59e12bf2a5e26ed0c29
Author: Himanshu <g....@gmail.com>
AuthorDate: Thu Sep 26 08:04:33 2019 -0700

    doubleMean aggregator to be used at query time (#8459)
    
    * doubleMean aggregator for computing mean
    
    * make docs
    
    * build fixes
    
    * address review comment: handle null args
---
 .../druid/java/util/common/collect/Utils.java      |   9 +
 docs/querying/aggregations.md                      |   8 +
 .../apache/druid/jackson/AggregatorsModule.java    |   5 +
 .../druid/query/aggregation/AggregatorUtil.java    |   2 +
 .../aggregation/SimpleDoubleBufferAggregator.java  |   2 +-
 .../aggregation/mean/DoubleMeanAggregator.java     |  86 ++++++++++
 .../mean/DoubleMeanAggregatorFactory.java          | 181 +++++++++++++++++++++
 .../mean/DoubleMeanBufferAggregator.java           | 101 ++++++++++++
 .../query/aggregation/mean/DoubleMeanHolder.java   | 136 ++++++++++++++++
 .../mean/DoubleMeanVectorAggregator.java           |  81 +++++++++
 .../druid/query/groupby/GroupByQueryConfig.java    |   2 +-
 .../query/aggregation/AggregationTestHelper.java   |   2 +-
 .../mean/DoubleMeanAggregationTest.java            | 159 ++++++++++++++++++
 .../query/aggregation/mean/SimpleTestIndex.java    | 113 +++++++++++++
 14 files changed, 884 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
index de7a239..429660f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
@@ -81,4 +81,13 @@ public class Utils
 
     return true;
   }
+
+  public static String safeObjectClassGetName(@Nullable Object o)
+  {
+    if (o == null) {
+      return "NULL";
+    } else {
+      return o.getClass().getName();
+    }
+  }
 }
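
The new `safeObjectClassGetName` helper exists so that the error messages added later in this commit (in `DoubleMeanAggregatorFactory.combine` and `deserialize`) can report the class of a possibly-null argument without risking a `NullPointerException`. A trivial sketch of its behavior; the class name `SafeClassNameExample` is just for illustration:

```java
import org.apache.druid.java.util.common.collect.Utils;

public class SafeClassNameExample
{
  public static void main(String[] args)
  {
    // Non-null objects report their concrete class name.
    System.out.println(Utils.safeObjectClassGetName("abc"));  // java.lang.String
    // Null arguments yield the literal string "NULL" instead of throwing.
    System.out.println(Utils.safeObjectClassGetName(null));   // NULL
  }
}
```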
diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md
index 3f63095..4bc7605 100644
--- a/docs/querying/aggregations.md
+++ b/docs/querying/aggregations.md
@@ -120,6 +120,14 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to
 { "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> }
 ```
 
+### `doubleMean` aggregator
+
+Computes and returns the arithmetic mean of a column's values as a 64-bit floating point value. This is a query-time aggregator only and should not be used during indexing.
+
+```json
+{ "type" : "doubleMean", "name" : <output_name>, "fieldName" : <metric_name> }
+```
+
 ### First / Last aggregator
 
 (Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
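
For programmatic (non-JSON) callers, the same aggregator can be attached to a native query through the factory added in this commit. A minimal sketch mirroring the usage in `DoubleMeanAggregationTest` further down; the datasource `"test"`, column `"doubleCol"`, and class name are illustrative placeholders:

```java
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.mean.DoubleMeanAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;

public class DoubleMeanQueryExample
{
  public static TimeseriesQuery buildQuery()
  {
    return Druids.newTimeseriesQueryBuilder()
                 .dataSource("test")
                 .granularity(Granularities.ALL)
                 .intervals("1970/2050")
                 .aggregators(new DoubleMeanAggregatorFactory("meanOnDouble", "doubleCol"))
                 .build();
  }
}
```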
diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
index 6689d27..aba52a0 100644
--- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
@@ -53,6 +53,8 @@ import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
+import org.apache.druid.query.aggregation.mean.DoubleMeanAggregatorFactory;
+import org.apache.druid.query.aggregation.mean.DoubleMeanHolder;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
 import org.apache.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -79,6 +81,8 @@ public class AggregatorsModule extends SimpleModule
 
     setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
     setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
+
+    addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
   }
 
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -105,6 +109,7 @@ public class AggregatorsModule extends SimpleModule
       @JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class),
       @JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
       @JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAggregatorFactory.class),
       @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
       @JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
       @JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class)
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index e37f87b..8be5e53 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -119,6 +119,8 @@ public class AggregatorUtil
   // TDigest sketch aggregators
   public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38;
 
+  public static final byte MEAN_CACHE_TYPE_ID = 0x41;
+
   /**
    * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
    *
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java
index 29db642..01f3901 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleBufferAggregator.java
@@ -29,7 +29,7 @@ public abstract class SimpleDoubleBufferAggregator implements BufferAggregator
 {
   final BaseDoubleColumnValueSelector selector;
 
-  SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector)
+  public SimpleDoubleBufferAggregator(BaseDoubleColumnValueSelector selector)
   {
     this.selector = selector;
   }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java
new file mode 100644
index 0000000..a4f7b3d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import java.util.List;
+
+/**
+ */
+public class DoubleMeanAggregator implements Aggregator
+{
+  private final ColumnValueSelector selector;
+
+  private final DoubleMeanHolder value = new DoubleMeanHolder(0, 0);
+
+  public DoubleMeanAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    Object update = selector.getObject();
+
+    if (update instanceof DoubleMeanHolder) {
+      value.update((DoubleMeanHolder) update);
+    } else if (update instanceof List) {
+      for (Object o : (List) update) {
+        value.update(Numbers.tryParseDouble(o, 0));
+      }
+    } else {
+      value.update(Numbers.tryParseDouble(update, 0));
+    }
+  }
+
+  @Override
+  public Object get()
+  {
+    return value;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java
new file mode 100644
index 0000000..5ed87be
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregatorFactory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.collect.Utils;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ */
+public class DoubleMeanAggregatorFactory extends AggregatorFactory
+{
+  private final String name;
+  private final String fieldName;
+
+  @JsonCreator
+  public DoubleMeanAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName
+  )
+  {
+    this.name = Preconditions.checkNotNull(name, "null name");
+    this.fieldName = Preconditions.checkNotNull(fieldName, "null fieldName");
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Collections.singletonList(fieldName);
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return "doubleMean";
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return DoubleMeanHolder.MAX_INTERMEDIATE_SIZE;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    return new DoubleMeanAggregator(metricFactory.makeColumnValueSelector(fieldName));
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    return new DoubleMeanBufferAggregator(metricFactory.makeColumnValueSelector(fieldName));
+  }
+
+  @Override
+  public VectorAggregator factorizeVector(final VectorColumnSelectorFactory selectorFactory)
+  {
+    return new DoubleMeanVectorAggregator(selectorFactory.makeValueSelector(fieldName));
+  }
+
+  @Override
+  public boolean canVectorize()
+  {
+    return true;
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return DoubleMeanHolder.COMPARATOR;
+  }
+
+  @Nullable
+  @Override
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)
+  {
+    if (lhs instanceof DoubleMeanHolder && rhs instanceof DoubleMeanHolder) {
+      return ((DoubleMeanHolder) lhs).update((DoubleMeanHolder) rhs);
+    } else {
+      throw new IAE(
+          "lhs[%s] or rhs[%s] not of type [%s]",
+          Utils.safeObjectClassGetName(lhs),
+          Utils.safeObjectClassGetName(rhs),
+          DoubleMeanHolder.class.getName()
+      );
+    }
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new DoubleMeanAggregatorFactory(name, name);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(new DoubleMeanAggregatorFactory(fieldName, fieldName));
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    if (object instanceof String) {
+      return DoubleMeanHolder.fromBytes(StringUtils.decodeBase64(StringUtils.toUtf8((String) object)));
+    } else if (object instanceof DoubleMeanHolder) {
+      return object;
+    } else {
+      throw new IAE("Unknown object type [%s]", Utils.safeObjectClassGetName(object));
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object finalizeComputation(@Nullable Object object)
+  {
+    if (object instanceof DoubleMeanHolder) {
+      return ((DoubleMeanHolder) object).mean();
+    } else if (object == null) {
+      return null;
+    } else {
+      throw new IAE("Unknown object type [%s]", object.getClass().getName());
+    }
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(AggregatorUtil.MEAN_CACHE_TYPE_ID)
+        .appendString(name)
+        .appendString(fieldName)
+        .build();
+  }
+}
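
At merge time each partial result is a `DoubleMeanHolder` carrying a running sum and count; `combine` folds one holder into another, and `finalizeComputation` turns the merged holder into the double ultimately returned to the client. A small illustrative sketch of that flow (aggregator name, column name, and class name are placeholders):

```java
import org.apache.druid.query.aggregation.mean.DoubleMeanAggregatorFactory;
import org.apache.druid.query.aggregation.mean.DoubleMeanHolder;

public class DoubleMeanCombineExample
{
  public static void main(String[] args)
  {
    DoubleMeanAggregatorFactory factory = new DoubleMeanAggregatorFactory("mean", "col");

    // Two partial results, e.g. from two segments: (sum, count).
    DoubleMeanHolder partial1 = new DoubleMeanHolder(10.0, 4);
    DoubleMeanHolder partial2 = new DoubleMeanHolder(2.0, 2);

    // combine() folds rhs into lhs and returns the merged holder.
    Object merged = factory.combine(partial1, partial2);

    // finalizeComputation() converts the holder into the final mean: 12.0 / 6 = 2.0
    System.out.println(factory.finalizeComputation(merged));
  }
}
```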
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java
new file mode 100644
index 0000000..5ff7f81
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanBufferAggregator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ */
+public class DoubleMeanBufferAggregator implements BufferAggregator
+{
+
+  private final ColumnValueSelector selector;
+
+  public DoubleMeanBufferAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    DoubleMeanHolder.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    Object update = selector.getObject();
+
+    if (update instanceof DoubleMeanHolder) {
+      DoubleMeanHolder.update(buf, position, (DoubleMeanHolder) update);
+    } else if (update instanceof List) {
+      for (Object o : (List) update) {
+        DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(o, 0));
+      }
+    } else {
+      DoubleMeanHolder.update(buf, position, Numbers.tryParseDouble(update, 0));
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return DoubleMeanHolder.get(buf, position);
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void close()
+  {
+
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("selector", selector);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java
new file mode 100644
index 0000000..f42c993
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanHolder.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.google.common.primitives.Doubles;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+public class DoubleMeanHolder
+{
+  public static final int MAX_INTERMEDIATE_SIZE = Long.SIZE + Double.SIZE;
+  public static final Comparator<DoubleMeanHolder> COMPARATOR = (o1, o2) -> Doubles.compare(o1.mean(), o2.mean());
+
+  private double sum;
+  private long count;
+
+  public DoubleMeanHolder(double sum, long count)
+  {
+    this.sum = sum;
+    this.count = count;
+  }
+
+  public void update(double sum)
+  {
+    this.sum += sum;
+    count++;
+  }
+
+  public DoubleMeanHolder update(DoubleMeanHolder other)
+  {
+    sum += other.sum;
+    count += other.count;
+    return this;
+  }
+
+  public double mean()
+  {
+    return count == 0 ? 0 : sum / count;
+  }
+
+  public byte[] toBytes()
+  {
+    ByteBuffer buf = ByteBuffer.allocate(Double.SIZE + Long.SIZE);
+    buf.putDouble(0, sum);
+    buf.putLong(Double.SIZE, count);
+    return buf.array();
+  }
+
+  public static DoubleMeanHolder fromBytes(byte[] data)
+  {
+    ByteBuffer buf = ByteBuffer.wrap(data);
+    return new DoubleMeanHolder(buf.getDouble(0), buf.getLong(Double.SIZE));
+  }
+
+  public static void init(ByteBuffer buf, int position)
+  {
+    writeSum(buf, position, 0d);
+    writeCount(buf, position, 0);
+  }
+
+  public static void update(ByteBuffer buf, int position, double sum)
+  {
+    writeSum(buf, position, getSum(buf, position) + sum);
+    writeCount(buf, position, getCount(buf, position) + 1);
+  }
+
+  public static void update(ByteBuffer buf, int position, DoubleMeanHolder other)
+  {
+    writeSum(buf, position, getSum(buf, position) + other.sum);
+    writeCount(buf, position, getCount(buf, position) + other.count);
+  }
+
+  public static DoubleMeanHolder get(ByteBuffer buf, int position)
+  {
+    return new DoubleMeanHolder(getSum(buf, position), getCount(buf, position));
+  }
+
+  private static void writeSum(ByteBuffer buf, int position, double sum)
+  {
+    buf.putDouble(position, sum);
+  }
+
+  private static double getSum(ByteBuffer buf, int position)
+  {
+    return buf.getDouble(position);
+  }
+
+  private static void writeCount(ByteBuffer buf, int position, long count)
+  {
+    buf.putLong(position + Double.SIZE, count);
+  }
+
+  private static long getCount(ByteBuffer buf, int position)
+  {
+    return buf.getLong(position + Double.SIZE);
+  }
+
+  public static class Serializer extends JsonSerializer<DoubleMeanHolder>
+  {
+    public static final Serializer INSTANCE = new Serializer();
+
+    private Serializer()
+    {
+
+    }
+
+    @Override
+    public void serialize(DoubleMeanHolder obj, JsonGenerator jgen, SerializerProvider provider)
+        throws IOException
+    {
+      jgen.writeBinary(obj.toBytes());
+    }
+  }
+}
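
The static methods above back the buffer and vector aggregators: each slot of `MAX_INTERMEDIATE_SIZE` bytes holds the running sum at the start of the slot and the count at a fixed offset after it. A minimal sketch of that byte-buffer path:

```java
import java.nio.ByteBuffer;
import org.apache.druid.query.aggregation.mean.DoubleMeanHolder;

public class DoubleMeanBufferExample
{
  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(DoubleMeanHolder.MAX_INTERMEDIATE_SIZE);
    int position = 0;

    DoubleMeanHolder.init(buf, position);         // sum = 0, count = 0
    DoubleMeanHolder.update(buf, position, 1.5);  // fold in one value
    DoubleMeanHolder.update(buf, position, 2.5);  // fold in another value

    DoubleMeanHolder holder = DoubleMeanHolder.get(buf, position);
    System.out.println(holder.mean());            // 2.0
  }
}
```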
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java
new file mode 100644
index 0000000..0a1a93c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/mean/DoubleMeanVectorAggregator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class DoubleMeanVectorAggregator implements VectorAggregator
+{
+  private final VectorValueSelector selector;
+
+  public DoubleMeanVectorAggregator(final VectorValueSelector selector)
+  {
+    this.selector = Preconditions.checkNotNull(selector, "selector");
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    DoubleMeanHolder.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final double[] vector = selector.getDoubleVector();
+    for (int i = startRow; i < endRow; i++) {
+      DoubleMeanHolder.update(buf, position, vector[i]);
+    }
+  }
+
+  @Override
+  public void aggregate(
+      final ByteBuffer buf,
+      final int numRows,
+      final int[] positions,
+      @Nullable final int[] rows,
+      final int positionOffset
+  )
+  {
+    final double[] vector = selector.getDoubleVector();
+
+    for (int i = 0; i < numRows; i++) {
+      final double val = vector[rows != null ? rows[i] : i];
+      DoubleMeanHolder.update(buf, positions[i] + positionOffset, val);
+    }
+  }
+
+  @Override
+  public Object get(final ByteBuffer buf, final int position)
+  {
+    return DoubleMeanHolder.get(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    // Nothing to close.
+  }
+}
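
In the second `aggregate` overload above, `rows` (when non-null) selects which entries of the value vector to read, while `positions[i] + positionOffset` selects the output slot each row is folded into. The sketch below simulates that bookkeeping with plain arrays instead of a real `VectorValueSelector`:

```java
import java.nio.ByteBuffer;
import org.apache.druid.query.aggregation.mean.DoubleMeanHolder;

public class VectorAggregateSketch
{
  public static void main(String[] args)
  {
    double[] vector = {1.0, 2.0, 3.0, 4.0};  // stand-in for selector.getDoubleVector()
    int[] rows = {0, 2};                     // rows to aggregate (null would mean 0..numRows-1)
    int[] positions = {0, 0};                // both rows feed the same output slot here
    int positionOffset = 0;

    ByteBuffer buf = ByteBuffer.allocate(DoubleMeanHolder.MAX_INTERMEDIATE_SIZE);
    DoubleMeanHolder.init(buf, 0);

    for (int i = 0; i < rows.length; i++) {
      DoubleMeanHolder.update(buf, positions[i] + positionOffset, vector[rows[i]]);
    }

    System.out.println(DoubleMeanHolder.get(buf, 0).mean());  // (1.0 + 3.0) / 2 = 2.0
  }
}
```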
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index b272ab1..92d9e9e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -43,7 +43,7 @@ public class GroupByQueryConfig
   private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
   private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
   private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
-  private static final String CTX_KEY_VECTORIZE = "vectorize";
+  public static final String CTX_KEY_VECTORIZE = "vectorize";
 
   @JsonProperty
   private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 6121281..af936d1 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -571,7 +571,7 @@ public class AggregationTestHelper implements Closeable
     }
   }
 
-  public IncrementalIndex createIncrementalIndex(
+  public static IncrementalIndex createIncrementalIndex(
       Iterator rows,
       InputRowParser parser,
       final AggregatorFactory[] metrics,
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java
new file mode 100644
index 0000000..a71c6f0
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/mean/DoubleMeanAggregationTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DoubleMeanAggregationTest
+{
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private final AggregationTestHelper groupByQueryTestHelper;
+  private final AggregationTestHelper timeseriesQueryTestHelper;
+
+  private final List<Segment> segments;
+
+  public DoubleMeanAggregationTest() throws Exception
+  {
+
+    groupByQueryTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+        Collections.EMPTY_LIST,
+        new GroupByQueryConfig(),
+        tempFolder
+    );
+
+    timeseriesQueryTestHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
+        Collections.EMPTY_LIST,
+        tempFolder
+    );
+
+    segments = ImmutableList.of(
+        new IncrementalIndexSegment(SimpleTestIndex.getIncrementalTestIndex(), SegmentId.dummy("test1")),
+        new QueryableIndexSegment(SimpleTestIndex.getMMappedTestIndex(), SegmentId.dummy("test2"))
+    );
+  }
+
+  @Test
+  public void testBufferAggretatorUsingGroupByQuery() throws Exception
+  {
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource("test")
+        .setGranularity(Granularities.ALL)
+        .setInterval("1970/2050")
+        .setAggregatorSpecs(
+            new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL),
+            new DoubleMeanAggregatorFactory("meanOnString", SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM),
+            new DoubleMeanAggregatorFactory("meanOnMultiValue", SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM)
+        )
+        .build();
+
+    // do json serialization and deserialization of query to ensure there are no serde issues
+    ObjectMapper jsonMapper = groupByQueryTestHelper.getObjectMapper();
+    query = (GroupByQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class);
+
+    Sequence<ResultRow> seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
+    Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);
+
+    Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d);
+    Assert.assertEquals(6.2d, result.getMetric("meanOnString").doubleValue(), 0.0001d);
+    Assert.assertEquals(4.1333d, result.getMetric("meanOnMultiValue").doubleValue(), 0.0001d);
+  }
+
+  @Test
+  public void testVectorAggretatorUsingGroupByQueryOnDoubleColumn() throws Exception
+  {
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource("test")
+        .setGranularity(Granularities.ALL)
+        .setInterval("1970/2050")
+        .setAggregatorSpecs(
+            new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL)
+        )
+        .setContext(Collections.singletonMap(GroupByQueryConfig.CTX_KEY_VECTORIZE, true))
+        .build();
+
+    // do json serialization and deserialization of query to ensure there are no serde issues
+    ObjectMapper jsonMapper = groupByQueryTestHelper.getObjectMapper();
+    query = (GroupByQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class);
+
+    Sequence<ResultRow> seq = groupByQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
+    Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);
+
+    Assert.assertEquals(6.2d, result.getMetric("meanOnDouble").doubleValue(), 0.0001d);
+  }
+
+  @Test
+  public void testAggretatorUsingTimeseriesQuery() throws Exception
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("test")
+                                  .granularity(Granularities.ALL)
+                                  .intervals("1970/2050")
+                                  .aggregators(
+                                      new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL),
+                                      new DoubleMeanAggregatorFactory(
+                                          "meanOnString",
+                                          SimpleTestIndex.SINGLE_VALUE_DOUBLE_AS_STRING_DIM
+                                      ),
+                                      new DoubleMeanAggregatorFactory(
+                                          "meanOnMultiValue",
+                                          SimpleTestIndex.MULTI_VALUE_DOUBLE_AS_STRING_DIM
+                                      )
+                                  )
+                                  .build();
+
+    // do json serialization and deserialization of query to ensure there are no serde issues
+    ObjectMapper jsonMapper = timeseriesQueryTestHelper.getObjectMapper();
+    query = (TimeseriesQuery) jsonMapper.readValue(jsonMapper.writeValueAsString(query), Query.class);
+
+    Sequence seq = timeseriesQueryTestHelper.runQueryOnSegmentsObjs(segments, query);
+    TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();
+
+    Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnDouble").doubleValue(), 0.0001d);
+    Assert.assertEquals(6.2d, result.getDoubleMetric("meanOnString").doubleValue(), 0.0001d);
+    Assert.assertEquals(4.1333d, result.getDoubleMetric("meanOnMultiValue").doubleValue(), 0.0001d);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
new file mode 100644
index 0000000..786a6fc
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.mean;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.NoopInputRowParser;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.util.List;
+
+public class SimpleTestIndex
+{
+  public static final int NUM_ROWS = 10;
+
+  public static final String SINGLE_VALUE_DOUBLE_AS_STRING_DIM = "singleValueDoubleAsStringDim";
+  public static final String MULTI_VALUE_DOUBLE_AS_STRING_DIM = "multiValueDoubleAsStringDim";
+
+  public static final String DOUBLE_COL = "doubleCol";
+
+  public static final List<String> DIMENSIONS = ImmutableList.of(
+      SINGLE_VALUE_DOUBLE_AS_STRING_DIM,
+      MULTI_VALUE_DOUBLE_AS_STRING_DIM
+  );
+
+  private static Supplier<IncrementalIndex> realtimeIndex = Suppliers.memoize(
+      () -> makeRealtimeIndex()
+  );
+
+  private static Supplier<QueryableIndex> mmappedIndex = Suppliers.memoize(
+      () -> TestIndex.persistRealtimeAndLoadMMapped(realtimeIndex.get())
+  );
+
+
+  public static IncrementalIndex getIncrementalTestIndex()
+  {
+    return realtimeIndex.get();
+  }
+
+  public static QueryableIndex getMMappedTestIndex()
+  {
+    return mmappedIndex.get();
+  }
+
+  private static IncrementalIndex makeRealtimeIndex()
+  {
+    try {
+      List<InputRow> inputRows = Lists.newArrayListWithExpectedSize(NUM_ROWS);
+      for (int i = 1; i <= NUM_ROWS; i++) {
+        double doubleVal = i + 0.7d;
+        String stringVal = String.valueOf(doubleVal);
+
+        inputRows.add(new MapBasedInputRow(
+            DateTime.now(DateTimeZone.UTC),
+            DIMENSIONS,
+            ImmutableMap.of(
+                DOUBLE_COL, doubleVal,
+                SINGLE_VALUE_DOUBLE_AS_STRING_DIM, stringVal,
+                MULTI_VALUE_DOUBLE_AS_STRING_DIM, Lists.newArrayList(stringVal, null, stringVal)
+            )
+        ));
+      }
+
+      return AggregationTestHelper.createIncrementalIndex(
+          inputRows.iterator(),
+          new NoopInputRowParser(null),
+          new AggregatorFactory[]{
+              new CountAggregatorFactory("count"),
+              new DoubleSumAggregatorFactory(DOUBLE_COL, DOUBLE_COL)
+          },
+          0,
+          Granularities.NONE,
+          false,
+          100,
+          false
+      );
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}
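
The expectations asserted in `DoubleMeanAggregationTest` follow directly from the rows generated here: `doubleCol` takes the values `i + 0.7` for `i = 1..10`, and the multi-value dimension stores that value twice alongside a null entry that ends up contributing 0 to the sum while still being counted (consistent with the 4.1333 expectation). A quick re-derivation of the 6.2 and 4.1333 figures:

```java
public class ExpectedMeans
{
  public static void main(String[] args)
  {
    double sum = 0;
    for (int i = 1; i <= 10; i++) {
      sum += i + 0.7d;                   // doubleCol values: 1.7, 2.7, ..., 10.7 (sum = 62.0)
    }
    System.out.println(sum / 10);        // ~6.2  (meanOnDouble, meanOnString)

    // multiValueDoubleAsStringDim holds [value, null, value] per row;
    // nulls parse to 0, so 30 entries contribute 2 * 62.0 in total.
    System.out.println((2 * sum) / 30);  // ~4.1333 (meanOnMultiValue)
  }
}
```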

