You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/08 22:31:58 UTC

[pinot] 01/01: Revert "Add Variance and Standard Deviation Aggregation Functions (#9910)"

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch revert-9910-add-variance-function
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 48bed903895c6e75a7f5e031c2d4ff2b20f5788b
Author: Seunghyun Lee <sn...@apache.org>
AuthorDate: Thu Dec 8 14:31:51 2022 -0800

    Revert "Add Variance and Standard Deviation Aggregation Functions (#9910)"
    
    This reverts commit 852477bdf312607c9a0eba7278453b7957139466.
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  26 +-
 .../function/AggregationFunctionFactory.java       |   8 -
 .../function/CovarianceAggregationFunction.java    |  31 +-
 .../function/VarianceAggregationFunction.java      | 188 ------
 .../utils/StatisticalAggregationFunctionUtils.java |  53 --
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  16 +-
 .../pinot/queries/CovarianceQueriesTest.java       | 461 +++++++++++++
 .../pinot/queries/StatisticalQueriesTest.java      | 749 ---------------------
 .../segment/local/customobject/VarianceTuple.java  | 105 ---
 .../pinot/segment/spi/AggregationFunctionType.java |   4 -
 10 files changed, 495 insertions(+), 1146 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 6fbacc5fcd..e22a2e7fb0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -72,7 +72,6 @@ import org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
-import org.apache.pinot.segment.local.customobject.VarianceTuple;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -124,8 +123,7 @@ public class ObjectSerDeUtils {
     FloatLongPair(29),
     DoubleLongPair(30),
     StringLongPair(31),
-    CovarianceTuple(32),
-    VarianceTuple(33);
+    CovarianceTuple(32);
 
     private final int _value;
 
@@ -207,8 +205,6 @@ public class ObjectSerDeUtils {
         return ObjectType.StringLongPair;
       } else if (value instanceof CovarianceTuple) {
         return ObjectType.CovarianceTuple;
-      } else if (value instanceof VarianceTuple) {
-        return ObjectType.VarianceTuple;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
       }
@@ -466,23 +462,6 @@ public class ObjectSerDeUtils {
     }
   };
 
-  public static final ObjectSerDe<VarianceTuple> VARIANCE_TUPLE_OBJECT_SER_DE = new ObjectSerDe<VarianceTuple>() {
-    @Override
-    public byte[] serialize(VarianceTuple varianceTuple) {
-      return varianceTuple.toBytes();
-    }
-
-    @Override
-    public VarianceTuple deserialize(byte[] bytes) {
-      return VarianceTuple.fromBytes(bytes);
-    }
-
-    @Override
-    public VarianceTuple deserialize(ByteBuffer byteBuffer) {
-      return VarianceTuple.fromByteBuffer(byteBuffer);
-    }
-  };
-
   public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() {
 
     @Override
@@ -1212,8 +1191,7 @@ public class ObjectSerDeUtils {
       FLOAT_LONG_PAIR_SER_DE,
       DOUBLE_LONG_PAIR_SER_DE,
       STRING_LONG_PAIR_SER_DE,
-      COVARIANCE_TUPLE_OBJECT_SER_DE,
-      VARIANCE_TUPLE_OBJECT_SER_DE
+      COVARIANCE_TUPLE_OBJECT_SER_DE
   };
   //@formatter:on
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 2758d193a2..4e1bda2025 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -277,14 +277,6 @@ public class AggregationFunctionFactory {
             return new BooleanAndAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
           case BOOLOR:
             return new BooleanOrAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
-          case VARPOP:
-            return new VarianceAggregationFunction(firstArgument, false, false);
-          case VARSAMP:
-            return new VarianceAggregationFunction(firstArgument, true, false);
-          case STDDEVPOP:
-            return new VarianceAggregationFunction(firstArgument, false, true);
-          case STDDEVSAMP:
-            return new VarianceAggregationFunction(firstArgument, true, true);
           default:
             throw new IllegalArgumentException();
         }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java
index 1676835bae..bd68235c48 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.core.query.aggregation.function;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +30,6 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
 import org.apache.pinot.segment.local.customobject.CovarianceTuple;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
@@ -99,8 +99,8 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1);
-    double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2);
+    double[] values1 = getValSet(blockValSetMap, _expression1);
+    double[] values2 = getValSet(blockValSetMap, _expression2);
 
     double sumX = 0.0;
     double sumY = 0.0;
@@ -134,11 +134,28 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari
     }
   }
 
+  private double[] getValSet(Map<ExpressionContext, BlockValSet> blockValSetMap, ExpressionContext expression) {
+    BlockValSet blockValSet = blockValSetMap.get(expression);
+    //TODO: Add MV support for covariance
+    Preconditions.checkState(blockValSet.isSingleValue(),
+        "Covariance function currently only supports single-valued column");
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return blockValSet.getDoubleValuesSV();
+      default:
+        throw new IllegalStateException(
+            "Cannot compute covariance for non-numeric type: " + blockValSet.getValueType());
+    }
+  }
+
   @Override
   public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1);
-    double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2);
+    double[] values1 = getValSet(blockValSetMap, _expression1);
+    double[] values2 = getValSet(blockValSetMap, _expression2);
     for (int i = 0; i < length; i++) {
       setGroupByResult(groupKeyArray[i], groupByResultHolder, values1[i], values2[i], values1[i] * values2[i], 1L);
     }
@@ -147,8 +164,8 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1);
-    double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2);
+    double[] values1 = getValSet(blockValSetMap, _expression1);
+    double[] values2 = getValSet(blockValSetMap, _expression2);
     for (int i = 0; i < length; i++) {
       for (int groupKey : groupKeysArray[i]) {
         setGroupByResult(groupKey, groupByResultHolder, values1[i], values2[i], values1[i] * values2[i], 1L);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java
deleted file mode 100644
index c86269b7ce..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.function;
-
-import java.util.Map;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
-import org.apache.pinot.segment.local.customobject.VarianceTuple;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-
-
-/**
- * Aggregation function which computes Variance and Standard Deviation
- *
- * The algorithm to compute variance is based on "Updating Formulae and a Pairwise Algorithm for Computing
- * Sample Variances" by Chan et al. Please refer to the "Parallel Algorithm" section from the following wiki:
- * - https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
- */
-public class VarianceAggregationFunction extends BaseSingleInputAggregationFunction<VarianceTuple, Double> {
-  private static final double DEFAULT_FINAL_RESULT = Double.NEGATIVE_INFINITY;
-  protected final boolean _isSample;
-
-  protected final boolean _isStdDev;
-
-  public VarianceAggregationFunction(ExpressionContext expression, boolean isSample, boolean isStdDev) {
-    super(expression);
-    _isSample = isSample;
-    _isStdDev = isStdDev;
-  }
-
-  @Override
-  public AggregationFunctionType getType() {
-    if (_isSample) {
-      return (_isStdDev) ? AggregationFunctionType.STDDEVSAMP : AggregationFunctionType.VARSAMP;
-    }
-    return (_isStdDev) ? AggregationFunctionType.STDDEVPOP : AggregationFunctionType.VARPOP;
-  }
-
-  @Override
-  public AggregationResultHolder createAggregationResultHolder() {
-    return new ObjectAggregationResultHolder();
-  }
-
-  @Override
-  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
-    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
-  }
-
-  @Override
-  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
-
-    long count = 0;
-    double sum = 0.0;
-    double variance = 0.0;
-    for (int i = 0; i < length; i++) {
-      count++;
-      sum += values[i];
-      if (count > 1) {
-        variance = computeIntermediateVariance(count, sum, variance, values[i]);
-      }
-    }
-    setAggregationResult(aggregationResultHolder, length, sum, variance);
-  }
-
-  private double computeIntermediateVariance(long count, double sum, double m2, double value) {
-    double t = count * value - sum;
-    m2 += (t * t) / (count * (count - 1));
-    return m2;
-  }
-
-  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, long count, double sum,
-      double m2) {
-    VarianceTuple varianceTuple = aggregationResultHolder.getResult();
-    if (varianceTuple == null) {
-      aggregationResultHolder.setValue(new VarianceTuple(count, sum, m2));
-    } else {
-      varianceTuple.apply(count, sum, m2);
-    }
-  }
-
-  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, long count, double sum,
-      double m2) {
-    VarianceTuple varianceTuple = groupByResultHolder.getResult(groupKey);
-    if (varianceTuple == null) {
-      groupByResultHolder.setValueForKey(groupKey, new VarianceTuple(count, sum, m2));
-    } else {
-      varianceTuple.apply(count, sum, m2);
-    }
-  }
-
-  @Override
-  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
-    for (int i = 0; i < length; i++) {
-      setGroupByResult(groupKeyArray[i], groupByResultHolder, 1L, values[i], 0.0);
-    }
-  }
-
-  @Override
-  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
-    for (int i = 0; i < length; i++) {
-      for (int groupKey : groupKeysArray[i]) {
-        setGroupByResult(groupKey, groupByResultHolder, 1L, values[i], 0.0);
-      }
-    }
-  }
-
-  @Override
-  public VarianceTuple extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
-    VarianceTuple varianceTuple = aggregationResultHolder.getResult();
-    if (varianceTuple == null) {
-      return new VarianceTuple(0L, 0.0, 0.0);
-    } else {
-      return varianceTuple;
-    }
-  }
-
-  @Override
-  public VarianceTuple extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
-    return groupByResultHolder.getResult(groupKey);
-  }
-
-  @Override
-  public VarianceTuple merge(VarianceTuple intermediateResult1, VarianceTuple intermediateResult2) {
-    intermediateResult1.apply(intermediateResult2);
-    return intermediateResult1;
-  }
-
-  @Override
-  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
-    return DataSchema.ColumnDataType.OBJECT;
-  }
-
-  @Override
-  public DataSchema.ColumnDataType getFinalResultColumnType() {
-    return DataSchema.ColumnDataType.DOUBLE;
-  }
-
-  @Override
-  public Double extractFinalResult(VarianceTuple varianceTuple) {
-    if (varianceTuple == null) {
-      return null;
-    }
-    long count = varianceTuple.getCount();
-    if (count == 0L) {
-      return DEFAULT_FINAL_RESULT;
-    } else {
-      double variance = varianceTuple.getM2();
-      if (_isSample) {
-        if (count - 1 == 0L) {
-          return DEFAULT_FINAL_RESULT;
-        }
-        double sampleVar = variance / (count - 1);
-        return (_isStdDev) ? Math.sqrt(sampleVar) : sampleVar;
-      } else {
-        double popVar = variance / count;
-        return (_isStdDev) ? Math.sqrt(popVar) : popVar;
-      }
-    }
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java
deleted file mode 100644
index b7a05de3a4..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.utils;
-
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-
-
-/**
- * Util class for statistical aggregation functions
- *
- * e.g. Variance, Covariance, Standard Deviation...
- */
-public class StatisticalAggregationFunctionUtils {
-  private StatisticalAggregationFunctionUtils() {
-  }
-
-  public static double[] getValSet(Map<ExpressionContext, BlockValSet> blockValSetMap, ExpressionContext expression) {
-    BlockValSet blockValSet = blockValSetMap.get(expression);
-    //TODO: Add MV support for covariance
-    Preconditions.checkState(blockValSet.isSingleValue(),
-        "Variance, Covariance, Standard Deviation function currently only supports single-valued column");
-    switch (blockValSet.getValueType().getStoredType()) {
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-        return blockValSet.getDoubleValuesSV();
-      default:
-        throw new IllegalStateException(
-            "Cannot compute variance, covariance, or standard deviation for non-numeric type: "
-                + blockValSet.getValueType());
-    }
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index fa5a5068df..f359987780 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -111,7 +111,7 @@ public abstract class BaseQueriesTest {
    * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
    * different index segments in the test and overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   protected BrokerResponseNative getBrokerResponse(String query) {
     return getBrokerResponse(query, PLAN_MAKER);
@@ -125,7 +125,7 @@ public abstract class BaseQueriesTest {
    * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
    * different index segments in the test and overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   protected BrokerResponseNative getBrokerResponseWithFilter(String query) {
     return getBrokerResponse(query + getFilter());
@@ -139,7 +139,7 @@ public abstract class BaseQueriesTest {
    * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
    * different index segments in the test and overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   protected BrokerResponseNative getBrokerResponse(String query, PlanMaker planMaker) {
     return getBrokerResponse(query, planMaker, null);
@@ -153,7 +153,7 @@ public abstract class BaseQueriesTest {
    * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
    * different index segments in the test and overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   protected BrokerResponseNative getBrokerResponse(String query, @Nullable Map<String, String> extraQueryOptions) {
     return getBrokerResponse(query, PLAN_MAKER, extraQueryOptions);
@@ -167,7 +167,7 @@ public abstract class BaseQueriesTest {
    * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
    * different index segments in the test and overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   private BrokerResponseNative getBrokerResponse(String query, PlanMaker planMaker,
       @Nullable Map<String, String> extraQueryOptions) {
@@ -191,7 +191,7 @@ public abstract class BaseQueriesTest {
    * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
    * different index segments in the test and overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   private BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery, PlanMaker planMaker) {
     PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
@@ -249,7 +249,7 @@ public abstract class BaseQueriesTest {
    * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
    * different index segments in the test and overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   protected BrokerResponseNative getBrokerResponseForOptimizedQuery(String query, @Nullable TableConfig config,
       @Nullable Schema schema) {
@@ -286,7 +286,7 @@ public abstract class BaseQueriesTest {
    * The caller of this function should handle initializing 2 instances with different index segments in the test and
    * overriding getDistinctInstances.
    * This can be particularly useful to test statistical aggregation functions.
-   * @see StatisticalQueriesTest for an example use case.
+   * @see CovarianceQueriesTest for an example use case.
    */
   private BrokerResponseNative getBrokerResponseDistinctInstances(PinotQuery pinotQuery, PlanMaker planMaker) {
     PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java
new file mode 100644
index 0000000000..0ab86180e8
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.math3.stat.correlation.Covariance;
+import org.apache.commons.math3.util.Precision;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.GroupByOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.segment.local.customobject.CovarianceTuple;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for covariance queries.
+ */
+public class CovarianceQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "CovarianceQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  // test segments 1-4 evenly divide testSegment into 4 distinct segments
+  private static final String SEGMENT_NAME_1 = "testSegment1";
+  private static final String SEGMENT_NAME_2 = "testSegment2";
+  private static final String SEGMENT_NAME_3 = "testSegment3";
+  private static final String SEGMENT_NAME_4 = "testSegment4";
+
+  private static final int NUM_RECORDS = 2000;
+  private static final int NUM_GROUPS = 10;
+  private static final int MAX_VALUE = 500;
+  private static final double RELATIVE_EPSILON = 0.0001;
+  private static final double DELTA = 0.0001;
+
+  private static final String INT_COLUMN_X = "intColumnX";
+  private static final String INT_COLUMN_Y = "intColumnY";
+  private static final String DOUBLE_COLUMN_X = "doubleColumnX";
+  private static final String DOUBLE_COLUMN_Y = "doubleColumnY";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String GROUP_BY_COLUMN = "groupByColumn";
+
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN_X, FieldSpec.DataType.INT)
+          .addSingleValueDimension(INT_COLUMN_Y, FieldSpec.DataType.INT)
+          .addSingleValueDimension(DOUBLE_COLUMN_X, FieldSpec.DataType.DOUBLE)
+          .addSingleValueDimension(DOUBLE_COLUMN_Y, FieldSpec.DataType.DOUBLE)
+          .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+          .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
+          .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.DOUBLE).build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+  private List<List<IndexSegment>> _distinctInstances;
+  private int _sumIntX = 0;
+  private int _sumIntY = 0;
+  private int _sumIntXY = 0;
+
+  private double _sumDoubleX = 0;
+  private double _sumDoubleY = 0;
+  private double _sumDoubleXY = 0;
+
+  private long _sumLong = 0L;
+  private double _sumFloat = 0;
+
+  private double _sumIntDouble = 0;
+  private long _sumIntLong = 0L;
+  private double _sumIntFloat = 0;
+  private double _sumDoubleLong = 0;
+  private double _sumDoubleFloat = 0;
+  private double _sumLongFloat = 0;
+
+  private double _expectedCovIntXY;
+  private double _expectedCovDoubleXY;
+  private double _expectedCovIntDouble;
+  private double _expectedCovIntLong;
+  private double _expectedCovIntFloat;
+  private double _expectedCovDoubleLong;
+  private double _expectedCovDoubleFloat;
+  private double _expectedCovLongFloat;
+
+  private double _expectedCovWithFilter;
+
+  private final CovarianceTuple[] _expectedGroupByResultVer1 = new CovarianceTuple[NUM_GROUPS];
+  private final CovarianceTuple[] _expectedGroupByResultVer2 = new CovarianceTuple[NUM_GROUPS];
+  private final double[] _expectedFinalResultVer1 = new double[NUM_GROUPS];
+  private final double[] _expectedFinalResultVer2 = new double[NUM_GROUPS];
+
+  private boolean _useIdenticalSegment = false;
+
+  @Override
+  protected String getFilter() {
+    // filter out half of the rows based on group id
+    return " WHERE groupByColumn < " + (NUM_GROUPS / 2);
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @Override
+  protected List<List<IndexSegment>> getDistinctInstances() {
+    // Tests flip _useIdenticalSegment to pick between a single entry holding the duplicated full segment
+    // and the two entries that each hold two distinct quarter segments.
+    return _useIdenticalSegment ? Collections.singletonList(_indexSegments) : _distinctInstances;
+  }
+
+  /**
+   * Generates NUM_RECORDS random rows, derives every expected value the tests compare against, and builds the
+   * segments.
+   *
+   * <p>Rows are assigned to groups in contiguous runs of {@code NUM_RECORDS / NUM_GROUPS} rows, so group {@code g}
+   * owns rows {@code [g * groupSize, (g + 1) * groupSize)}. Expected values are computed from the same arrays that
+   * feed the segments. The random generator is not seeded, so the data differs from run to run.
+   *
+   * @throws Exception if the index directory cannot be cleared or a segment cannot be built or loaded
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+
+    Random rand = new Random();
+    int[] intColX = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+    int[] intColY = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+    double[] doubleColX = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+    double[] doubleColY = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+    long[] longCol = rand.longs(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+    // float and group-by values are produced inside the row loop and stored as doubles for Covariance
+    double[] floatCol = new double[NUM_RECORDS];
+    double[] groupByCol = new double[NUM_RECORDS];
+
+    // running sums for the group currently being filled; reset at every group boundary
+    int groupSize = NUM_RECORDS / NUM_GROUPS;
+    double sumX = 0;
+    double sumY = 0;
+    double sumGroupBy = 0;
+    double sumXY = 0;
+    double sumXGroupBy = 0;
+    int groupByVal = 0;
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      int intX = intColX[i];
+      int intY = intColY[i];
+      double doubleX = doubleColX[i];
+      double doubleY = doubleColY[i];
+      long longVal = longCol[i];
+      float floatVal = -MAX_VALUE + rand.nextFloat() * 2 * MAX_VALUE;
+
+      // set up inner segment group by results
+      // i and groupSize are non-negative ints, so integer division already yields the group index
+      groupByVal = i / groupSize;
+      if (i % groupSize == 0 && groupByVal > 0) {
+        // first row of a new group: record the finished group's sums, then start over
+        _expectedGroupByResultVer1[groupByVal - 1] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
+        _expectedGroupByResultVer2[groupByVal - 1] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
+        sumX = 0;
+        sumY = 0;
+        sumGroupBy = 0;
+        sumXY = 0;
+        sumXGroupBy = 0;
+      }
+
+      sumX += doubleX;
+      sumY += doubleY;
+      sumGroupBy += groupByVal;
+      sumXY += doubleX * doubleY;
+      sumXGroupBy += doubleX * groupByVal;
+
+      floatCol[i] = floatVal;
+      groupByCol[i] = groupByVal;
+
+      // calculate inner segment results
+      _sumIntX += intX;
+      _sumIntY += intY;
+      _sumDoubleX += doubleX;
+      _sumDoubleY += doubleY;
+      _sumLong += longVal;
+      _sumFloat += floatVal;
+      _sumIntXY += intX * intY;
+      _sumDoubleXY += doubleX * doubleY;
+      _sumIntDouble += intX * doubleX;
+      _sumIntLong += intX * longVal;
+      _sumIntFloat += intX * floatCol[i];
+      _sumDoubleLong += doubleX * longVal;
+      _sumDoubleFloat += doubleX * floatCol[i];
+      _sumLongFloat += longVal * floatCol[i];
+
+      record.putValue(INT_COLUMN_X, intX);
+      record.putValue(INT_COLUMN_Y, intY);
+      record.putValue(DOUBLE_COLUMN_X, doubleX);
+      record.putValue(DOUBLE_COLUMN_Y, doubleY);
+      record.putValue(LONG_COLUMN, longVal);
+      record.putValue(FLOAT_COLUMN, floatVal);
+      record.putValue(GROUP_BY_COLUMN, groupByVal);
+      records.add(record);
+    }
+    // the last group has no following boundary row, so its sums are recorded after the loop
+    _expectedGroupByResultVer1[groupByVal] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
+    _expectedGroupByResultVer2[groupByVal] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
+
+    // calculate inter segment result
+    Covariance cov = new Covariance();
+    double[] newIntColX = Arrays.stream(intColX).asDoubleStream().toArray();
+    double[] newIntColY = Arrays.stream(intColY).asDoubleStream().toArray();
+    double[] newLongCol = Arrays.stream(longCol).asDoubleStream().toArray();
+    _expectedCovIntXY = cov.covariance(newIntColX, newIntColY, false);
+    _expectedCovDoubleXY = cov.covariance(doubleColX, doubleColY, false);
+    _expectedCovIntDouble = cov.covariance(newIntColX, doubleColX, false);
+    _expectedCovIntLong = cov.covariance(newIntColX, newLongCol, false);
+    _expectedCovIntFloat = cov.covariance(newIntColX, floatCol, false);
+    _expectedCovDoubleLong = cov.covariance(doubleColX, newLongCol, false);
+    _expectedCovDoubleFloat = cov.covariance(doubleColX, floatCol, false);
+    _expectedCovLongFloat = cov.covariance(newLongCol, floatCol, false);
+
+    // getFilter() keeps group ids below NUM_GROUPS / 2, which are exactly the first half of the rows
+    double[] filteredX = Arrays.copyOfRange(doubleColX, 0, NUM_RECORDS / 2);
+    double[] filteredY = Arrays.copyOfRange(doubleColY, 0, NUM_RECORDS / 2);
+    _expectedCovWithFilter = cov.covariance(filteredX, filteredY, false);
+
+    // calculate inter segment group by results
+    for (int i = 0; i < NUM_GROUPS; i++) {
+      double[] colX = Arrays.copyOfRange(doubleColX, i * groupSize, (i + 1) * groupSize);
+      double[] colGroupBy = Arrays.copyOfRange(groupByCol, i * groupSize, (i + 1) * groupSize);
+      double[] colY = Arrays.copyOfRange(doubleColY, i * groupSize, (i + 1) * groupSize);
+      _expectedFinalResultVer1[i] = cov.covariance(colX, colGroupBy, false);
+      _expectedFinalResultVer2[i] = cov.covariance(colX, colY, false);
+    }
+
+    // generate testSegment
+    ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+    // divide testSegment into 4 distinct segments for distinct inter segment tests
+    // by doing so, we can avoid calculating global covariance again
+    _distinctInstances = new ArrayList<>();
+    int segmentSize = NUM_RECORDS / 4;
+    ImmutableSegment immutableSegment1 = setUpSingleSegment(records.subList(0, segmentSize), SEGMENT_NAME_1);
+    ImmutableSegment immutableSegment2 =
+        setUpSingleSegment(records.subList(segmentSize, segmentSize * 2), SEGMENT_NAME_2);
+    ImmutableSegment immutableSegment3 =
+        setUpSingleSegment(records.subList(segmentSize * 2, segmentSize * 3), SEGMENT_NAME_3);
+    ImmutableSegment immutableSegment4 =
+        setUpSingleSegment(records.subList(segmentSize * 3, NUM_RECORDS), SEGMENT_NAME_4);
+    // generate 2 instances each with 2 distinct segments
+    _distinctInstances.add(Arrays.asList(immutableSegment1, immutableSegment2));
+    _distinctInstances.add(Arrays.asList(immutableSegment3, immutableSegment4));
+  }
+
+  /**
+   * Builds a segment named {@code segmentName} from {@code recordSet} under INDEX_DIR and loads it with
+   * {@code ReadMode.mmap}.
+   */
+  private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName)
+      throws Exception {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    config.setTableName(RAW_TABLE_NAME);
+    config.setSegmentName(segmentName);
+    config.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    GenericRowRecordReader recordReader = new GenericRowRecordReader(recordSet);
+    creationDriver.init(config, recordReader);
+    creationDriver.build();
+
+    // the driver writes the segment into a directory named after the segment inside the output directory
+    File segmentDir = new File(INDEX_DIR, segmentName);
+    return ImmutableSegmentLoader.load(segmentDir, ReadMode.mmap);
+  }
+
+  @Test
+  public void testAggregationOnly() {
+    // Inner Segment
+    String query =
+        "SELECT COVAR_POP(intColumnX, intColumnY), COVAR_POP(doubleColumnX, doubleColumnY), COVAR_POP(intColumnX, "
+            + "doubleColumnX), " + "COVAR_POP(intColumnX, longColumn), COVAR_POP(intColumnX, floatColumn), "
+            + "COVAR_POP(doubleColumnX, longColumn), COVAR_POP(doubleColumnX, floatColumn), COVAR_POP(longColumn, "
+            + "floatColumn)  FROM testTable";
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 6, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getResults();
+    assertNotNull(aggregationResult);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(0), _sumIntX, _sumIntY, _sumIntXY, NUM_RECORDS);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(1), _sumDoubleX, _sumDoubleY, _sumDoubleXY, NUM_RECORDS);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(2), _sumIntX, _sumDoubleX, _sumIntDouble, NUM_RECORDS);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(3), _sumIntX, _sumLong, _sumIntLong, NUM_RECORDS);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(4), _sumIntX, _sumFloat, _sumIntFloat, NUM_RECORDS);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(5), _sumDoubleX, _sumLong, _sumDoubleLong, NUM_RECORDS);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(6), _sumDoubleX, _sumFloat, _sumDoubleFloat,
+        NUM_RECORDS);
+    checkWithPrecision((CovarianceTuple) aggregationResult.get(7), _sumLong, _sumFloat, _sumLongFloat, NUM_RECORDS);
+
+    // Inter segments with 4 identical segments (2 instances each having 2 identical segments)
+    _useIdenticalSegment = true;
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    _useIdenticalSegment = false;
+    assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 6 * NUM_RECORDS);
+    assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    checkResultTableWithPrecision(brokerResponse);
+
+    // Inter segments with 4 distinct segments (2 instances each having 2 distinct segments)
+    brokerResponse = getBrokerResponse(query);
+    assertEquals(brokerResponse.getNumDocsScanned(), NUM_RECORDS);
+    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 6 * NUM_RECORDS);
+    assertEquals(brokerResponse.getTotalDocs(), NUM_RECORDS);
+    checkResultTableWithPrecision(brokerResponse);
+
+    // Inter segments with 4 identical segments with filter
+    _useIdenticalSegment = true;
+    query = "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable" + getFilter();
+    brokerResponse = getBrokerResponse(query);
+    _useIdenticalSegment = false;
+    assertEquals(brokerResponse.getNumDocsScanned(), 2 * NUM_RECORDS);
+    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * NUM_RECORDS);
+    assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    Object[] results = brokerResponse.getResultTable().getRows().get(0);
+    assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovWithFilter, RELATIVE_EPSILON));
+  }
+
+  /**
+   * Checks COVAR_POP with GROUP BY for two argument pairings, each at the inner-segment level and through the
+   * broker with identical and with distinct segments.
+   */
+  @Test
+  public void testAggregationGroupBy() {
+    // case 1: (col1, groupByCol) group by groupByCol => all covariances are 0's
+    verifyGroupByCase(
+        "SELECT COVAR_POP(doubleColumnX, groupByColumn) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn",
+        NUM_RECORDS * 2, _expectedGroupByResultVer1, _expectedFinalResultVer1);
+
+    // case 2: COVAR_POP(col1, col2) group by groupByCol => nondeterministic cov
+    verifyGroupByCase(
+        "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn",
+        NUM_RECORDS * 3, _expectedGroupByResultVer2, _expectedFinalResultVer2);
+  }
+
+  /**
+   * Runs one group-by query: first against the single full segment, comparing each group's intermediate tuple,
+   * then through the broker with identical segments and again with distinct segments.
+   */
+  private void verifyGroupByCase(String query, int expectedEntriesScannedPostFilter,
+      CovarianceTuple[] expectedTuples, double[] expectedCovariances) {
+    // Inner Segment
+    GroupByOperator operator = getOperator(query);
+    GroupByResultsBlock block = operator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
+        expectedEntriesScannedPostFilter, NUM_RECORDS);
+    AggregationGroupByResult groupByResult = block.getAggregationGroupByResult();
+    assertNotNull(groupByResult);
+    for (int groupId = 0; groupId < NUM_GROUPS; groupId++) {
+      checkWithPrecision((CovarianceTuple) groupByResult.getResultForGroupId(0, groupId), expectedTuples[groupId]);
+    }
+
+    // Inter Segment with 4 identical segments
+    _useIdenticalSegment = true;
+    BrokerResponseNative response = getBrokerResponse(query);
+    checkGroupByResults(response, expectedCovariances);
+    _useIdenticalSegment = false;
+    // Inter Segment with 4 distinct segments
+    response = getBrokerResponse(query);
+    checkGroupByResults(response, expectedCovariances);
+  }
+
+  /**
+   * Asserts that {@code tuple} holds exactly {@code count} and that each of its three sums matches the expected
+   * value within RELATIVE_EPSILON. Failure messages name the sum and print both values, because the input data is
+   * random and a bare boolean failure cannot be diagnosed afterwards.
+   *
+   * @param tuple intermediate aggregation result under test
+   * @param sumX expected sum of the first argument
+   * @param sumY expected sum of the second argument
+   * @param sumXY expected sum of the row-wise products
+   * @param count expected number of aggregated rows
+   */
+  private void checkWithPrecision(CovarianceTuple tuple, double sumX, double sumY, double sumXY, int count) {
+    assertEquals(tuple.getCount(), count);
+    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumX(), sumX, RELATIVE_EPSILON),
+        "sumX mismatch: actual=" + tuple.getSumX() + ", expected=" + sumX);
+    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumY(), sumY, RELATIVE_EPSILON),
+        "sumY mismatch: actual=" + tuple.getSumY() + ", expected=" + sumY);
+    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumXY(), sumXY, RELATIVE_EPSILON),
+        "sumXY mismatch: actual=" + tuple.getSumXY() + ", expected=" + sumXY);
+  }
+
+  /**
+   * Compares two tuples by unpacking {@code expected} and delegating to the five-argument overload.
+   */
+  private void checkWithPrecision(CovarianceTuple actual, CovarianceTuple expected) {
+    double expectedSumX = expected.getSumX();
+    double expectedSumY = expected.getSumY();
+    double expectedSumXY = expected.getSumXY();
+    int expectedCount = (int) expected.getCount();
+    checkWithPrecision(actual, expectedSumX, expectedSumY, expectedSumXY, expectedCount);
+  }
+
+  /**
+   * Asserts that the first result row holds the eight expected covariances, in query order, each within
+   * RELATIVE_EPSILON. Failure messages identify the column and print both values.
+   *
+   * @param brokerResponse response of the eight-column COVAR_POP query
+   */
+  private void checkResultTableWithPrecision(BrokerResponseNative brokerResponse) {
+    // same order as the COVAR_POP calls in the query issued by testAggregationOnly()
+    double[] expectedCovariances = {
+        _expectedCovIntXY, _expectedCovDoubleXY, _expectedCovIntDouble, _expectedCovIntLong,
+        _expectedCovIntFloat, _expectedCovDoubleLong, _expectedCovDoubleFloat, _expectedCovLongFloat
+    };
+    Object[] results = brokerResponse.getResultTable().getRows().get(0);
+    assertEquals(results.length, expectedCovariances.length);
+    for (int i = 0; i < expectedCovariances.length; i++) {
+      double actual = (double) results[i];
+      assertTrue(Precision.equalsWithRelativeTolerance(actual, expectedCovariances[i], RELATIVE_EPSILON),
+          "Covariance mismatch in result column " + i + ": actual=" + actual + ", expected="
+              + expectedCovariances[i]);
+    }
+  }
+
+  /**
+   * Asserts that the first column of the first NUM_GROUPS result rows matches {@code expectedResults} within the
+   * absolute tolerance DELTA.
+   *
+   * @param brokerResponse response of a group-by query whose rows are ordered by group
+   * @param expectedResults expected covariance per group, indexed by row position
+   */
+  private void checkGroupByResults(BrokerResponseNative brokerResponse, double[] expectedResults) {
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    // report a short result as an assertion failure instead of an IndexOutOfBoundsException in the loop
+    assertTrue(rows.size() >= NUM_GROUPS, "Expected at least " + NUM_GROUPS + " rows but got " + rows.size());
+    for (int i = 0; i < NUM_GROUPS; i++) {
+      double actual = (double) rows.get(i)[0];
+      assertTrue(Precision.equals(actual, expectedResults[i], DELTA),
+          "Covariance mismatch in row " + i + ": actual=" + actual + ", expected=" + expectedResults[i]);
+    }
+  }
+
+  /**
+   * Destroys every segment loaded by setUp() and deletes the index directory. The directory is deleted even when
+   * destroying a segment throws, so a failed run leaves no files behind.
+   *
+   * @throws IOException if the index directory cannot be deleted
+   */
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    try {
+      // _indexSegments only repeats _indexSegment, so the full segment is destroyed once here
+      _indexSegment.destroy();
+      for (List<IndexSegment> indexList : _distinctInstances) {
+        for (IndexSegment seg : indexList) {
+          seg.destroy();
+        }
+      }
+    } finally {
+      FileUtils.deleteDirectory(INDEX_DIR);
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
deleted file mode 100644
index d61dcaa18f..0000000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.queries;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.math3.stat.correlation.Covariance;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.moment.Variance;
-import org.apache.commons.math3.util.Precision;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
-import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
-import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.operator.query.GroupByOperator;
-import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
-import org.apache.pinot.segment.local.customobject.CovarianceTuple;
-import org.apache.pinot.segment.local.customobject.VarianceTuple;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * Queries test for statistical queries (i.e Variance, Covariance, Standard Deviation etc)
- */
-public class StatisticalQueriesTest extends BaseQueriesTest {
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "CovarianceQueriesTest");
-  private static final String RAW_TABLE_NAME = "testTable";
-  private static final String SEGMENT_NAME = "testSegment";
-
-  // test segments 1-4 evenly divide testSegment into 4 distinct segments
-  private static final String SEGMENT_NAME_1 = "testSegment1";
-  private static final String SEGMENT_NAME_2 = "testSegment2";
-  private static final String SEGMENT_NAME_3 = "testSegment3";
-  private static final String SEGMENT_NAME_4 = "testSegment4";
-
-  private static final int NUM_RECORDS = 2000;
-  private static final int NUM_GROUPS = 10;
-  private static final int MAX_VALUE = 500;
-  private static final double RELATIVE_EPSILON = 0.0001;
-  private static final double DELTA = 0.0001;
-
-  private static final String INT_COLUMN_X = "intColumnX";
-  private static final String INT_COLUMN_Y = "intColumnY";
-  private static final String DOUBLE_COLUMN_X = "doubleColumnX";
-  private static final String DOUBLE_COLUMN_Y = "doubleColumnY";
-  private static final String LONG_COLUMN = "longColumn";
-  private static final String FLOAT_COLUMN = "floatColumn";
-  private static final String GROUP_BY_COLUMN = "groupByColumn";
-
-  private static final Schema SCHEMA =
-      new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN_X, FieldSpec.DataType.INT)
-          .addSingleValueDimension(INT_COLUMN_Y, FieldSpec.DataType.INT)
-          .addSingleValueDimension(DOUBLE_COLUMN_X, FieldSpec.DataType.DOUBLE)
-          .addSingleValueDimension(DOUBLE_COLUMN_Y, FieldSpec.DataType.DOUBLE)
-          .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
-          .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
-          .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.DOUBLE).build();
-  private static final TableConfig TABLE_CONFIG =
-      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
-
-  private IndexSegment _indexSegment;
-  private List<IndexSegment> _indexSegments;
-  private List<List<IndexSegment>> _distinctInstances;
-  private int _sumIntX = 0;
-  private int _sumIntY = 0;
-  private int _sumIntXY = 0;
-
-  private double _sumDoubleX = 0;
-  private double _sumDoubleY = 0;
-  private double _sumDoubleXY = 0;
-
-  private long _sumLong = 0L;
-  private double _sumFloat = 0;
-
-  private double _sumIntDouble = 0;
-  private long _sumIntLong = 0L;
-  private double _sumIntFloat = 0;
-  private double _sumDoubleLong = 0;
-  private double _sumDoubleFloat = 0;
-  private double _sumLongFloat = 0;
-
-  private double _expectedCovIntXY;
-  private double _expectedCovDoubleXY;
-  private double _expectedCovIntDouble;
-  private double _expectedCovIntLong;
-  private double _expectedCovIntFloat;
-  private double _expectedCovDoubleLong;
-  private double _expectedCovDoubleFloat;
-  private double _expectedCovLongFloat;
-
-  private double _expectedCovWithFilter;
-
-  private final CovarianceTuple[] _expectedGroupByResultVer1 = new CovarianceTuple[NUM_GROUPS];
-  private final CovarianceTuple[] _expectedGroupByResultVer2 = new CovarianceTuple[NUM_GROUPS];
-  private final double[] _expectedFinalResultVer1 = new double[NUM_GROUPS];
-  private final double[] _expectedFinalResultVer2 = new double[NUM_GROUPS];
-
-  private boolean _useIdenticalSegment = false;
-
-  int[] _intColX = new int[NUM_RECORDS];
-  int[] _intColY = new int[NUM_RECORDS];
-  long[] _longCol = new long[NUM_RECORDS];
-  double[] _floatCol = new double[NUM_RECORDS];
-  double[] _doubleColX = new double[NUM_RECORDS];
-  double[] _doubleColY = new double[NUM_RECORDS];
-  double[] _groupByCol = new double[NUM_RECORDS];
-
-  @Override
-  protected String getFilter() {
-    // filter out half of the rows based on group id
-    return " WHERE groupByColumn < " + (NUM_GROUPS / 2);
-  }
-
-  @Override
-  protected IndexSegment getIndexSegment() {
-    return _indexSegment;
-  }
-
-  @Override
-  protected List<IndexSegment> getIndexSegments() {
-    return _indexSegments;
-  }
-
-  @Override
-  protected List<List<IndexSegment>> getDistinctInstances() {
-    if (_useIdenticalSegment) {
-      return Collections.singletonList(_indexSegments);
-    }
-    return _distinctInstances;
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteDirectory(INDEX_DIR);
-
-    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
-
-    Random rand = new Random();
-    _intColX = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
-    _intColY = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
-    _doubleColX = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
-    _doubleColY = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
-    _longCol = rand.longs(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
-
-    int groupSize = NUM_RECORDS / NUM_GROUPS;
-    double sumX = 0;
-    double sumY = 0;
-    double sumGroupBy = 0;
-    double sumXY = 0;
-    double sumXGroupBy = 0;
-    int groupByVal = 0;
-
-    for (int i = 0; i < NUM_RECORDS; i++) {
-      GenericRow record = new GenericRow();
-      int intX = _intColX[i];
-      int intY = _intColY[i];
-      double doubleX = _doubleColX[i];
-      double doubleY = _doubleColY[i];
-      long longVal = _longCol[i];
-      float floatVal = -MAX_VALUE + rand.nextFloat() * 2 * MAX_VALUE;
-
-      // set up inner segment group by results
-      groupByVal = (int) Math.floor(i / groupSize);
-      if (i % groupSize == 0 && groupByVal > 0) {
-        _expectedGroupByResultVer1[groupByVal - 1] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
-        _expectedGroupByResultVer2[groupByVal - 1] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
-        sumX = 0;
-        sumY = 0;
-        sumGroupBy = 0;
-        sumXY = 0;
-        sumXGroupBy = 0;
-      }
-
-      sumX += doubleX;
-      sumY += doubleY;
-      sumGroupBy += groupByVal;
-      sumXY += doubleX * doubleY;
-      sumXGroupBy += doubleX * groupByVal;
-
-      _floatCol[i] = floatVal;
-      _groupByCol[i] = groupByVal;
-
-      // calculate inner segment results
-      _sumIntX += intX;
-      _sumIntY += intY;
-      _sumDoubleX += doubleX;
-      _sumDoubleY += doubleY;
-      _sumLong += longVal;
-      _sumFloat += floatVal;
-      _sumIntXY += intX * intY;
-      _sumDoubleXY += doubleX * doubleY;
-      _sumIntDouble += intX * doubleX;
-      _sumIntLong += intX * longVal;
-      _sumIntFloat += intX * _floatCol[i];
-      _sumDoubleLong += doubleX * longVal;
-      _sumDoubleFloat += doubleX * _floatCol[i];
-      _sumLongFloat += longVal * _floatCol[i];
-
-      record.putValue(INT_COLUMN_X, intX);
-      record.putValue(INT_COLUMN_Y, intY);
-      record.putValue(DOUBLE_COLUMN_X, doubleX);
-      record.putValue(DOUBLE_COLUMN_Y, doubleY);
-      record.putValue(LONG_COLUMN, longVal);
-      record.putValue(FLOAT_COLUMN, floatVal);
-      record.putValue(GROUP_BY_COLUMN, groupByVal);
-      records.add(record);
-    }
-    _expectedGroupByResultVer1[groupByVal] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
-    _expectedGroupByResultVer2[groupByVal] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
-
-    // calculate inter segment result
-    Covariance cov = new Covariance();
-    double[] newIntColX = Arrays.stream(_intColX).asDoubleStream().toArray();
-    double[] newIntColY = Arrays.stream(_intColY).asDoubleStream().toArray();
-    double[] newLongCol = Arrays.stream(_longCol).asDoubleStream().toArray();
-    _expectedCovIntXY = cov.covariance(newIntColX, newIntColY, false);
-    _expectedCovDoubleXY = cov.covariance(_doubleColX, _doubleColY, false);
-    _expectedCovIntDouble = cov.covariance(newIntColX, _doubleColX, false);
-    _expectedCovIntLong = cov.covariance(newIntColX, newLongCol, false);
-    _expectedCovIntFloat = cov.covariance(newIntColX, _floatCol, false);
-    _expectedCovDoubleLong = cov.covariance(_doubleColX, newLongCol, false);
-    _expectedCovDoubleFloat = cov.covariance(_doubleColX, _floatCol, false);
-    _expectedCovLongFloat = cov.covariance(newLongCol, _floatCol, false);
-
-    double[] filteredX = Arrays.copyOfRange(_doubleColX, 0, NUM_RECORDS / 2);
-    double[] filteredY = Arrays.copyOfRange(_doubleColY, 0, NUM_RECORDS / 2);
-    _expectedCovWithFilter = cov.covariance(filteredX, filteredY, false);
-
-    // calculate inter segment group by results
-    for (int i = 0; i < NUM_GROUPS; i++) {
-      double[] colX = Arrays.copyOfRange(_doubleColX, i * groupSize, (i + 1) * groupSize);
-      double[] colGroupBy = Arrays.copyOfRange(_groupByCol, i * groupSize, (i + 1) * groupSize);
-      double[] colY = Arrays.copyOfRange(_doubleColY, i * groupSize, (i + 1) * groupSize);
-      _expectedFinalResultVer1[i] = cov.covariance(colX, colGroupBy, false);
-      _expectedFinalResultVer2[i] = cov.covariance(colX, colY, false);
-    }
-
-    // generate testSegment
-    ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
-
-    // divide testSegment into 4 distinct segments for distinct inter segment tests
-    // by doing so, we can avoid calculating global covariance again
-    _distinctInstances = new ArrayList<>();
-    int segmentSize = NUM_RECORDS / 4;
-    ImmutableSegment immutableSegment1 = setUpSingleSegment(records.subList(0, segmentSize), SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment2 =
-        setUpSingleSegment(records.subList(segmentSize, segmentSize * 2), SEGMENT_NAME_2);
-    ImmutableSegment immutableSegment3 =
-        setUpSingleSegment(records.subList(segmentSize * 2, segmentSize * 3), SEGMENT_NAME_3);
-    ImmutableSegment immutableSegment4 =
-        setUpSingleSegment(records.subList(segmentSize * 3, NUM_RECORDS), SEGMENT_NAME_4);
-    // generate 2 instances each with 2 distinct segments
-    _distinctInstances.add(Arrays.asList(immutableSegment1, immutableSegment2));
-    _distinctInstances.add(Arrays.asList(immutableSegment3, immutableSegment4));
-  }
-
-  private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName)
-      throws Exception {
-    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-    segmentGeneratorConfig.setSegmentName(segmentName);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(recordSet));
-    driver.build();
-
-    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
-  }
-
-  @Test
-  public void testCovarianceAggregationOnly() {
-    // Inner Segment
-    String query =
-        "SELECT COVAR_POP(intColumnX, intColumnY), COVAR_POP(doubleColumnX, doubleColumnY), COVAR_POP(intColumnX, "
-            + "doubleColumnX), " + "COVAR_POP(intColumnX, longColumn), COVAR_POP(intColumnX, floatColumn), "
-            + "COVAR_POP(doubleColumnX, longColumn), COVAR_POP(doubleColumnX, floatColumn), COVAR_POP(longColumn, "
-            + "floatColumn)  FROM testTable";
-    AggregationOperator aggregationOperator = getOperator(query);
-    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
-        NUM_RECORDS * 6, NUM_RECORDS);
-    List<Object> aggregationResult = resultsBlock.getResults();
-    assertNotNull(aggregationResult);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(0), _sumIntX, _sumIntY, _sumIntXY,
-        NUM_RECORDS);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(1), _sumDoubleX, _sumDoubleY, _sumDoubleXY,
-        NUM_RECORDS);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(2), _sumIntX, _sumDoubleX, _sumIntDouble,
-        NUM_RECORDS);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(3), _sumIntX, _sumLong, _sumIntLong,
-        NUM_RECORDS);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(4), _sumIntX, _sumFloat, _sumIntFloat,
-        NUM_RECORDS);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(5), _sumDoubleX, _sumLong, _sumDoubleLong,
-        NUM_RECORDS);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(6), _sumDoubleX, _sumFloat, _sumDoubleFloat,
-        NUM_RECORDS);
-    checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(7), _sumLong, _sumFloat, _sumLongFloat,
-        NUM_RECORDS);
-
-    // Inter segments with 4 identical segments (2 instances each having 2 identical segments)
-    _useIdenticalSegment = true;
-    BrokerResponseNative brokerResponse = getBrokerResponse(query);
-    _useIdenticalSegment = false;
-    assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
-    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
-    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 6 * NUM_RECORDS);
-    assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
-    checkResultTableWithPrecisionForCovariance(brokerResponse);
-
-    // Inter segments with 4 distinct segments (2 instances each having 2 distinct segments)
-    brokerResponse = getBrokerResponse(query);
-    assertEquals(brokerResponse.getNumDocsScanned(), NUM_RECORDS);
-    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
-    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 6 * NUM_RECORDS);
-    assertEquals(brokerResponse.getTotalDocs(), NUM_RECORDS);
-    checkResultTableWithPrecisionForCovariance(brokerResponse);
-
-    // Inter segments with 4 identical segments with filter
-    _useIdenticalSegment = true;
-    query = "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable" + getFilter();
-    brokerResponse = getBrokerResponse(query);
-    _useIdenticalSegment = false;
-    assertEquals(brokerResponse.getNumDocsScanned(), 2 * NUM_RECORDS);
-    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
-    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * NUM_RECORDS);
-    assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
-    Object[] results = brokerResponse.getResultTable().getRows().get(0);
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovWithFilter, RELATIVE_EPSILON));
-  }
-
-  @Test
-  public void testCovarianceAggregationGroupBy() {
-    // Inner Segment
-    // case 1: (col1, groupByCol) group by groupByCol => all covariances are 0's
-    String query =
-        "SELECT COVAR_POP(doubleColumnX, groupByColumn) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
-    GroupByOperator groupByOperator = getOperator(query);
-    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
-        NUM_RECORDS * 2, NUM_RECORDS);
-    AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
-    assertNotNull(aggregationGroupByResult);
-    for (int i = 0; i < NUM_GROUPS; i++) {
-      CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
-      CovarianceTuple expectedCovTuple = _expectedGroupByResultVer1[i];
-      checkWithPrecisionForCovariance(actualCovTuple, expectedCovTuple);
-    }
-
-    // Inter Segment with 4 identical segments
-    _useIdenticalSegment = true;
-    BrokerResponseNative brokerResponse = getBrokerResponse(query);
-    checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer1);
-    _useIdenticalSegment = false;
-    // Inter Segment with 4 distinct segments
-    brokerResponse = getBrokerResponse(query);
-    checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer1);
-
-    // Inner Segment
-    // case 2: COVAR_POP(col1, col2) group by groupByCol => nondeterministic cov
-    query =
-        "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
-    groupByOperator = getOperator(query);
-    resultsBlock = groupByOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
-        NUM_RECORDS * 3, NUM_RECORDS);
-    aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
-    assertNotNull(aggregationGroupByResult);
-
-    for (int i = 0; i < NUM_GROUPS; i++) {
-      CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
-      CovarianceTuple expectedCovTuple = _expectedGroupByResultVer2[i];
-      checkWithPrecisionForCovariance(actualCovTuple, expectedCovTuple);
-    }
-
-    // Inter Segment with 4 identical segments
-    _useIdenticalSegment = true;
-    brokerResponse = getBrokerResponse(query);
-    checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer2);
-    _useIdenticalSegment = false;
-    // Inter Segment with 4 distinct segments
-    brokerResponse = getBrokerResponse(query);
-    checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer2);
-  }
-
-  @Test
-  public void testVarianceAggregationOnly() {
-    // Compute the expected values
-    Variance[] expectedVariances = new Variance[8];
-    for (int i = 0; i < 8; i++) {
-      if (i < 4) {
-        expectedVariances[i] = new Variance(false);
-      } else {
-        expectedVariances[i] = new Variance(true);
-      }
-    }
-    for (int i = 0; i < NUM_RECORDS; i++) {
-      expectedVariances[0].increment(_intColX[i]);
-      expectedVariances[1].increment(_longCol[i]);
-      expectedVariances[2].increment(_floatCol[i]);
-      expectedVariances[3].increment(_doubleColX[i]);
-      expectedVariances[4].increment(_intColX[i]);
-      expectedVariances[5].increment(_longCol[i]);
-      expectedVariances[6].increment(_floatCol[i]);
-      expectedVariances[7].increment(_doubleColX[i]);
-    }
-    double expectedIntSum = Arrays.stream(_intColX).asDoubleStream().sum();
-    double expectedLongSum = Arrays.stream(_longCol).asDoubleStream().sum();
-    double expectedFloatSum = 0.0;
-    for (int i = 0; i < _floatCol.length; i++) {
-      expectedFloatSum += _floatCol[i];
-    }
-    double expectedDoubleSum = Arrays.stream(_doubleColX).sum();
-
-    // Compute the query
-    String query = "SELECT VAR_POP(intColumnX), VAR_POP(longColumn), VAR_POP(floatColumn), VAR_POP(doubleColumnX),"
-        + "VAR_SAMP(intColumnX), VAR_SAMP(longColumn), VAR_SAMP(floatColumn), VAR_SAMP(doubleColumnX) FROM testTable";
-    AggregationOperator aggregationOperator = getOperator(query);
-    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
-        NUM_RECORDS * 4, NUM_RECORDS);
-    List<Object> aggregationResult = resultsBlock.getResults();
-
-    // Validate the aggregation results
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(0), NUM_RECORDS, expectedIntSum,
-        expectedVariances[0].getResult(), false);
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(1), NUM_RECORDS, expectedLongSum,
-        expectedVariances[1].getResult(), false);
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(2), NUM_RECORDS, expectedFloatSum,
-        expectedVariances[2].getResult(), false);
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(3), NUM_RECORDS, expectedDoubleSum,
-        expectedVariances[3].getResult(), false);
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(4), NUM_RECORDS, expectedIntSum,
-        expectedVariances[4].getResult(), true);
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(5), NUM_RECORDS, expectedLongSum,
-        expectedVariances[5].getResult(), true);
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(6), NUM_RECORDS, expectedFloatSum,
-        expectedVariances[6].getResult(), true);
-    checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(7), NUM_RECORDS, expectedDoubleSum,
-        expectedVariances[7].getResult(), true);
-
-    // Validate the response
-    BrokerResponseNative brokerResponse = getBrokerResponse(query);
-    Object[] results = brokerResponse.getResultTable().getRows().get(0);
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[0], expectedVariances[0].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[1], expectedVariances[1].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[2], expectedVariances[2].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[3], expectedVariances[3].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[4], expectedVariances[4].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[5], expectedVariances[5].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[6], expectedVariances[6].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[7], expectedVariances[7].getResult(), RELATIVE_EPSILON));
-
-    VarianceTuple test = ((VarianceTuple) aggregationResult.get(0));
-    test.apply((new VarianceTuple(0, 0, 0.0d)));
-    System.out.println(test.getM2());
-    // Validate the response for a query with a filter
-    query = "SELECT VAR_POP(intColumnX) from testTable" + getFilter();
-    brokerResponse = getBrokerResponse(query);
-    brokerResponse.getResultTable();
-    results = brokerResponse.getResultTable().getRows().get(0);
-    Variance filterExpectedVariance = new Variance(false);
-    for (int i = 0; i < NUM_RECORDS / 2; i++) {
-      filterExpectedVariance.increment(_intColX[i]);
-    }
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedVariance.getResult(),
-        RELATIVE_EPSILON));
-  }
-
-  @Test
-  public void testVarianceAggregationGroupBy() {
-    // Compute expected group results
-    Variance[] expectedGroupByResult = new Variance[NUM_GROUPS];
-    double[] expectedSum = new double[NUM_GROUPS];
-
-    for (int i = 0; i < NUM_GROUPS; i++) {
-      expectedGroupByResult[i] = new Variance(false);
-    }
-    for (int j = 0; j < NUM_RECORDS; j++) {
-      int pos = j / (NUM_RECORDS / NUM_GROUPS);
-      expectedGroupByResult[pos].increment(_intColX[j]);
-      expectedSum[pos] += _intColX[j];
-    }
-
-    String query = "SELECT VAR_POP(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
-    GroupByOperator groupByOperator = getOperator(query);
-    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
-        NUM_RECORDS * 2, NUM_RECORDS);
-    AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
-    assertNotNull(aggregationGroupByResult);
-    for (int i = 0; i < NUM_GROUPS; i++) {
-
-      VarianceTuple actualVarianceTuple = (VarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
-      checkWithPrecisionForVariance(actualVarianceTuple, NUM_RECORDS / NUM_GROUPS, expectedSum[i],
-          expectedGroupByResult[i].getResult(), false);
-    }
-  }
-
-  @Test
-  public void testStandardDeviationAggregationOnly() {
-    // Compute the expected values
-    StandardDeviation[] expectedStdDevs = new StandardDeviation[8];
-    for (int i = 0; i < 8; i++) {
-      if (i < 4) {
-        expectedStdDevs[i] = new StandardDeviation(false);
-      } else {
-        expectedStdDevs[i] = new StandardDeviation(true);
-      }
-    }
-    for (int i = 0; i < NUM_RECORDS; i++) {
-      expectedStdDevs[0].increment(_intColX[i]);
-      expectedStdDevs[1].increment(_longCol[i]);
-      expectedStdDevs[2].increment(_floatCol[i]);
-      expectedStdDevs[3].increment(_doubleColX[i]);
-      expectedStdDevs[4].increment(_intColX[i]);
-      expectedStdDevs[5].increment(_longCol[i]);
-      expectedStdDevs[6].increment(_floatCol[i]);
-      expectedStdDevs[7].increment(_doubleColX[i]);
-    }
-
-    double expectedIntSum = Arrays.stream(_intColX).asDoubleStream().sum();
-    double expectedLongSum = Arrays.stream(_longCol).asDoubleStream().sum();
-    double expectedFloatSum = 0.0;
-    for (int i = 0; i < _floatCol.length; i++) {
-      expectedFloatSum += _floatCol[i];
-    }
-    double expectedDoubleSum = Arrays.stream(_doubleColX).sum();
-
-    // Compute the query
-    String query =
-        "SELECT STDDEV_POP(intColumnX), STDDEV_POP(longColumn), STDDEV_POP(floatColumn), STDDEV_POP(doubleColumnX),"
-            + "STDDEV_SAMP(intColumnX), STDDEV_SAMP(longColumn), STDDEV_SAMP(floatColumn), STDDEV_SAMP(doubleColumnX) "
-            + "FROM testTable";
-    AggregationOperator aggregationOperator = getOperator(query);
-    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
-        NUM_RECORDS * 4, NUM_RECORDS);
-    List<Object> aggregationResult = resultsBlock.getResults();
-
-    // Validate the aggregation results
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(0), NUM_RECORDS, expectedIntSum,
-        expectedStdDevs[0].getResult(), false);
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(1), NUM_RECORDS, expectedLongSum,
-        expectedStdDevs[1].getResult(), false);
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(2), NUM_RECORDS, expectedFloatSum,
-        expectedStdDevs[2].getResult(), false);
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(3), NUM_RECORDS, expectedDoubleSum,
-        expectedStdDevs[3].getResult(), false);
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(4), NUM_RECORDS, expectedIntSum,
-        expectedStdDevs[4].getResult(), true);
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(5), NUM_RECORDS, expectedLongSum,
-        expectedStdDevs[5].getResult(), true);
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(6), NUM_RECORDS, expectedFloatSum,
-        expectedStdDevs[6].getResult(), true);
-    checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(7), NUM_RECORDS, expectedDoubleSum,
-        expectedStdDevs[7].getResult(), true);
-
-    // Validate the response
-    BrokerResponseNative brokerResponse = getBrokerResponse(query);
-    brokerResponse.getResultTable();
-    Object[] results = brokerResponse.getResultTable().getRows().get(0);
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[0], expectedStdDevs[0].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[1], expectedStdDevs[1].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[2], expectedStdDevs[2].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[3], expectedStdDevs[3].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[4], expectedStdDevs[4].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[5], expectedStdDevs[5].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[6], expectedStdDevs[6].getResult(), RELATIVE_EPSILON));
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[7], expectedStdDevs[7].getResult(), RELATIVE_EPSILON));
-
-    // Validate the response for a query with a filter
-    query = "SELECT STDDEV_POP(intColumnX) from testTable" + getFilter();
-    brokerResponse = getBrokerResponse(query);
-    brokerResponse.getResultTable();
-    results = brokerResponse.getResultTable().getRows().get(0);
-    StandardDeviation filterExpectedStdDev = new StandardDeviation(false);
-    for (int i = 0; i < NUM_RECORDS / 2; i++) {
-      filterExpectedStdDev.increment(_intColX[i]);
-    }
-    assertTrue(
-        Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedStdDev.getResult(), RELATIVE_EPSILON));
-  }
-
-  @Test
-  public void testStandardDeviationAggreagtionGroupBy() {
-    // Compute expected group results
-    StandardDeviation[] expectedGroupByResult = new StandardDeviation[NUM_GROUPS];
-    double[] expectedSum = new double[NUM_GROUPS];
-
-    for (int i = 0; i < NUM_GROUPS; i++) {
-      expectedGroupByResult[i] = new StandardDeviation(false);
-    }
-    for (int j = 0; j < NUM_RECORDS; j++) {
-      int pos = j / (NUM_RECORDS / NUM_GROUPS);
-      expectedGroupByResult[pos].increment(_intColX[j]);
-      expectedSum[pos] += _intColX[j];
-    }
-
-    String query = "SELECT STDDEV_POP(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
-    GroupByOperator groupByOperator = getOperator(query);
-    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
-        NUM_RECORDS * 2, NUM_RECORDS);
-    AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
-    assertNotNull(aggregationGroupByResult);
-    for (int i = 0; i < NUM_GROUPS; i++) {
-      VarianceTuple actualVarianceTuple = (VarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
-      checkWithPrecisionForStandardDeviation(actualVarianceTuple, NUM_RECORDS / NUM_GROUPS, expectedSum[i],
-          expectedGroupByResult[i].getResult(), false);
-    }
-  }
-
-  private void checkWithPrecisionForCovariance(CovarianceTuple tuple, double sumX, double sumY, double sumXY,
-      int count) {
-    assertEquals(tuple.getCount(), count);
-    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumX(), sumX, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumY(), sumY, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumXY(), sumXY, RELATIVE_EPSILON));
-  }
-
-  private void checkWithPrecisionForCovariance(CovarianceTuple actual, CovarianceTuple expected) {
-    checkWithPrecisionForCovariance(actual, expected.getSumX(), expected.getSumY(), expected.getSumXY(),
-        (int) expected.getCount());
-  }
-
-  private void checkResultTableWithPrecisionForCovariance(BrokerResponseNative brokerResponse) {
-    Object[] results = brokerResponse.getResultTable().getRows().get(0);
-    assertEquals(results.length, 8);
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovIntXY, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[1], _expectedCovDoubleXY, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[2], _expectedCovIntDouble, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[3], _expectedCovIntLong, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[4], _expectedCovIntFloat, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[5], _expectedCovDoubleLong, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[6], _expectedCovDoubleFloat, RELATIVE_EPSILON));
-    assertTrue(Precision.equalsWithRelativeTolerance((double) results[7], _expectedCovLongFloat, RELATIVE_EPSILON));
-  }
-
-  private void checkGroupByResultsForCovariance(BrokerResponseNative brokerResponse, double[] expectedResults) {
-    ResultTable resultTable = brokerResponse.getResultTable();
-    List<Object[]> rows = resultTable.getRows();
-    for (int i = 0; i < NUM_GROUPS; i++) {
-      assertTrue(Precision.equals((double) rows.get(i)[0], expectedResults[i], DELTA));
-    }
-  }
-
-  private void checkWithPrecisionForVariance(VarianceTuple tuple, int expectedCount, double expectedSum,
-      double expectedVariance, boolean isBiasCorrected) {
-    assertEquals(tuple.getCount(), expectedCount);
-    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSum(), expectedSum, RELATIVE_EPSILON));
-    if (!isBiasCorrected) {
-      assertTrue(
-          Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedVariance * expectedCount, RELATIVE_EPSILON));
-    } else {
-      assertTrue(Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedVariance * (expectedCount - 1),
-          RELATIVE_EPSILON));
-    }
-  }
-
-  private void checkWithPrecisionForStandardDeviation(VarianceTuple tuple, int expectedCount, double expectedSum,
-      double expectedStdDev, boolean isBiasCorrected) {
-    assertEquals(tuple.getCount(), expectedCount);
-    assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSum(), expectedSum, RELATIVE_EPSILON));
-    if (!isBiasCorrected) {
-      assertTrue(Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedStdDev * expectedStdDev * expectedCount,
-          RELATIVE_EPSILON));
-    } else {
-      assertTrue(
-          Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedStdDev * expectedStdDev * (expectedCount - 1),
-              RELATIVE_EPSILON));
-    }
-  }
-
-  @AfterClass
-  public void tearDown()
-      throws IOException {
-    _indexSegment.destroy();
-    for (List<IndexSegment> indexList : _distinctInstances) {
-      for (IndexSegment seg : indexList) {
-        seg.destroy();
-      }
-    }
-    FileUtils.deleteDirectory(INDEX_DIR);
-  }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java
deleted file mode 100644
index 09594364b4..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.customobject;
-
-import java.nio.ByteBuffer;
-
-
-public class VarianceTuple implements Comparable<VarianceTuple> {
-  private long _count;
-  private double _sum;
-  private double _m2;
-
-  public VarianceTuple(long count, double sum, double m2) {
-    _count = count;
-    _sum = sum;
-    _m2 = m2;
-  }
-
-  public void apply(long count, double sum, double m2) {
-    if (count == 0) {
-      return;
-    }
-    double delta = (sum / count) - (_sum / _count);
-    _m2 += m2 + delta * delta * count * _count / (count + _count);
-    _count += count;
-    _sum += sum;
-  }
-
-  public void apply(VarianceTuple varianceTuple) {
-    if (varianceTuple._count == 0) {
-      return;
-    }
-    double delta = (varianceTuple._sum / varianceTuple._count) - (_sum / _count);
-    _m2 += varianceTuple._m2 + delta * delta * varianceTuple._count * _count / (varianceTuple._count + _count);
-    _count += varianceTuple._count;
-    _sum += varianceTuple._sum;
-  }
-
-  public long getCount() {
-    return _count;
-  }
-
-  public double getSum() {
-    return _sum;
-  }
-
-  public double getM2() {
-    return _m2;
-  }
-
-  public byte[] toBytes() {
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES * 2 + Long.BYTES);
-    byteBuffer.putLong(_count);
-    byteBuffer.putDouble(_sum);
-    byteBuffer.putDouble(_m2);
-    return byteBuffer.array();
-  }
-
-  public static VarianceTuple fromBytes(byte[] bytes) {
-    return fromByteBuffer(ByteBuffer.wrap(bytes));
-  }
-
-  public static VarianceTuple fromByteBuffer(ByteBuffer byteBuffer) {
-    return new VarianceTuple(byteBuffer.getLong(), byteBuffer.getDouble(), byteBuffer.getDouble());
-  }
-
-  @Override
-  public int compareTo(VarianceTuple varianceTuple) {
-    if (_count == 0) {
-      if (varianceTuple._count == 0) {
-        return 0;
-      } else {
-        return -1;
-      }
-    } else {
-      if (varianceTuple._count == 0) {
-        return 1;
-      } else {
-        if (_m2 > varianceTuple._m2) {
-          return 1;
-        }
-        if (_m2 < varianceTuple._m2) {
-          return -1;
-        }
-        return 0;
-      }
-    }
-  }
-}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 584c99a7b2..19d2f4d6e5 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -59,10 +59,6 @@ public enum AggregationFunctionType {
   HISTOGRAM("histogram"),
   COVARPOP("covarPop"),
   COVARSAMP("covarSamp"),
-  VARPOP("varPop"),
-  VARSAMP("varSamp"),
-  STDDEVPOP("stdDevPop"),
-  STDDEVSAMP("stdDevSamp"),
 
   // Geo aggregation functions
   STUNION("STUnion"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org