You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/06/19 11:52:30 UTC
[impala] 02/02: IMPALA-11205: Implement Statistical functions: CORR(), COVAR_SAMP() and COVAR_POP()
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 256f37f17f62c21927e3aebee57d512758a42f81
Author: pranav.lodha <pr...@cloudera.com>
AuthorDate: Wed Apr 13 17:22:28 2022 +0530
IMPALA-11205: Implement Statistical functions: CORR(), COVAR_SAMP()
and COVAR_POP()
CORR() function takes two numeric type columns as arguments and returns
the Pearson's correlation coefficient between them.
COVAR_SAMP() function takes two numeric type columns and returns sample
covariance between them.
COVAR_POP() function takes two numeric type columns and returns
population covariance between them.
These UDAFs are tested with a few query tests written in aggregation.test.
Change-Id: I32ad627c953ba24d9cde2d5549bdd0d27a9c0d06
Reviewed-on: http://gerrit.cloudera.org:8080/18413
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exprs/aggregate-functions-ir.cc | 356 ++++++++++++
be/src/exprs/aggregate-functions.h | 32 ++
.../java/org/apache/impala/catalog/BuiltinsDb.java | 75 +++
.../queries/QueryTest/aggregation.test | 611 +++++++++++++++++++++
4 files changed, 1074 insertions(+)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index fe5b3248b..0bb0b9cc6 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -288,6 +288,362 @@ void AggregateFunctions::CountMerge(FunctionContext*, const BigIntVal& src,
dst->val += src.val;
}
+// Implementation of the CORR() function, which takes two numeric arguments and
+// returns Pearson's correlation coefficient between them. The running statistics
+// are maintained with Welford's online algorithm, a numerically stable one-pass
+// method based on work by Philippe Pébay and Donald Knuth; partial states are
+// combined pairwise in CorrMerge().
+// CORR() does not share code with COVAR_SAMP()/COVAR_POP(): the update, remove and
+// merge logic is duplicated, so a fix in one should usually be mirrored in the other.
+// Useful references:
+// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online
+// https://www.osti.gov/biblio/1028931
+// Correlation coefficient formula used:
+// r = covar / √(xvar * yvar)
+// The factors of n cancel, so the "n times" quantities below are used directly.
+// This struct is the raw byte layout of the STRING intermediate value (it is
+// memset/memcpy'd as a whole, and presumably serialized as-is by
+// StringValSerializeOrFinalize), so changing its fields changes that format.
+struct CorrState {
+ int64_t count; // number of elements
+ double xavg; // average of x elements
+ double yavg; // average of y elements
+ double xvar; // n times the variance of x elements (sum of squared deviations)
+ double yvar; // n times the variance of y elements (sum of squared deviations)
+ double covar; // n times the covariance (sum of products of x and y deviations)
+};
+
+// Allocates the CORR() intermediate value: a zero-filled CorrState owned by 'dst'
+// and released in CorrFinalize().
+void AggregateFunctions::CorrInit(FunctionContext* ctx, StringVal* dst) {
+  const int state_len = sizeof(CorrState);
+  uint8_t* buffer = ctx->Allocate(state_len);
+  // NOTE(review): Allocate() is expected to return nullptr when the allocation
+  // fails (e.g. memory limit exceeded) after recording the error on 'ctx'; confirm
+  // against FunctionContext docs. Produce a NULL intermediate instead of
+  // memset()-ing through a null pointer.
+  if (UNLIKELY(buffer == nullptr)) {
+    *dst = StringVal::null();
+    return;
+  }
+  memset(buffer, 0, state_len);
+  dst->is_null = false;
+  dst->len = state_len;
+  dst->ptr = buffer;
+}
+
+// Folds the pair (x, y) into 'state' using Welford's single-pass recurrences.
+// Deviations from the old means are multiplied by deviations from the new means;
+// that product equals (n - 1) / n * (x - mx_(n - 1)) * (y - my_(n - 1)), which
+// explains the apparent asymmetry of the formulas below.
+static inline void CorrUpdateState(double x, double y, CorrState* state) {
+  // Deviations from the means of the first n - 1 values.
+  const double dx_old = x - state->xavg;
+  const double dy_old = y - state->yavg;
+  const int64_t n = ++state->count;
+  // mx_n = mx_(n - 1) + (x_n - mx_(n - 1)) / n, and likewise for y.
+  state->xavg += dx_old / n;
+  state->yavg += dy_old / n;
+  // A single value has no spread; the co-moment sums are left untouched.
+  if (n <= 1) return;
+  // Deviations from the updated means.
+  const double dx_new = x - state->xavg;
+  const double dy_new = y - state->yavg;
+  // c_n = c_(n - 1) + (x_n - mx_(n - 1)) * (y_n - my_n)
+  state->covar += dx_old * dy_new;
+  // vx_n = vx_(n - 1) + (x_n - mx_(n - 1)) * (x_n - mx_n)
+  state->xvar += dx_old * dx_new;
+  // vy_n = vy_(n - 1) + (y_n - my_(n - 1)) * (y_n - my_n)
+  state->yvar += dy_old * dy_new;
+}
+
+// Reverses CorrUpdateState() for the pair (x, y); intended to undo an earlier
+// update of the same pair (e.g. a row leaving an analytic window). Removing the
+// last remaining value resets the whole state to zeros.
+static inline void CorrRemoveState(double x, double y, CorrState* state) {
+  if (state->count <= 1) {
+    memset(state, 0, sizeof(CorrState));
+    return;
+  }
+  // Deviations from the means of all n values, before the removal.
+  const double dx_cur = x - state->xavg;
+  const double dy_cur = y - state->yavg;
+  const int64_t remaining = --state->count;
+  // mx_(n - 1) = mx_n - (x_n - mx_n) / (n - 1), and likewise for y.
+  state->xavg -= dx_cur / remaining;
+  state->yavg -= dy_cur / remaining;
+  // Deviations from the means of the remaining n - 1 values.
+  const double dx_prev = x - state->xavg;
+  const double dy_prev = y - state->yavg;
+  // c_(n - 1) = c_n - (x_n - mx_n) * (y_n - my_(n - 1))
+  state->covar -= dx_cur * dy_prev;
+  // vx_(n - 1) = vx_n - (x_n - mx_n) * (x_n - mx_(n - 1))
+  state->xvar -= dx_cur * dx_prev;
+  // vy_(n - 1) = vy_n - (y_n - my_n) * (y_n - my_(n - 1))
+  state->yvar -= dy_cur * dy_prev;
+}
+
+// Adds one (src1, src2) row to the CORR() state; rows where either argument is
+// NULL do not contribute.
+void AggregateFunctions::CorrUpdate(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  if (!src1.is_null && !src2.is_null) {
+    DCHECK(dst->ptr != nullptr);
+    DCHECK_EQ(sizeof(CorrState), dst->len);
+    CorrUpdateState(src1.val, src2.val, reinterpret_cast<CorrState*>(dst->ptr));
+  }
+}
+
+// Takes a previously added (src1, src2) row back out of the CORR() state, e.g. when
+// an analytic window slides. NULL rows were never added, so they are skipped here too.
+// No bookkeeping of Update()/Remove() call counts is needed: the state resets itself
+// when its count reaches zero, and Finalize() returns NULL for fewer than two rows.
+void AggregateFunctions::CorrRemove(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  if (!src1.is_null && !src2.is_null) {
+    DCHECK(dst->ptr != nullptr);
+    DCHECK_EQ(sizeof(CorrState), dst->len);
+    CorrRemoveState(src1.val, src2.val, reinterpret_cast<CorrState*>(dst->ptr));
+  }
+}
+
+// CORR() over TIMESTAMP arguments: both values are converted with
+// ToSubsecondUnixTime(UTCPTR, ...) and folded into the same state as CorrUpdate().
+// Rows where either argument is NULL, or either conversion fails, are skipped.
+void AggregateFunctions::TimestampCorrUpdate(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  if (src1.is_null || src2.is_null) return;
+  // Same intermediate-state invariants that CorrUpdate() checks.
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CorrState), dst->len);
+  CorrState* state = reinterpret_cast<CorrState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CorrUpdateState(val1, val2, state);
+  }
+}
+
+// Inverse of TimestampCorrUpdate(): converts both values the same way and removes
+// the pair from the CORR() state. NULL or unconvertible rows are skipped, matching
+// what the update path skipped.
+void AggregateFunctions::TimestampCorrRemove(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
+  // because Finalize() returns NULL if count is 0. In other words, it's not needed to
+  // check if num_removes() >= num_updates() as it's accounted for in Finalize().
+  if (src1.is_null || src2.is_null) return;
+  // Same intermediate-state invariants that CorrRemove() checks.
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CorrState), dst->len);
+  CorrState* state = reinterpret_cast<CorrState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CorrRemoveState(val1, val2, state);
+  }
+}
+
+// Combines the partial CORR() state in 'src' into 'dst' using the pairwise merge
+// formulas for means and co-moments (Chan et al. / Pébay).
+void AggregateFunctions::CorrMerge(FunctionContext* ctx,
+    const StringVal& src, StringVal* dst) {
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CorrState), dst->len);
+  // A source without a buffer contributes nothing.
+  if (src.ptr == nullptr) return;
+  const CorrState* src_state = reinterpret_cast<const CorrState*>(src.ptr);
+  CorrState* dst_state = reinterpret_cast<CorrState*>(dst->ptr);
+  const int64_t nA = dst_state->count;
+  const int64_t nB = src_state->count;
+  if (nA == 0) {
+    // Nothing accumulated yet: adopt the source state as is.
+    memcpy(dst_state, src_state, sizeof(CorrState));
+    return;
+  }
+  // An empty source leaves the destination unchanged.
+  if (nB == 0) return;
+
+  const double xavgA = dst_state->xavg;
+  const double yavgA = dst_state->yavg;
+  const double xavgB = src_state->xavg;
+  const double yavgB = src_state->yavg;
+  dst_state->count = nA + nB;
+  const double n = static_cast<double>(dst_state->count);
+  // Combined means, weighted by the partition sizes.
+  dst_state->xavg = (xavgA * nA + xavgB * nB) / n;
+  dst_state->yavg = (yavgA * nA + yavgB * nB) / n;
+  // In the correction terms n_A and n_B are multiplied into a double one at a
+  // time; the int64_t product n_A * n_B could overflow for very large partitions.
+  // vx_(A,B) = vx_A + vx_B + (mx_A - mx_B) * (mx_A - mx_B) * n_A * n_B / (n_A + n_B)
+  dst_state->xvar +=
+      src_state->xvar + (xavgA - xavgB) * (xavgA - xavgB) * nA * nB / n;
+  // vy_(A,B) = vy_A + vy_B + (my_A - my_B) * (my_A - my_B) * n_A * n_B / (n_A + n_B)
+  dst_state->yvar +=
+      src_state->yvar + (yavgA - yavgB) * (yavgA - yavgB) * nA * nB / n;
+  // c_(A,B) = c_A + c_B + (mx_A - mx_B) * (my_A - my_B) * n_A * n_B / (n_A + n_B)
+  dst_state->covar +=
+      src_state->covar + (xavgA - xavgB) * (yavgA - yavgB) * nA * nB / n;
+}
+
+// Computes Pearson's correlation coefficient from a CORR() state without freeing it.
+// Returns NULL for a NULL intermediate, fewer than two rows, or a non-positive
+// variance in either argument.
+DoubleVal AggregateFunctions::CorrGetValue(FunctionContext* ctx, const StringVal& src) {
+  // A NULL intermediate (e.g. if CorrInit() could not allocate) has no value.
+  if (UNLIKELY(src.is_null)) return DoubleVal::null();
+  const CorrState* state = reinterpret_cast<const CorrState*>(src.ptr);
+  // xvar and yvar become negative in certain cases due to floating point rounding error.
+  // Since these values are very small, they are treated as zero variance below.
+  DCHECK(state->xvar >= -1E-8);
+  DCHECK(state->yvar >= -1E-8);
+  if (state->count == 0 || state->count == 1 || state->xvar <= 0.0 ||
+      state->yvar <= 0.0) {
+    return DoubleVal::null();
+  }
+  // r = covar / sqrt(xvar * yvar). Taking the square roots separately keeps the
+  // intermediate product from overflowing to +inf (which would silently yield a
+  // correlation of 0) or underflowing to 0 for extreme variances.
+  const double denom = sqrt(state->xvar) * sqrt(state->yvar);
+  if (denom == 0.0) return DoubleVal::null();
+  return DoubleVal(state->covar / denom);
+}
+
+// Final step of CORR(): computes the coefficient (NULL for a NULL intermediate or
+// fewer than two rows) and releases the buffer allocated by CorrInit().
+DoubleVal AggregateFunctions::CorrFinalize(FunctionContext* ctx, const StringVal& src) {
+  const CorrState* state = reinterpret_cast<const CorrState*>(src.ptr);
+  const bool too_few_rows =
+      UNLIKELY(src.is_null) || state->count == 0 || state->count == 1;
+  DoubleVal result = too_few_rows ? DoubleVal::null() : CorrGetValue(ctx, src);
+  ctx->Free(src.ptr);
+  return result;
+}
+
+// Implementation of COVAR_SAMP() and COVAR_POP(), which compute the sample and the
+// population covariance, respectively, between two numeric columns using Welford's
+// online algorithm. Both functions share this state and the Init/Update/Remove/Merge
+// functions; they differ only in the final division:
+// Sample covariance:
+// r = covar / (n - 1)
+// Population covariance:
+// r = covar / n
+// This struct is the raw byte layout of the STRING intermediate value (it is
+// memset/memcpy'd as a whole), so changing its fields changes that format.
+struct CovarState {
+ int64_t count; // number of elements
+ double xavg; // average of x elements
+ double yavg; // average of y elements
+ double covar; // n times the covariance (sum of products of x and y deviations)
+};
+
+// Allocates the COVAR_SAMP()/COVAR_POP() intermediate value: a zero-filled
+// CovarState owned by 'dst' and released by the matching Finalize function.
+void AggregateFunctions::CovarInit(FunctionContext* ctx, StringVal* dst) {
+  const int state_len = sizeof(CovarState);
+  uint8_t* buffer = ctx->Allocate(state_len);
+  // NOTE(review): Allocate() is expected to return nullptr when the allocation
+  // fails (e.g. memory limit exceeded) after recording the error on 'ctx'; confirm
+  // against FunctionContext docs. Produce a NULL intermediate instead of
+  // memset()-ing through a null pointer.
+  if (UNLIKELY(buffer == nullptr)) {
+    *dst = StringVal::null();
+    return;
+  }
+  memset(buffer, 0, state_len);
+  dst->is_null = false;
+  dst->len = state_len;
+  dst->ptr = buffer;
+}
+
+// Folds the pair (x, y) into 'state' with Welford's co-moment recurrence.
+// Either form c_n = c_(n - 1) + (x_n - mx_(n - 1)) * (y_n - my_n) or
+// c_n = c_(n - 1) + (x_n - mx_n) * (y_n - my_(n - 1)) is valid, because
+// x_n - mx_n = (n - 1) * (x_n - mx_(n - 1)) / n; both equal
+// (n - 1) * (x_n - mx_(n - 1)) * (y_n - my_(n - 1)) / n. The first form is used.
+static inline void CovarUpdateState(double x, double y, CovarState* state) {
+  const int64_t n = ++state->count;
+  // my_n = my_(n - 1) + (y_n - my_(n - 1)) / n
+  state->yavg += (y - state->yavg) / n;
+  // xavg has not been advanced yet, so this is x_n - mx_(n - 1).
+  const double dx_prev = x - state->xavg;
+  // The co-moment is left untouched for the very first value.
+  if (n > 1) state->covar += dx_prev * (y - state->yavg);
+  // mx_n = mx_(n - 1) + (x_n - mx_(n - 1)) / n
+  state->xavg += dx_prev / n;
+}
+
+// Reverses CovarUpdateState() for the pair (x, y); intended to undo an earlier
+// update of the same pair (e.g. a row leaving an analytic window). Removing the
+// last remaining value resets the whole state to zeros.
+static inline void CovarRemoveState(double x, double y, CovarState* state) {
+  if (state->count <= 1) {
+    memset(state, 0, sizeof(CovarState));
+  } else {
+    --state->count;
+    // my_(n - 1) = my_n - (y_n - my_n) / (n - 1)
+    state->yavg -= (y - state->yavg) / state->count;
+    // At this point xavg still holds mx_n while yavg already holds my_(n - 1), so
+    // this applies c_(n - 1) = c_n - (x_n - mx_n) * (y_n - my_(n - 1)),
+    // one of the two equivalent co-moment update forms.
+    state->covar -= (x - state->xavg) * (y - state->yavg);
+    // mx_(n - 1) = mx_n - (x_n - mx_n) / (n - 1)
+    state->xavg -= (x - state->xavg) / state->count;
+  }
+}
+
+// Adds one (src1, src2) row to the covariance state shared by COVAR_SAMP() and
+// COVAR_POP(); rows where either argument is NULL do not contribute.
+void AggregateFunctions::CovarUpdate(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  if (!src1.is_null && !src2.is_null) {
+    DCHECK(dst->ptr != nullptr);
+    DCHECK_EQ(sizeof(CovarState), dst->len);
+    CovarUpdateState(src1.val, src2.val, reinterpret_cast<CovarState*>(dst->ptr));
+  }
+}
+
+// Takes a previously added (src1, src2) row back out of the covariance state, e.g.
+// when an analytic window slides. NULL rows were never added, so they are skipped.
+// No bookkeeping of Update()/Remove() call counts is needed: the state resets itself
+// when its count reaches zero, and Finalize() returns NULL for an empty state.
+void AggregateFunctions::CovarRemove(FunctionContext* ctx,
+    const DoubleVal& src1, const DoubleVal& src2, StringVal* dst) {
+  if (!src1.is_null && !src2.is_null) {
+    DCHECK(dst->ptr != nullptr);
+    DCHECK_EQ(sizeof(CovarState), dst->len);
+    CovarRemoveState(src1.val, src2.val, reinterpret_cast<CovarState*>(dst->ptr));
+  }
+}
+
+// COVAR_SAMP()/COVAR_POP() over TIMESTAMP arguments: both values are converted with
+// ToSubsecondUnixTime(UTCPTR, ...) and folded into the same state as CovarUpdate().
+// Rows where either argument is NULL, or either conversion fails, are skipped.
+void AggregateFunctions::TimestampCovarUpdate(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  if (src1.is_null || src2.is_null) return;
+  // Same intermediate-state invariants that CovarUpdate() checks.
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CovarState), dst->len);
+  CovarState* state = reinterpret_cast<CovarState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CovarUpdateState(val1, val2, state);
+  }
+}
+
+// Inverse of TimestampCovarUpdate(): converts both values the same way and removes
+// the pair from the covariance state. NULL or unconvertible rows are skipped,
+// matching what the update path skipped.
+void AggregateFunctions::TimestampCovarRemove(FunctionContext* ctx,
+    const TimestampVal& src1, const TimestampVal& src2, StringVal* dst) {
+  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
+  // because Finalize() returns NULL if count is 0. In other words, it's not needed to
+  // check if num_removes() >= num_updates() as it's accounted for in Finalize().
+  if (src1.is_null || src2.is_null) return;
+  // Same intermediate-state invariants that CovarRemove() checks.
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CovarState), dst->len);
+  CovarState* state = reinterpret_cast<CovarState*>(dst->ptr);
+  const TimestampValue& tm_src1 = TimestampValue::FromTimestampVal(src1);
+  const TimestampValue& tm_src2 = TimestampValue::FromTimestampVal(src2);
+  double val1, val2;
+  if (tm_src1.ToSubsecondUnixTime(UTCPTR, &val1) &&
+      tm_src2.ToSubsecondUnixTime(UTCPTR, &val2)) {
+    CovarRemoveState(val1, val2, state);
+  }
+}
+
+// Combines the partial covariance state in 'src' into 'dst' using the pairwise
+// merge formulas for means and co-moments.
+void AggregateFunctions::CovarMerge(FunctionContext* ctx,
+    const StringVal& src, StringVal* dst) {
+  DCHECK(dst->ptr != nullptr);
+  DCHECK_EQ(sizeof(CovarState), dst->len);
+  // A source without a buffer contributes nothing.
+  if (src.ptr == nullptr) return;
+  const CovarState* src_state = reinterpret_cast<const CovarState*>(src.ptr);
+  CovarState* dst_state = reinterpret_cast<CovarState*>(dst->ptr);
+  const int64_t nA = dst_state->count;
+  const int64_t nB = src_state->count;
+  if (nA == 0) {
+    // Nothing accumulated yet: adopt the source state as is.
+    memcpy(dst_state, src_state, sizeof(CovarState));
+    return;
+  }
+  // An empty source leaves the destination unchanged.
+  if (nB == 0) return;
+
+  const double xavgA = dst_state->xavg;
+  const double yavgA = dst_state->yavg;
+  const double xavgB = src_state->xavg;
+  const double yavgB = src_state->yavg;
+  dst_state->count = nA + nB;
+  const double n = static_cast<double>(dst_state->count);
+  // Combined means, weighted by the partition sizes.
+  dst_state->xavg = (xavgA * nA + xavgB * nB) / n;
+  dst_state->yavg = (yavgA * nA + yavgB * nB) / n;
+  // c_(A,B) = c_A + c_B + (mx_A - mx_B) * (my_A - my_B) * n_A * n_B / (n_A + n_B)
+  // n_A and n_B are multiplied into a double one at a time; the int64_t product
+  // n_A * n_B could overflow for very large partitions.
+  dst_state->covar +=
+      src_state->covar + (xavgA - xavgB) * (yavgA - yavgB) * nA * nB / n;
+}
+
+// Computes the sample covariance covar / (n - 1) without freeing the state.
+// Returns NULL for a NULL intermediate or fewer than two rows.
+DoubleVal AggregateFunctions::CovarSampleGetValue(FunctionContext* ctx,
+    const StringVal& src) {
+  // A NULL intermediate (e.g. if CovarInit() could not allocate) has no value.
+  if (UNLIKELY(src.is_null)) return DoubleVal::null();
+  const CovarState* state = reinterpret_cast<const CovarState*>(src.ptr);
+  if (state->count == 0 || state->count == 1) return DoubleVal::null();
+  return DoubleVal(state->covar / (state->count - 1));
+}
+
+// Computes the population covariance covar / n without freeing the state.
+// Returns NULL for a NULL intermediate or when no rows were aggregated.
+DoubleVal AggregateFunctions::CovarPopulationGetValue(FunctionContext* ctx,
+    const StringVal& src) {
+  // A NULL intermediate (e.g. if CovarInit() could not allocate) has no value.
+  if (UNLIKELY(src.is_null)) return DoubleVal::null();
+  const CovarState* state = reinterpret_cast<const CovarState*>(src.ptr);
+  if (state->count == 0) return DoubleVal::null();
+  return DoubleVal(state->covar / state->count);
+}
+
+// Final step of COVAR_SAMP(): computes the sample covariance (NULL for a NULL
+// intermediate or fewer than two rows) and releases the buffer from CovarInit().
+DoubleVal AggregateFunctions::CovarSampleFinalize(FunctionContext* ctx,
+    const StringVal& src) {
+  const CovarState* state = reinterpret_cast<const CovarState*>(src.ptr);
+  const bool too_few_rows =
+      UNLIKELY(src.is_null) || state->count == 0 || state->count == 1;
+  DoubleVal result = too_few_rows ? DoubleVal::null() : CovarSampleGetValue(ctx, src);
+  ctx->Free(src.ptr);
+  return result;
+}
+
+// Final step of COVAR_POP(): computes the population covariance (NULL for a NULL
+// intermediate or an empty state) and releases the buffer from CovarInit().
+DoubleVal AggregateFunctions::CovarPopulationFinalize(FunctionContext* ctx,
+    const StringVal& src) {
+  const CovarState* state = reinterpret_cast<const CovarState*>(src.ptr);
+  const bool no_rows = UNLIKELY(src.is_null) || state->count == 0;
+  DoubleVal result = no_rows ? DoubleVal::null() : CovarPopulationGetValue(ctx, src);
+  ctx->Free(src.ptr);
+  return result;
+}
+
struct AvgState {
double sum;
int64_t count;
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index cec5b8ed6..0e8c53a50 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -64,6 +64,38 @@ class AggregateFunctions {
static StringVal StringValSerializeOrFinalize(
FunctionContext* ctx, const StringVal& src);
+ /// Implementation of CORR(x, y): Pearson's correlation coefficient.
+ /// The intermediate value is a StringVal buffer holding a CorrState (defined in
+ /// aggregate-functions-ir.cc). Update/Remove ignore rows where either argument is
+ /// NULL; the Timestamp* variants first convert both values to Unix time (UTC, with
+ /// sub-second precision). GetValue/Finalize return NULL when fewer than two rows
+ /// remain or either argument has no positive variance; Finalize also frees the buffer.
+ static void CorrInit(FunctionContext* ctx, StringVal* dst);
+ static void CorrUpdate(FunctionContext* ctx, const DoubleVal& src1,
+ const DoubleVal& src2, StringVal* dst);
+ /// Undoes a prior CorrUpdate() for the same row (used for sliding analytic windows).
+ static void CorrRemove(FunctionContext* ctx, const DoubleVal& src1,
+ const DoubleVal& src2, StringVal* dst);
+ static void TimestampCorrUpdate(FunctionContext* ctx,
+ const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+ static void TimestampCorrRemove(FunctionContext* ctx,
+ const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+ /// Combines the partial state in 'src' into 'dst'.
+ static void CorrMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
+ /// Returns the current coefficient without freeing the intermediate buffer.
+ static DoubleVal CorrGetValue(FunctionContext* ctx, const StringVal& src);
+ static DoubleVal CorrFinalize(FunctionContext* ctx, const StringVal& src);
+
+ /// Implementation of COVAR_SAMP(x, y) and COVAR_POP(x, y).
+ /// Both functions share CovarInit/Update/Remove/Merge and a CovarState intermediate
+ /// (defined in aggregate-functions-ir.cc) and differ only in GetValue/Finalize:
+ /// the Sample variants divide the co-moment by (n - 1) and return NULL for fewer
+ /// than two rows; the Population variants divide by n and return NULL only when no
+ /// rows were aggregated. Rows where either argument is NULL are ignored.
+ static void CovarInit(FunctionContext* ctx, StringVal* dst);
+ static void CovarUpdate(FunctionContext* ctx, const DoubleVal& src1,
+ const DoubleVal& src2, StringVal* dst);
+ static void CovarRemove(FunctionContext* ctx, const DoubleVal& src1,
+ const DoubleVal& src2, StringVal* dst);
+ static void TimestampCovarUpdate(FunctionContext* ctx,
+ const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+ static void TimestampCovarRemove(FunctionContext* ctx,
+ const TimestampVal& src1, const TimestampVal& src2, StringVal* dst);
+ static void CovarMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
+ static DoubleVal CovarSampleGetValue(FunctionContext* ctx, const StringVal& src);
+ static DoubleVal CovarPopulationGetValue(FunctionContext* ctx, const StringVal& src);
+ static DoubleVal CovarSampleFinalize(FunctionContext* ctx,
+ const StringVal& src);
+ static DoubleVal CovarPopulationFinalize(FunctionContext* ctx,
+ const StringVal& src);
+
/// Implementation of Count and Count(*)
static void CountUpdate(FunctionContext*, const AnyVal& src, BigIntVal* dst);
static void CountStarUpdate(FunctionContext*, BigIntVal* dst);
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 356cf5461..889210851 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1319,6 +1319,81 @@ public class BuiltinsDb extends Db {
prefix + "9SumRemoveIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_",
null, false, true, true));
+ // Corr()
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "corr",
+ Lists.<Type>newArrayList(Type.DOUBLE, Type.DOUBLE), Type.DOUBLE, Type.STRING,
+ prefix + "8CorrInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix + "10CorrUpdateEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+ prefix + "9CorrMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ stringValSerializeOrFinalize,
+ prefix + "12CorrGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix + "10CorrRemoveEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+ prefix + "12CorrFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ false, true, false));
+
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "corr",
+ Lists.<Type>newArrayList(Type.TIMESTAMP, Type.TIMESTAMP), Type.DOUBLE, Type.STRING,
+ prefix + "8CorrInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix +
+ "19TimestampCorrUpdateEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+ prefix + "9CorrMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ stringValSerializeOrFinalize,
+ prefix + "12CorrGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix +
+ "19TimestampCorrRemoveEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+ prefix + "12CorrFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ false, true, false));
+
+ // Covar_samp()
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_samp",
+ Lists.<Type>newArrayList(Type.DOUBLE, Type.DOUBLE), Type.DOUBLE, Type.STRING,
+ prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix + "11CovarUpdateEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+ prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ stringValSerializeOrFinalize,
+ prefix + "19CovarSampleGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix + "11CovarRemoveEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+ prefix + "19CovarSampleFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ false, true, false));
+
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_samp",
+ Lists.<Type>newArrayList(Type.TIMESTAMP, Type.TIMESTAMP), Type.DOUBLE, Type.STRING,
+ prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix +
+ "20TimestampCovarUpdateEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+ prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ stringValSerializeOrFinalize,
+ prefix + "19CovarSampleGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix +
+ "20TimestampCovarRemoveEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+ prefix + "19CovarSampleFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ false, true, false));
+
+ // Covar_pop()
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_pop",
+ Lists.<Type>newArrayList(Type.DOUBLE, Type.DOUBLE), Type.DOUBLE, Type.STRING,
+ prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix + "11CovarUpdateEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+ prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ stringValSerializeOrFinalize,
+ prefix + "23CovarPopulationGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix + "11CovarRemoveEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_PNS1_9StringValE",
+ prefix + "23CovarPopulationFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ false, true, false));
+
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "covar_pop",
+ Lists.<Type>newArrayList(Type.TIMESTAMP, Type.TIMESTAMP), Type.DOUBLE, Type.STRING,
+ prefix + "9CovarInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix +
+ "20TimestampCovarUpdateEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+ prefix + "10CovarMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ stringValSerializeOrFinalize,
+ prefix + "23CovarPopulationGetValueEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix +
+ "20TimestampCovarRemoveEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_PNS1_9StringValE",
+ prefix + "23CovarPopulationFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ false, true, false));
+
// Avg
Type avgIntermediateType =
ScalarType.createFixedUdaIntermediateType(AVG_INTERMEDIATE_SIZE);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
index f0f311226..98267d4c7 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
@@ -1427,3 +1427,614 @@ select count(*) from tpch_parquet.orders o group by o.o_clerk limit 10
---- TYPES
bigint
====
+---- QUERY
+# CORR() function examples
+select corr(ps_availqty, ps_supplycost) from tpch.partsupp;
+---- RESULTS
+0.000321849166315
+---- TYPES
+double
+====
+---- QUERY
+# Behavior of CORR() on null table
+select corr(d, e) from functional.nulltable;
+---- RESULTS
+NULL
+---- TYPES
+double
+====
+---- QUERY
+# Behavior of CORR() on empty table
+select corr(f2, f2) from functional.emptytable;
+---- RESULTS
+NULL
+---- TYPES
+double
+====
+---- QUERY
+# CORR() on different datatypes
+select corr(tinyint_col, tinyint_col), corr(smallint_col, smallint_col),
+ corr(int_col, int_col), corr(bigint_col, bigint_col), corr(float_col, float_col),
+ corr(double_col, double_col), corr(timestamp_col, timestamp_col) from functional.alltypes;
+---- RESULTS
+1.0,1.0,1.0,1.0,1.0,1.0,1.0
+---- TYPES
+double,double,double,double,double,double,double
+====
+---- QUERY
+# CORR() on timestamp columns
+select corr(utctime, localtime) from functional.alltimezones;
+---- RESULTS
+0.999995807708
+---- TYPES
+double
+====
+---- QUERY
+# Since grouping by id yields a single row per group, this test shows that corr() returns NULL for a single row.
+select id, corr(int_col, int_col) from functional.alltypestiny group by id;
+---- RESULTS
+2,NULL
+4,NULL
+0,NULL
+6,NULL
+1,NULL
+7,NULL
+3,NULL
+5,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+# CORR() on decimal datatype
+select corr(d3, d4) from functional.decimal_tbl;
+---- RESULTS
+NULL
+---- TYPES
+double
+====
+---- QUERY
+select year, corr(double_col, double_col) from functional.alltypes group by year;
+---- RESULTS
+2009,1.0
+2010,1.0
+---- TYPES
+int,double
+====
+---- QUERY
+select corr(double_col, -double_col) from functional.alltypes;
+---- RESULTS
+-1.0
+---- TYPES
+double
+====
+---- QUERY
+select corr(double_col, double_col) from functional.alltypes;
+---- RESULTS
+1.0
+---- TYPES
+double
+====
+---- QUERY
+select corr(ss_sold_time_sk, ss_quantity) from tpcds.store_sales;
+---- RESULTS
+0.000136790587885
+---- TYPES
+double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk)
+ from tpcds.store;
+---- RESULTS
+5,NULL
+8,1.0
+12,-0.328248435346
+1,NULL
+2,-1.0
+3,-0.511481796057
+4,-0.881269090479
+6,-0.853760432996
+7,0.0825005896565
+9,0.0282032912339
+10,0.223462144551
+11,0.325760965741
+---- TYPES
+int,double
+====
+---- QUERY
+select id, double_col, corr(double_col, int_col) over (partition by month order by id) from functional.alltypes
+ order by id limit 10;
+---- RESULTS
+0,0.0,NULL
+1,10.1,1.0
+2,20.2,1.0
+3,30.3,1.0
+4,40.4,1.0
+5,50.5,1.0
+6,60.6,1.0
+7,70.7,1.0
+8,80.8,1.0
+9,90.9,1.0
+---- TYPES
+int,double,double
+====
+---- QUERY
+# CORR() when one column is filled with null
+select corr(null_int, rand()), corr(rand(), null_int) from functional.nullrows;
+---- RESULTS
+NULL,NULL
+---- TYPES
+double,double
+====
+---- QUERY
+# CORR() supporting join
+select corr(A.double_col, B.double_col) from functional.alltypes A, functional.alltypes B where A.id=B.id;
+---- RESULTS
+1.0
+---- TYPES
+double
+====
+---- QUERY
+# Tests functioning of CorrRemoveState()
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 5 preceding and 2 following) from tpcds.store;
+---- RESULTS
+5,-0.328248435346
+8,-0.328248435346
+12,-0.328248435346
+1,-0.511481796057
+2,-0.881269090479
+3,-0.853760432996
+4,0.0825005896565
+6,0.0282032912339
+7,0.223462144551
+9,0.294637370885
+10,0.115190500757
+11,-0.0690212865931
+---- TYPES
+int,double
+====
+---- QUERY
+# Floating-point rounding in double arithmetic can make the variance negative by a very small amount (around 1e-13).
+# To handle this, a check (state->xvar <= 0.0 || state->yvar <= 0.0) returns NULL instead; without it, the query
+# below would return NaN for some rows.
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and 1 following) from tpcds.store;
+---- RESULTS
+5,1.0
+8,-0.328248435346
+12,-1.0
+1,-1.0
+2,-0.511481796057
+3,-0.829250636825
+4,-0.884750027905
+6,-0.735742100541
+7,-0.165224541153
+9,0.97066733931
+10,1.0
+11,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+# Corr() when window size is 2
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,1.0
+12,-1.0
+1,NULL
+2,-1.0
+3,NULL
+4,-1.0
+6,-1.0
+7,-1.0
+9,1.0
+10,1.0
+11,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between current row and 1 following) from tpcds.store;
+---- RESULTS
+5,1.0
+8,-1.0
+12,NULL
+1,-1.0
+2,NULL
+3,-1.0
+4,-1.0
+6,-1.0
+7,1.0
+9,1.0
+10,NULL
+11,NULL
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between current row and unbounded following) from tpcds.store;
+---- RESULTS
+12,NULL
+8,-1.0
+5,-0.328248435346
+11,NULL
+10,NULL
+9,1.0
+7,0.965705311825
+6,0.17735847272
+4,-0.0690212865931
+3,0.115190500757
+2,0.294637370885
+1,0.325760965741
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, corr(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between unbounded preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,1.0
+12,-0.328248435346
+1,NULL
+2,-1.0
+3,-0.511481796057
+4,-0.881269090479
+6,-0.853760432996
+7,0.0825005896565
+9,0.0282032912339
+10,0.223462144551
+11,0.325760965741
+---- TYPES
+int,double
+====
+---- QUERY
+# Sample and population covariance
+select covar_samp(ps_availqty, ps_supplycost), covar_pop(ps_availqty, ps_supplycost) from tpch.partsupp;
+---- RESULTS
+267.607127282,267.606792773
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() on null table
+select covar_samp(d, e), covar_pop(d, e) from functional.nulltable;
+---- RESULTS
+NULL,NULL
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() on different datatypes
+select covar_samp(tinyint_col, tinyint_col), covar_samp(smallint_col, smallint_col),
+ covar_samp(int_col, int_col), covar_samp(bigint_col, bigint_col), covar_samp(float_col, float_col),
+ covar_samp(double_col, double_col) from functional.alltypes;
+---- RESULTS
+8.251130291820797,8.251130291820797,8.251130291820797,825.1130291820797,9.983867246543589,841.6978010686292
+---- TYPES
+double,double,double,double,double,double
+====
+---- QUERY
+# covar_pop() on different datatypes
+select covar_pop(tinyint_col, tinyint_col), covar_pop(smallint_col, smallint_col),
+ covar_pop(int_col, int_col), covar_pop(bigint_col, bigint_col), covar_pop(float_col, float_col),
+ covar_pop(double_col, double_col) from functional.alltypes;
+---- RESULTS
+8.25,8.25,8.25,825.0,9.9824995935,841.5825
+---- TYPES
+double,double,double,double,double,double
+====
+---- QUERY
+# Behavior of covar_samp() and covar_pop() on empty table
+select covar_samp(f2, f2), covar_pop(f2, f2) from functional.emptytable;
+---- RESULTS
+NULL,NULL
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() on timestamp datatype. The result is divided by a large number (1E+10)
+# because, unlike integer overflow, very large double values silently lose precision.
+select covar_samp(utctime, localtime)/1E+10, covar_pop(utctime, localtime)/1E+10 from functional.alltimezones;
+---- RESULTS
+5502.915940016994,5496.494801230509
+---- TYPES
+double,double
+====
+---- QUERY
+# Since grouping by id yields a single row per group, this test shows the
+# behavior of covar_samp() and covar_pop() on a single row
+select id, covar_samp(int_col, int_col), covar_pop(int_col, int_col) from functional.alltypestiny group by id;
+---- RESULTS
+2,NULL,0.0
+4,NULL,0.0
+0,NULL,0.0
+6,NULL,0.0
+1,NULL,0.0
+7,NULL,0.0
+3,NULL,0.0
+5,NULL,0.0
+---- TYPES
+int,double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() on decimal type
+select covar_samp(d3, d4), covar_pop(d3, d4) from functional.decimal_tbl;
+---- RESULTS
+0.0,0.0
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() with group by clause
+select year, covar_samp(double_col, double_col),
+ covar_pop(double_col, double_col) from functional.alltypes group by year;
+---- RESULTS
+2009,841.813133735,841.5825
+2010,841.813133735,841.5825
+---- TYPES
+int,double,double
+====
+---- QUERY
+# Example of negative covar_samp() and covar_pop()
+select covar_samp(double_col, -double_col), covar_pop(double_col, -double_col) from functional.alltypes;
+---- RESULTS
+-841.697801069,-841.5825
+---- TYPES
+double,double
+====
+---- QUERY
+select covar_samp(double_col, double_col), covar_pop(double_col, double_col) from functional.alltypes;
+---- RESULTS
+841.697801069,841.5825
+---- TYPES
+double,double
+====
+---- QUERY
+select covar_samp(ss_sold_time_sk, ss_quantity), covar_pop(ss_sold_time_sk, ss_quantity) from tpcds.store_sales;
+---- RESULTS
+50.2400502845,50.2400315775
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() on analytic query
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk)
+ from tpcds.store;
+---- RESULTS
+5,NULL
+8,10414050.0
+12,-5125047.666666668
+1,NULL
+2,-158355.0
+3,-3513583.5
+4,-19719420.0
+6,-16848393.6
+7,4309087.100000003
+9,1337692.761904765
+10,11663050.25
+11,17362201.83333334
+---- TYPES
+int,double
+====
+---- QUERY
+# covar_pop() on analytic query
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk)
+ from tpcds.store;
+---- RESULTS
+5,0.0
+8,5207025.0
+12,-3416698.444444445
+1,0.0
+2,-79177.5
+3,-2342389.0
+4,-14789565.0
+6,-13478714.88
+7,3590905.916666669
+9,1146593.79591837
+10,10205168.96875
+11,15433068.2962963
+---- TYPES
+int,double
+====
+---- QUERY
+select id, double_col, covar_samp(double_col, int_col) over (partition by month order by id) from
+ functional.alltypes order by id limit 10;
+---- RESULTS
+0,0.0,NULL
+1,10.1,5.05
+2,20.2,10.1
+3,30.3,16.8333333333
+4,40.4,25.25
+5,50.5,35.35
+6,60.6,47.1333333333
+7,70.7,60.6
+8,80.8,75.75
+9,90.9,92.5833333333
+---- TYPES
+int,double,double
+====
+---- QUERY
+select id, double_col, covar_pop(double_col, int_col) over (partition by month order by id) from
+ functional.alltypes order by id limit 10;
+---- RESULTS
+0,0.0,0.0
+1,10.1,2.525
+2,20.2,6.73333333333
+3,30.3,12.625
+4,40.4,20.2
+5,50.5,29.4583333333
+6,60.6,40.4
+7,70.7,53.025
+8,80.8,67.3333333333
+9,90.9,83.325
+---- TYPES
+int,double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() when one column is entirely NULL
+select covar_samp(null_int, rand()), covar_samp(rand(), null_int), covar_pop(null_int, rand()),
+ covar_pop(rand(), null_int) from functional.nullrows;
+---- RESULTS
+NULL,NULL,NULL,NULL
+---- TYPES
+double,double,double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() over a join
+select covar_samp(A.double_col, B.double_col), covar_pop(A.double_col, B.double_col) from functional.alltypes A,
+ functional.alltypes B where A.id=B.id;
+---- RESULTS
+841.697801069,841.5825
+---- TYPES
+double,double
+====
+---- QUERY
+# covar_samp() and covar_pop() supporting timestamp datatype
+select covar_samp(timestamp_col, timestamp_col)/1E+13, covar_pop(timestamp_col, timestamp_col)/1E+13
+ from functional.alltypes;
+---- RESULTS
+33.1559162368,33.1513743305
+---- TYPES
+double,double
+====
+---- QUERY
+# corr(), covar_samp(), covar_pop() when both columns are filled with 0
+select corr(double_col*0, double_col*0), covar_samp(double_col*0, double_col*0),
+ covar_pop(double_col*0, double_col*0) from functional.alltypes;
+---- RESULTS
+NULL,0.0,0.0
+---- TYPES
+double,double,double
+====
+---- QUERY
+# corr(), covar_samp(), covar_pop() when one column is filled with 0
+select corr(double_col, double_col*0), covar_samp(double_col, double_col*0),
+ covar_pop(double_col, double_col*0) from functional.alltypes;
+---- RESULTS
+NULL,0.0,0.0
+---- TYPES
+double,double,double
+====
+---- QUERY
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between unbounded preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,10414050.0
+12,-5125047.666666668
+1,NULL
+2,-158355.0
+3,-3513583.5
+4,-19719420.0
+6,-16848393.6
+7,4309087.100000003
+9,1337692.761904765
+10,11663050.25
+11,17362201.83333334
+---- TYPES
+int,double
+====
+---- QUERY
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between unbounded preceding and current row) from tpcds.store;
+---- RESULTS
+5,0.0
+8,5207025.0
+12,-3416698.444444445
+1,0.0
+2,-79177.5
+3,-2342389.0
+4,-14789565.0
+6,-13478714.88
+7,3590905.916666669
+9,1146593.79591837
+10,10205168.96875
+11,15433068.2962963
+---- TYPES
+int,double
+====
+---- QUERY
+# Tests CovarRemoveState() via a sliding window frame
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 5 preceding and 2 following) from tpcds.store;
+---- RESULTS
+5,-5125047.666666668
+8,-5125047.666666668
+12,-5125047.666666668
+1,-3513583.5
+2,-19719420.0
+3,-16848393.6
+4,4309087.100000003
+6,1337692.761904765
+7,11663050.25
+9,14424596.67857143
+10,3770362.571428577
+11,-2206709.166666666
+---- TYPES
+int,double
+====
+---- QUERY
+# Tests CovarRemoveState() via a sliding window frame
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 5 preceding and 2 following) from tpcds.store;
+---- RESULTS
+5,-3416698.444444445
+8,-3416698.444444445
+12,-3416698.444444445
+1,-2342389.0
+2,-14789565.0
+3,-13478714.88
+4,3590905.916666669
+6,1146593.79591837
+7,10205168.96875
+9,12621522.09375001
+10,3231739.346938781
+11,-1838924.305555555
+---- TYPES
+int,double
+====
+---- QUERY
+# covar_samp() when window size is 2
+select s_store_sk, covar_samp(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and current row) from tpcds.store;
+---- RESULTS
+5,NULL
+8,10414050.0
+12,-14211464
+1,NULL
+2,-158355.0
+3,0
+4,-16051572
+6,-1733847.5
+7,-2425526
+9,25465544
+10,26428357
+11,0
+---- TYPES
+int,double
+====
+---- QUERY
+# covar_pop() when window size is 2
+select s_store_sk, covar_pop(s_number_employees, s_floor_space) over (partition by s_city order by s_store_sk
+ rows between 1 preceding and current row) from tpcds.store;
+---- RESULTS
+5,0.0
+8,5207025.0
+12,-7105732
+1,0.0
+2,-79177.5
+3,0
+4,-8025786
+6,-866923.75
+7,-1212763
+9,12732772
+10,13214178.5
+11,0
+---- TYPES
+int,double
+====
\ No newline at end of file