You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/06/19 11:52:30 UTC

[impala] 02/02: IMPALA-11205: Implement Statistical functions: CORR(), COVAR_SAMP() and COVAR_POP()

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 256f37f17f62c21927e3aebee57d512758a42f81
Author: pranav.lodha <pr...@cloudera.com>
AuthorDate: Wed Apr 13 17:22:28 2022 +0530

    IMPALA-11205: Implement Statistical functions: CORR(), COVAR_SAMP()
     and COVAR_POP()
    
    CORR() function takes two numeric type columns as arguments and returns
    the Pearson's correlation coefficient between them.
    COVAR_SAMP() function takes two numeric type columns and returns sample
     covariance between them.
    COVAR_POP() function takes two numeric type columns and returns
     population covariance between them.
    These UDAFs are tested with a few query tests written in aggregation.test.
    
    Change-Id: I32ad627c953ba24d9cde2d5549bdd0d27a9c0d06
    Reviewed-on: http://gerrit.cloudera.org:8080/18413
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc             | 356 ++++++++++++
 be/src/exprs/aggregate-functions.h                 |  32 ++
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  75 +++
 .../queries/QueryTest/aggregation.test             | 611 +++++++++++++++++++++
 4 files changed, 1074 insertions(+)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index fe5b3248b..0bb0b9cc6 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -288,6 +288,362 @@ void AggregateFunctions::CountMerge(FunctionContext*, const BigIntVal& src,
   dst->val += src.val;
 }
 
+// Implementation of CORR() function which takes two arguments of numeric type
+// and returns the Pearson's correlation coefficient between them using the Welford's
+// online algorithm. This is calculated using a stable one-pass algorithm, based on
+// work by Philippe Pébay and Donald Knuth.
+// Implementation of CORR() is independent of the implementation of COVAR_SAMP() and
+// COVAR_POP() so changes in one would probably need to be reflected in the other as well.
+// Few useful links :
+// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online
+// https://www.osti.gov/biblio/1028931
+// Correlation coefficient formula used:
+// r = covar / (√(xvar * yvar)
+struct CorrState {
+  int64_t count; // number of elements
+  double xavg; // average of x elements
+  double yavg; // average of y elements
+  double xvar; // n times the variance of x elements
+  double yvar; // n times the variance of y elements
+  double covar; // n times the covariance
+};
+
+void AggregateFunctions::CorrInit(FunctionContext* ctx, StringVal* dst) {
+  dst->is_null = false;
+  dst->len = sizeof(CorrState);
+  dst->ptr = ctx->Allocate(dst->len);
+  memset(dst->ptr, 0, dst->len);
+}
+
+static inline void CorrUpdateState(double x, double y, CorrState* state) {
+  double deltaX = x - state->xavg;
+  double deltaY = y - state->yavg;
+  ++state->count;
+  // mx_n = mx_(n - 1) + [x_n - mx_(n - 1)] / n
+  state->xavg += deltaX / state->count;
+  // my_n = my_(n - 1) + [y_n - my_(n - 1)] / n
+  state->yavg += deltaY / state->count;
+  if (state->count > 1) {
+    // c_n = c_(n - 1) + (x_n - mx_n) * (y_n - my_(n - 1)) OR
+    // c_n = c_(n - 1) + (x_n - mx_(n - 1)) * (y_n - my_n)
+    // The apparent asymmetry in the equations is due to the fact that,
+    // x_n - mx_n = (n - 1) * (x_n - mx_(n - 1)) / n, so both update terms are equal to
+    // (n - 1) * (x_n - mx_(n - 1)) * (y_n - my_(n - 1)) / n
+    state->covar += deltaX * (y - state->yavg);
+    // vx_n = vx_(n - 1) + (x_n - mx_(n - 1)) * (x_n - mx_n)
+    state->xvar += deltaX * (x - state->xavg);
+    // vy_n = vy_(n - 1) + (y_n - my_(n - 1)) * (y_n - my_n)
+    state->yvar += deltaY * (y - state->yavg);
+  }
+}
+
+static inline void CorrRemoveState(double x, double y, CorrState* state) {
+  double deltaX = x - state->xavg;
+  double deltaY = y - state->yavg;
+  if (state->count <= 1) {
+    memset(state, 0, sizeof(CorrState));
+  } else {
+    --state->count;
+    // mx_(n - 1) = mx_n - (x_n - mx_n) / (n - 1)
+    state->xavg -= deltaX / state->count;
+    // my_(n - 1) = my_n - (y_n - my_n) / (n - 1)
+    state->yavg -= deltaY / state->count;
+    // c_(n - 1) = c_n - (x_n - mx_n) * (y_n - my_(n -1))
+    state->covar -= deltaX * (y - state->yavg);
+    // vx_(n - 1) = vx_n - (x_n - mx_n) * (x_n - mx_(n - 1))
+    state->xvar -= deltaX * (x - state->xavg);
+    // vy_(n - 1) = vy_n - (y_n - my_n) * (y_n - my_(n - 1))
+    state->yvar -= deltaY * (y - state->yavg);
+  }
+}
+
+void AggregateFunctions::CorrUpdate(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  if (src1.is_null || src2.is_null) return;
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CorrState), dst->len);
+  CorrState* state = reinterpret_cast<CorrState*>(dst->ptr);
+  CorrUpdateState(src1.val, src2.val, state);
+}
+
+void AggregateFunctions::CorrRemove(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
+  // because Finalize() returns NULL if count is 0. In other words, it's not needed to
+  // check if num_removes() >= num_updates() as it's accounted for in Finalize().
+  if (src1.is_null || src2.is_null) return;
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CorrState), dst->len);
+  CorrState* state = reinterpret_cast<CorrState*>(dst->ptr);
+  CorrRemoveState(src1.val, src2.val, state);
+}
+
+void AggregateFunctions::TimestampCorrUpdate(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  if (src1.is_null || src2.is_null) return;
+  CorrState* state = reinterpret_cast<CorrState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CorrUpdateState(val1, val2, state);
+  }
+}
+
+void AggregateFunctions::TimestampCorrRemove(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
+  // because Finalize() returns NULL if count is 0. In other words, it's not needed to
+  // check if num_removes() >= num_updates() as it's accounted for in Finalize().
+  if (src1.is_null || src2.is_null) return;
+  CorrState* state = reinterpret_cast<CorrState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CorrRemoveState(val1, val2, state);
+  }
+}
+
+void AggregateFunctions::CorrMerge(FunctionContext* ctx,
+    const StringVal& src, StringVal* dst) {
+  CorrState* src_state = reinterpret_cast<CorrState*>(src.ptr);
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CorrState), dst->len);
+  CorrState* dst_state = reinterpret_cast<CorrState*>(dst->ptr);
+  if (src.ptr != nullptr) {
+    int64_t nA = dst_state->count;
+    int64_t nB = src_state->count;
+    if (nA == 0) {
+      memcpy(dst_state, src_state, sizeof(CorrState));
+      return;
+    }
+    if (nA != 0 && nB != 0) {
+      double xavgA = dst_state->xavg;
+      double yavgA = dst_state->yavg;
+      double xavgB = src_state->xavg;
+      double yavgB = src_state->yavg;
+      double xvarB = src_state->xvar;
+      double yvarB = src_state->yvar;
+      double covarB = src_state->covar;
+
+      dst_state->count += nB;
+      dst_state->xavg = (xavgA * nA + xavgB * nB) / dst_state->count;
+      dst_state->yavg = (yavgA * nA + yavgB * nB) / dst_state->count;
+      // vx_(A,B) = vx_A + vx_B + (mx_A - mx_B) * (mx_A - mx_B) * n_A * n_B / (n_A + n_B)
+      dst_state->xvar +=
+          xvarB + (xavgA - xavgB) * (xavgA - xavgB) * nA * nB / dst_state->count;
+      // vy_(A,B) = vy_A + vy_B + (my_A - my_B) * (my_A - my_B) * n_A * n_B / (n_A + n_B)
+      dst_state->yvar +=
+          yvarB + (yavgA - yavgB) * (yavgA - yavgB) * nA * nB / dst_state->count;
+      // c_(A,B) = c_A + c_B + (mx_A - mx_B) * (my_A - my_B) * n_A * n_B / (n_A + n_B)
+      dst_state->covar += covarB
+          + (xavgA - xavgB) * (yavgA - yavgB) * ((double)(nA * nB)) / (dst_state->count);
+    }
+  }
+}
+
+DoubleVal AggregateFunctions::CorrGetValue(FunctionContext* ctx, const StringVal& src) {
+  CorrState* state = reinterpret_cast<CorrState*>(src.ptr);
+  // Calculating Pearson's correlation coefficient
+  // xvar and yvar become negative in certain cases due to floating point rounding error.
+  // Since these values are very small, they can be ignored and rounded to 0.
+  DCHECK(state->xvar >= -1E-8);
+  DCHECK(state->yvar >= -1E-8);
+  if (state->count == 0 || state->count == 1 || state->xvar <= 0.0 ||
+      state->yvar <= 0.0) {
+    return DoubleVal::null();
+  }
+  double r = sqrt(state->xvar * state->yvar);
+  if (r == 0.0) return DoubleVal::null();
+  double corr = state->covar / r;
+  return DoubleVal(corr);
+}
+
+DoubleVal AggregateFunctions::CorrFinalize(FunctionContext* ctx, const StringVal& src) {
+  CorrState* state = reinterpret_cast<CorrState*>(src.ptr);
+  if (UNLIKELY(src.is_null) || state->count == 0 || state->count == 1) {
+    ctx->Free(src.ptr);
+    return DoubleVal::null();
+  }
+  DoubleVal r = CorrGetValue(ctx, src);
+  ctx->Free(src.ptr);
+  return r;
+}
+
+// Implementation of COVAR_SAMP() and COVAR_POP() which calculates sample and
+// population covariance between two columns of numeric types respectively using
+// the Welford's online algorithm.
+// Sample covariance:
+// r = covar / (n-1)
+// Population covariance:
+// r = covar / (n)
+struct CovarState {
+  int64_t count; // number of elements
+  double xavg; // average of x elements
+  double yavg; // average of y elements
+  double covar; // n times the covariance
+};
+
+void AggregateFunctions::CovarInit(FunctionContext* ctx, StringVal* dst) {
+  dst->is_null = false;
+  dst->len = sizeof(CovarState);
+  dst->ptr = ctx->Allocate(dst->len);
+  memset(dst->ptr, 0, dst->len);
+}
+
+static inline void CovarUpdateState(double x, double y, CovarState* state) {
+  ++state->count;
+  // my_n = my_(n - 1) + [y_n - my_(n - 1)] / n
+  state->yavg += (y - state->yavg) / state->count;
+  // c_n = c_(n - 1) + (x_n - mx_(n - 1)) * (y_n - my_n) OR
+  // c_n = c_(n - 1) + (x_n - mx_n) * (y_n - my_(n - 1))
+  // The apparent asymmetry in the equations is due to the fact that,
+  // x_n - mx_n = (n - 1) * (x_n - mx_(n - 1)) / n, so both terms are equal to
+  // (n - 1) * (x_n - mx_(n - 1)) * (y_n - my_(n - 1)) / n
+  if (state->count > 1) state->covar += (x - state->xavg) * (y - state->yavg);
+  // mx_n = mx_(n - 1) + [x_n - mx_(n - 1)] / n
+  state->xavg += (x - state->xavg) / state->count;
+}
+
+static inline void CovarRemoveState(double x, double y, CovarState* state){
+  if (state->count <= 1) {
+    memset(state, 0, sizeof(CovarState));
+  } else {
+    --state->count;
+    // my_(n - 1) = my_n - (y_n - my_n) / (n - 1)
+    state->yavg -= (y - state->yavg) / state->count;
+    // c_(n - 1) = c_n - (x_n - mx_(n - 1)) * (y_n - my_n)
+    state->covar -= (x - state->xavg) * (y - state->yavg);
+    // mx_(n - 1) = mx_n - (x_n - mx_n) / (n - 1)
+    state->xavg -= (x - state->xavg) / state->count;
+  }
+}
+
+void AggregateFunctions::CovarUpdate(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  if (src1.is_null || src2.is_null) return;
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CovarState), dst->len);
+  CovarState* state = reinterpret_cast<CovarState*>(dst->ptr);
+  CovarUpdateState(src1.val, src2.val, state);
+}
+
+void AggregateFunctions::CovarRemove(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
+  // because Finalize() returns NULL if count is 0. In other words, it's not needed to
+  // check if num_removes() >= num_updates() as it's accounted for in Finalize().
+  if (src1.is_null || src2.is_null) return;
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CovarState), dst->len);
+  CovarState* state = reinterpret_cast<CovarState*>(dst->ptr);
+  CovarRemoveState(src1.val, src2.val, state);
+}
+
+void AggregateFunctions::TimestampCovarUpdate(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  if (src1.is_null || src2.is_null) return;
+  CovarState* state = reinterpret_cast<CovarState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CovarUpdateState(val1, val2, state);
+  }
+}
+
+void AggregateFunctions::TimestampCovarRemove(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
+  // because Finalize() returns NULL if count is 0. In other words, it's not needed to
+  // check if num_removes() >= num_updates() as it's accounted for in Finalize().
+  if (src1.is_null || src2.is_null) return;
+  CovarState* state = reinterpret_cast<CovarState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CovarRemoveState(val1, val2, state);
+  }
+}
+
+void AggregateFunctions::CovarMerge(FunctionContext* ctx,
+    const StringVal& src, StringVal* dst) {
+  CovarState* src_state = reinterpret_cast<CovarState*>(src.ptr);
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CovarState), dst->len);
+  CovarState* dst_state = reinterpret_cast<CovarState*>(dst->ptr);
+  if (src.ptr != nullptr) {
+    int64_t nA = dst_state->count;
+    int64_t nB = src_state->count;
+    if (nA == 0) {
+      memcpy(dst_state, src_state, sizeof(CovarState));
+      return;
+    }
+    if (nA != 0 && nB != 0) {
+      double xavgA = dst_state->xavg;
+      double yavgA = dst_state->yavg;
+      double xavgB = src_state->xavg;
+      double yavgB = src_state->yavg;
+      double covarB = src_state->covar;
+
+      dst_state->count += nB;
+      dst_state->xavg = (xavgA * nA + xavgB * nB) / dst_state->count;
+      dst_state->yavg = (yavgA * nA + yavgB * nB) / dst_state->count;
+      // c_(A,B) = c_A + c_B + (mx_A - mx_B) * (my_A - my_B) * n_A * n_B / (n_A + n_B)
+      dst_state->covar += covarB
+          + (xavgA - xavgB) * (yavgA - yavgB) * ((double)(nA * nB)) / (dst_state->count);
+    }
+  }
+}
+
+DoubleVal AggregateFunctions::CovarSampleGetValue(FunctionContext* ctx,
+    const StringVal& src) {
+  // Calculating sample covariance
+  CovarState* state = reinterpret_cast<CovarState*>(src.ptr);
+  if (state->count == 0 || state->count == 1) return DoubleVal::null();
+  double covar_samp = state->covar / (state->count - 1);
+  return DoubleVal(covar_samp);
+}
+
+DoubleVal AggregateFunctions::CovarPopulationGetValue(FunctionContext* ctx,
+    const StringVal& src) {
+  // Calculating population covariance
+  CovarState* state = reinterpret_cast<CovarState*>(src.ptr);
+  if (state->count == 0) return DoubleVal::null();
+  double covar_pop = state->covar / (state->count);
+  return DoubleVal(covar_pop);
+}
+
+DoubleVal AggregateFunctions::CovarSampleFinalize(FunctionContext* ctx,
+    const StringVal& src) {
+  CovarState* state = reinterpret_cast<CovarState*>(src.ptr);
+  if (UNLIKELY(src.is_null) || state->count == 0 || state->count == 1) {
+    ctx->Free(src.ptr);
+    return DoubleVal::null();
+  }
+  DoubleVal r = CovarSampleGetValue(ctx, src);
+  ctx->Free(src.ptr);
+  return r;
+}
+
+DoubleVal AggregateFunctions::CovarPopulationFinalize(FunctionContext* ctx,
+    const StringVal& src) {
+  CovarState* state = reinterpret_cast<CovarState*>(src.ptr);
+  if (UNLIKELY(src.is_null) || state->count == 0) {
+    ctx->Free(src.ptr);
+    return DoubleVal::null();
+  }
+  DoubleVal r = CovarPopulationGetValue(ctx, src);
+  ctx->Free(src.ptr);
+  return r;
+}
+
 struct AvgState {
   double sum;
   int64_t count;
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index cec5b8ed6..0e8c53a50 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -64,6 +64,38 @@ class AggregateFunctions {
   static StringVal StringValSerializeOrFinalize(
       FunctionContext* ctx, const StringVal& src);
 
+  /// Implementation of Corr()
+  static void CorrInit(FunctionContext* ctx, StringVal* dst);
+  static void CorrUpdate(FunctionContext* ctx, const DoubleVal& src1,
+      const DoubleVal& src2, StringVal* dst);
+  static void CorrRemove(FunctionContext* ctx, const DoubleVal& src1,
+      const DoubleVal& src2, StringVal* dst);
+  static void TimestampCorrUpdate(FunctionContext* ctx,
+      const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+  static void TimestampCorrRemove(FunctionContext* ctx,
+      const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+  static void CorrMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
+  static DoubleVal CorrGetValue(FunctionContext* ctx, const StringVal& src);
+  static DoubleVal CorrFinalize(FunctionContext* ctx, const StringVal& src);
+
+  /// Implementation of Covar_samp() and Covar_pop()
+  static void CovarInit(FunctionContext* ctx, StringVal* dst);
+  static void CovarUpdate(FunctionContext* ctx, const DoubleVal& src1,
+      const DoubleVal& src2, StringVal* dst);
+  static void CovarRemove(FunctionContext* ctx, const DoubleVal& src1,
+      const DoubleVal& src2, StringVal* dst);
+  static void TimestampCovarUpdate(FunctionContext* ctx,
+      const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+  static void TimestampCovarRemove(FunctionContext* ctx,
+      const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+  static void CovarMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
+  static DoubleVal CovarSampleGetValue(FunctionContext* ctx, const StringVal& src);
+  static DoubleVal CovarPopulationGetValue(FunctionContext* ctx, const StringVal& src);
+  static DoubleVal CovarSampleFinalize(FunctionContext* ctx,
+      const StringVal& src);
+  static DoubleVal CovarPopulationFinalize(FunctionContext* ctx,
+      const StringVal& src);
+
   /// Implementation of Count and Count(*)
   static void CountUpdate(FunctionContext*, const AnyVal& src, BigIntVal* dst);
   static void CountStarUpdate(FunctionContext*, BigIntVal* dst);
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 356cf5461..889210851 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1319,6 +1319,81 @@ public class BuiltinsDb extends Db {
         prefix + "9SumRemoveIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_",
         null, false, true, true));
 
+    // Corr()
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "corr",
+        Lists.<Type>newArrayList(Type.DOUBLE, Type.DOUBLE), Type.DOUBLE, Type.STRING,
+        prefix + "8CorrInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix + "10CorrUpdateEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+        prefix + "9CorrMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        stringValSerializeOrFinalize,
+        prefix + "12CorrGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix + "10CorrRemoveEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+        prefix + "12CorrFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        false, true, false));
+
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "corr",
+        Lists.<Type>newArrayList(Type.TIMESTAMP, Type.TIMESTAMP), Type.DOUBLE, Type.STRING,
+        prefix + "8CorrInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix +
+                "19TimestampCorrUpdateEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+        prefix + "9CorrMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        stringValSerializeOrFinalize,
+        prefix + "12CorrGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix +
+                "19TimestampCorrRemoveEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+        prefix + "12CorrFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        false, true, false));
+
+    // Covar_samp()
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_samp",
+        Lists.<Type>newArrayList(Type.DOUBLE, Type.DOUBLE), Type.DOUBLE, Type.STRING,
+        prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix + "11CovarUpdateEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+        prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        stringValSerializeOrFinalize,
+        prefix + "19CovarSampleGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix + "11CovarRemoveEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+        prefix + "19CovarSampleFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        false, true, false));
+
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_samp",
+        Lists.<Type>newArrayList(Type.TIMESTAMP, Type.TIMESTAMP), Type.DOUBLE, Type.STRING,
+        prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix +
+                "20TimestampCovarUpdateEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+        prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        stringValSerializeOrFinalize,
+        prefix + "19CovarSampleGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix +
+                "20TimestampCovarRemoveEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+        prefix + "19CovarSampleFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        false, true, false));
+
+    // Covar_pop()
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_pop",
+        Lists.<Type>newArrayList(Type.DOUBLE, Type.DOUBLE), Type.DOUBLE, Type.STRING,
+        prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix + "11CovarUpdateEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+        prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        stringValSerializeOrFinalize,
+        prefix + "23CovarPopulationGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix + "11CovarRemoveEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+        prefix + "23CovarPopulationFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        false, true, false));
+
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_pop",
+        Lists.<Type>newArrayList(Type.TIMESTAMP, Type.TIMESTAMP), Type.DOUBLE, Type.STRING,
+        prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix +
+                "20TimestampCovarUpdateEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+        prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        stringValSerializeOrFinalize,
+        prefix + "23CovarPopulationGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix +
+                "20TimestampCovarRemoveEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+        prefix + "23CovarPopulationFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        false, true, false));
+
     // Avg
     Type avgIntermediateType =
         ScalarType.createFixedUdaIntermediateType(AVG_INTERMEDIATE_SIZE);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
index f0f311226..98267d4c7 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
@@ -1427,3 +1427,614 @@ select count(*) from tpch_parquet.orders o group by o.o_clerk limit 10
 ---- TYPES
 bigint
 ====
+---- QUERY
+# CORR() function examples
+select corr(ps_availqty, ps_supplycost) from tpch.partsupp;
+---- RESULTS
+0.000321849166315
+---- TYPES
+double
+====
+---- QUERY
+# Behavior of CORR() on null table
+select corr(d, e) from functional.nulltable;
+---- RESULTS
+NULL
+---- TYPES
+double
+====
+---- QUERY
+# Behavior of CORR() on empty table
+select corr(f2, f2) from functional.emptytable;
+---- RESULTS
+NULL
+---- TYPES
+double
+====
+---- QUERY
+# CORR() on different datatypes
+select corr(tinyint_col, tinyint_col), corr(smallint_col, smallint_col),
+ corr(int_col, int_col), corr(bigint_col, bigint_col), corr(float_col, float_col),
+ corr(double_col, double_col), corr(timestamp_col, timestamp_col) from functional.alltypes;
+---- RESULTS
+1.0,1.0,1.0,1.0,1.0,1.0,1.0
+---- TYPES
+double,double,double,double,double,double,double
+====
+---- QUERY
+# CORR() on timestamp columns
+select corr(utctime, localtime) from functional.alltimezones;
+---- RESULTS
+0.999995807708
+---- TYPES
+double
+====
+---- QUERY
+# Since group by id will result in a single row, this test shows that corr() returns null in case of a single row.
+select id, corr(int_col, int_col) from functional.alltypestiny group by id;
+---- RESULTS
+2,NULL
+4,NULL
+0,NULL
+6,NULL
+1,NULL
+7,NULL
+3,NULL
+5,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+# CORR() on decimal datatype
+select corr(d3, d4) from functional.decimal_tbl;
+---- RESULTS
+NULL
+---- TYPES
+double
+====
+---- QUERY
+select year, corr(double_col, double_col) from functional.alltypes group by year;
+---- RESULTS
+2009,1.0
+2010,1.0
+---- TYPES
+int,double
+====
+---- QUERY
+select corr(double_col, -double_col) from functional.alltypes;
+---- RESULTS
+-1.0
+---- TYPES
+double
+====
+---- QUERY
+select corr(double_col, double_col) from functional.alltypes;
+---- RESULTS
+1.0
+---- TYPES
+double
+====
+---- QUERY
+select corr(ss_sold_time_sk, ss_quantity) from tpcds.store_sales;
+---- RESULTS
+0.000136790587885
+---- TYPES
+double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk)
+ from tpcds.store;
+---- RESULTS
+5,NULL
+8,1.0
+12,-0.328248435346
+1,NULL
+2,-1.0
+3,-0.511481796057
+4,-0.881269090479
+6,-0.853760432996
+7,0.0825005896565
+9,0.0282032912339
+10,0.223462144551
+11,0.325760965741
+---- TYPES
+int,double
+====
+---- QUERY
+select id, double_col, corr(double_col, int_col) over (partition by month order by id) from functional.alltypes
+ order by id limit 10;
+---- RESULTS
+0,0.0,NULL
+1,10.1,1.0
+2,20.2,1.0
+3,30.3,1.0
+4,40.4,1.0
+5,50.5,1.0
+6,60.6,1.0
+7,70.7,1.0
+8,80.8,1.0
+9,90.9,1.0
+---- TYPES
+int,double,double
+====
+---- QUERY
+# CORR() when one column is filled with null
+select corr(null_int, rand()), corr(rand(), null_int) from functional.nullrows;
+---- RESULTS
+NULL,NULL
+---- TYPES
+double,double
+====
+---- QUERY
+# CORR() supporting join
+select corr(A.double_col, B.double_col) from functional.alltypes A, functional.alltypes B where A.id=B.id;
+---- RESULTS
+1.0
+---- TYPES
+double
+====
+---- QUERY
+# Tests functioning of CorrRemoveState()
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 5 preceding and 2 following) from tpcds.store;
+---- RESULTS
+5,-0.328248435346
+8,-0.328248435346
+12,-0.328248435346
+1,-0.511481796057
+2,-0.881269090479
+3,-0.853760432996
+4,0.0825005896565
+6,0.0282032912339
+7,0.223462144551
+9,0.294637370885
+10,0.115190500757
+11,-0.0690212865931
+---- TYPES
+int,double
+====
+---- QUERY
+# Mathematical operations on double can lead to variance becoming negative by a very small amount (around +1e-13),
+# to avoid that a check is added (state->xvar <= 0.0 || state->yvar <= 0.0), without which the below test will
+# result in nan for certain cases.
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and 1 following) from tpcds.store;
+---- RESULTS
+5,1.0
+8,-0.328248435346
+12,-1.0
+1,-1.0
+2,-0.511481796057
+3,-0.829250636825
+4,-0.884750027905
+6,-0.735742100541
+7,-0.165224541153
+9,0.97066733931
+10,1.0
+11,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+# Corr() when window size is 2
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,1.0
+12,-1.0
+1,NULL
+2,-1.0
+3,NULL
+4,-1.0
+6,-1.0
+7,-1.0
+9,1.0
+10,1.0
+11,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between current row and 1 following) from tpcds.store;
+---- RESULTS
+5,1.0
+8,-1.0
+12,NULL
+1,-1.0
+2,NULL
+3,-1.0
+4,-1.0
+6,-1.0
+7,1.0
+9,1.0
+10,NULL
+11,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between current row and unbounded following) from tpcds.store;
+---- RESULTS
+12,NULL
+8,-1.0
+5,-0.328248435346
+11,NULL
+10,NULL
+9,1.0
+7,0.965705311825
+6,0.17735847272
+4,-0.0690212865931
+3,0.115190500757
+2,0.294637370885
+1,0.325760965741
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between unbounded preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,1.0
+12,-0.328248435346
+1,NULL
+2,-1.0
+3,-0.511481796057
+4,-0.881269090479
+6,-0.853760432996
+7,0.0825005896565
+9,0.0282032912339
+10,0.223462144551
+11,0.325760965741
+---- TYPES
+int,double
+====
+---- QUERY
+# Sample and population covariance
+select covar_samp(ps_availqty, ps_supplycost), covar_pop(ps_availqty, ps_supplycost) from tpch.partsupp;
+---- RESULTS
+267.607127282,267.606792773
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() on null table
+select covar_samp(d, e), covar_pop(d, e) from functional.nulltable;
+---- RESULTS
+NULL,NULL
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() on different datatypes
+select covar_samp(tinyint_col, tinyint_col), covar_samp(smallint_col, smallint_col),
+ covar_samp(int_col, int_col), covar_samp(bigint_col, bigint_col), covar_samp(float_col, float_col),
+ covar_samp(double_col, double_col) from functional.alltypes;
+---- RESULTS
+8.251130291820797,8.251130291820797,8.251130291820797,825.1130291820797,9.983867246543589,841.6978010686292
+---- TYPES
+double,double,double,double,double,double
+====
+---- QUERY
+# covar_pop() on different datatypes
+select covar_pop(tinyint_col, tinyint_col), covar_pop(smallint_col, smallint_col),
+ covar_pop(int_col, int_col), covar_pop(bigint_col, bigint_col), covar_pop(float_col, float_col),
+ covar_pop(double_col, double_col) from functional.alltypes;
+---- RESULTS
+8.25,8.25,8.25,825.0,9.9824995935,841.5825
+---- TYPES
+double,double,double,double,double,double
+====
+---- QUERY
+# Behavior of covar_samp() and covar_pop on empty table
+select covar_samp(f2, f2), covar_pop(f2, f2) from functional.emptytable;
+---- RESULTS
+NULL,NULL
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() on timestamp datatype. The expression is divided by a large number like 1E+10
+# because unlike overfow in int, overflow in double looses precision
+select covar_samp(utctime, localtime)/1E+10, covar_pop(utctime, localtime)/1E+10 from functional.alltimezones;
+---- RESULTS
+5502.915940016994,5496.494801230509
+---- TYPES
+double,double
+====
+---- QUERY
+# Since group by id will result in a single row, this test shows
+# behavior of covar_samp() and covar_pop() on a single row
+select id, covar_samp(int_col, int_col), covar_pop(int_col, int_col) from functional.alltypestiny group by id;
+---- RESULTS
+2,NULL,0.0
+4,NULL,0.0
+0,NULL,0.0
+6,NULL,0.0
+1,NULL,0.0
+7,NULL,0.0
+3,NULL,0.0
+5,NULL,0.0
+---- TYPES
+int,double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() on decimal type
+select covar_samp(d3, d4), covar_pop(d3, d4) from functional.decimal_tbl;
+---- RESULTS
+0.0,0.0
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() with group by clause
+select year, covar_samp(double_col, double_col),
+ covar_pop(double_col, double_col) from functional.alltypes group by year;
+---- RESULTS
+2009,841.813133735,841.5825
+2010,841.813133735,841.5825
+---- TYPES
+int,double,double
+====
+---- QUERY
+# Example of negative covar_samp() and covar_pop()
+select covar_samp(double_col, -double_col), covar_pop(double_col, -double_col) from functional.alltypes;
+---- RESULTS
+-841.697801069,-841.5825
+---- TYPES
+double,double
+====
+---- QUERY
+select covar_samp(double_col, double_col), covar_pop(double_col, double_col) from functional.alltypes;
+---- RESULTS
+841.697801069,841.5825
+---- TYPES
+double,double
+====
+---- QUERY
+select covar_samp(ss_sold_time_sk, ss_quantity), covar_pop(ss_sold_time_sk, ss_quantity) from tpcds.store_sales;
+---- RESULTS
+50.2400502845,50.2400315775
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() on analytic query
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk)
+ from tpcds.store;
+---- RESULTS
+5,NULL
+8,10414050.0
+12,-5125047.666666668
+1,NULL
+2,-158355.0
+3,-3513583.5
+4,-19719420.0
+6,-16848393.6
+7,4309087.100000003
+9,1337692.761904765
+10,11663050.25
+11,17362201.83333334
+---- TYPES
+int,double
+====
+---- QUERY
+# covar_pop() on analytic query
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk)
+ from tpcds.store;
+---- RESULTS
+5,0.0
+8,5207025.0
+12,-3416698.444444445
+1,0.0
+2,-79177.5
+3,-2342389.0
+4,-14789565.0
+6,-13478714.88
+7,3590905.916666669
+9,1146593.79591837
+10,10205168.96875
+11,15433068.2962963
+---- TYPES
+int,double
+====
+---- QUERY
+select id, double_col, covar_samp(double_col, int_col) over (partition by month order by id) from
+ functional.alltypes order by id limit 10;
+---- RESULTS
+0,0.0,NULL
+1,10.1,5.05
+2,20.2,10.1
+3,30.3,16.8333333333
+4,40.4,25.25
+5,50.5,35.35
+6,60.6,47.1333333333
+7,70.7,60.6
+8,80.8,75.75
+9,90.9,92.5833333333
+---- TYPES
+int,double,double
+====
+---- QUERY
+select id, double_col, covar_pop(double_col, int_col) over (partition by month order by id) from
+ functional.alltypes order by id limit 10;
+---- RESULTS
+0,0.0,0.0
+1,10.1,2.525
+2,20.2,6.73333333333
+3,30.3,12.625
+4,40.4,20.2
+5,50.5,29.4583333333
+6,60.6,40.4
+7,70.7,53.025
+8,80.8,67.3333333333
+9,90.9,83.325
+---- TYPES
+int,double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() when one column is filled with null
+select covar_samp(null_int, rand()), covar_samp(rand(), null_int), covar_pop(null_int, rand()),
+ covar_pop(rand(), null_int) from functional.nullrows;
+---- RESULTS
+NULL,NULL,NULL,NULL
+---- TYPES
+double,double,double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() supporting join
+select covar_samp(A.double_col, B.double_col), covar_pop(A.double_col, B.double_col) from functional.alltypes A,
+ functional.alltypes B where A.id=B.id;
+---- RESULTS
+841.697801069,841.5825
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() supporting timestamp datatype
+select covar_samp(timestamp_col, timestamp_col)/1E+13, covar_pop(timestamp_col, timestamp_col)/1E+13
+ from functional.alltypes;
+---- RESULTS
+33.1559162368,33.1513743305
+---- TYPES
+double,double
+====
+---- QUERY
+# corr(), covar_samp(), covar_pop() when both columns are filled with 0
+select corr(double_col*0, double_col*0), covar_samp(double_col*0, double_col*0),
+ covar_pop(double_col*0, double_col*0) from functional.alltypes;
+---- RESULTS
+NULL,0.0,0.0
+---- TYPES
+double,double,double
+====
+---- QUERY
+# corr(), covar_samp(), covar_pop() when one column is filled with 0
+select corr(double_col, double_col*0), covar_samp(double_col, double_col*0),
+ covar_pop(double_col, double_col*0) from functional.alltypes;
+---- RESULTS
+NULL,0.0,0.0
+---- TYPES
+double,double,double
+====
+---- QUERY
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between unbounded preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,10414050.0
+12,-5125047.666666668
+1,NULL
+2,-158355.0
+3,-3513583.5
+4,-19719420.0
+6,-16848393.6
+7,4309087.100000003
+9,1337692.761904765
+10,11663050.25
+11,17362201.83333334
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between unbounded preceding and current row) from tpcds.store;
+---- RESULTS
+5,0.0
+8,5207025.0
+12,-3416698.444444445
+1,0.0
+2,-79177.5
+3,-2342389.0
+4,-14789565.0
+6,-13478714.88
+7,3590905.916666669
+9,1146593.79591837
+10,10205168.96875
+11,15433068.2962963
+---- TYPES
+int,double
+====
+---- QUERY
+# Tests functioning of CovarRemoveState()
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 5 preceding and 2 following) from tpcds.store;
+---- RESULTS
+5,-5125047.666666668
+8,-5125047.666666668
+12,-5125047.666666668
+1,-3513583.5
+2,-19719420.0
+3,-16848393.6
+4,4309087.100000003
+6,1337692.761904765
+7,11663050.25
+9,14424596.67857143
+10,3770362.571428577
+11,-2206709.166666666
+---- TYPES
+int,double
+====
+---- QUERY
+# Tests functioning of CovarRemoveState()
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 5 preceding and 2 following) from tpcds.store;
+---- RESULTS
+5,-3416698.444444445
+8,-3416698.444444445
+12,-3416698.444444445
+1,-2342389.0
+2,-14789565.0
+3,-13478714.88
+4,3590905.916666669
+6,1146593.79591837
+7,10205168.96875
+9,12621522.09375001
+10,3231739.346938781
+11,-1838924.305555555
+---- TYPES
+int,double
+====
+---- QUERY
+# Covar_samp() when window size is 2
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,10414050.0
+12,-14211464
+1,NULL
+2,-158355.0
+3,0
+4,-16051572
+6,-1733847.5
+7,-2425526
+9,25465544
+10,26428357
+11,0
+---- TYPES
+int,double
+====
+---- QUERY
+# Covar_pop() when window size is 2
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and current row) from tpcds.store;
+---- RESULTS
+5,0.0
+8,5207025.0
+12,-7105732
+1,0.0
+2,-79177.5
+3,0
+4,-8025786
+6,-866923.75
+7,-1212763
+9,12732772
+10,13214178.5
+11,0
+---- TYPES
+int,double
+====
\ No newline at end of file