You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:17:11 UTC
tajo git commit: TAJO-1661: Implement CORR function. (jihoon)
Repository: tajo
Updated Branches:
refs/heads/master 2ec307d62 -> aa49dc4a8
TAJO-1661: Implement CORR function. (jihoon)
Closes #616
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/aa49dc4a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/aa49dc4a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/aa49dc4a
Branch: refs/heads/master
Commit: aa49dc4a8af9f836e44896c516d8b0cdb738e5dd
Parents: 2ec307d
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Jun 25 17:16:41 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Jun 25 17:16:41 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/engine/function/builtin/Corr.java | 224 +++++++++++++++++++
tajo-core/src/main/proto/InternalTypes.proto | 9 +
.../engine/function/TestBuiltinFunctions.java | 33 +++
4 files changed, 268 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 425ac5d..9412179 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.11.0 - unreleased
NEW FEATURES
+ TAJO-1661: Implement CORR function. (jihoon)
+
TAJO-1537: Implement a virtual table for sessions.
(Contributed by Yongjin Choi, Committed by hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
new file mode 100644
index 0000000..310169f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.InternalTypes.CorrProto;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.plan.function.AggFunction;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Computes the Pearson correlation coefficient corr(x, y) using a numerically
+ * stable one-pass method, based on:
+ * "Formulas for Robust, One-Pass Parallel Computation of Covariances and
+ * Arbitrary-Order Statistical Moments", Philippe Pebay, Sandia Labs,
+ * and "The Art of Computer Programming, volume 2: Seminumerical Algorithms",
+ * Donald Knuth.
+ *
+ * Incremental (one pair at a time, see {@link #eval}):
+ *   n                                               : <count>
+ *   mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n             : <xavg>
+ *   my_n = my_(n-1) + [y_n - my_(n-1)]/n             : <yavg>
+ *   c_n  = c_(n-1) + (x_n - mx_(n-1))*(y_n - my_n)   : <covariance * n>
+ *   vx_n = vx_(n-1) + (x_n - mx_n)*(x_n - mx_(n-1))  : <variance * n>
+ *   vy_n = vy_(n-1) + (y_n - my_n)*(y_n - my_(n-1))  : <variance * n>
+ *
+ * Merge of two partials A and B (see {@link #merge}), with n_X = n_A + n_B:
+ *   mx_X = mx_A + (mx_B - mx_A)*n_B/n_X
+ *   my_X = my_A + (my_B - my_A)*n_B/n_X
+ *   c_X  = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/n_X
+ *   vx_X = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/n_X
+ *   vy_X = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/n_X
+ *
+ * Result: c / (sqrt(vx) * sqrt(vy)). Following the SQL standard, NULL is
+ * returned when fewer than two non-null pairs were seen, and when either
+ * input has zero variance (the coefficient is undefined there; the division
+ * would otherwise produce NaN).
+ */
+@Description(
+    functionName = "corr",
+    example = "> SELECT corr(expr, expr);",
+    description = "Returns the Pearson coefficient of correlation between a set of number pairs.\n" +
+        "The function takes as arguments any pair of numeric types and returns a double.\n"
+        + "Any pair with a NULL is ignored. If the function is applied to an empty set or\n"
+        + "a singleton set, or if either x or y has zero variance, NULL will be returned.\n"
+        + "Otherwise, it computes the following:\n"
+        + " COVAR_POP(x,y)/(STDDEV_POP(x)*STDDEV_POP(y))\n"
+        + "where neither x nor y is null,\n"
+        + "COVAR_POP is the population covariance,\n"
+        + "and STDDEV_POP is the population standard deviation.",
+    returnType = Type.FLOAT8,
+    paramTypes = {
+        @ParamTypes(paramTypes = {Type.INT8, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.INT8, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT4}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.FLOAT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT4}),
+    }
+)
+public class Corr extends AggFunction<Datum> {
+
+  /**
+   * Creates a corr function whose two arguments are declared as FLOAT8 columns.
+   */
+  public Corr() {
+    super(new Column[] {
+        new Column("expr", Type.FLOAT8),
+        new Column("expr", Type.FLOAT8)
+    });
+  }
+
+  /**
+   * Creates a corr function with explicitly defined argument columns.
+   *
+   * @param definedArgs the argument columns passed to the parent constructor
+   */
+  public Corr(Column[] definedArgs) {
+    super(definedArgs);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new CorrContext();
+  }
+
+  /**
+   * Accumulates one (x, y) pair into the running state. Pairs in which either
+   * value is blank or NULL are ignored.
+   */
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+    if (!params.isBlankOrNull(0) && !params.isBlankOrNull(1)) {
+      CorrContext corrContext = (CorrContext) ctx;
+      double vx = params.getFloat8(0);
+      double vy = params.getFloat8(1);
+      // Deviations from the means *before* this pair is added.
+      double deltaX = vx - corrContext.xavg;
+      double deltaY = vy - corrContext.yavg;
+      corrContext.count++;
+      corrContext.xavg += deltaX / corrContext.count;
+      corrContext.yavg += deltaY / corrContext.count;
+      // For the first pair every increment is zero, so it is skipped.
+      if (corrContext.count > 1) {
+        // Each product pairs an "old mean" deviation with a "new mean" one.
+        corrContext.covar += deltaX * (vy - corrContext.yavg);
+        corrContext.xvar += deltaX * (vx - corrContext.xavg);
+        corrContext.yvar += deltaY * (vy - corrContext.yavg);
+      }
+    }
+  }
+
+  /**
+   * Merges a partial result produced by {@link #getPartialResult} into
+   * {@code ctx}. A NULL partial (from a partition that saw no pairs) is ignored.
+   */
+  @Override
+  public void merge(FunctionContext ctx, Tuple part) {
+    CorrContext corrContext = (CorrContext) ctx;
+    if (part.isBlankOrNull(0)) {
+      return;
+    }
+    ProtobufDatum datum = (ProtobufDatum) part.getProtobufDatum(0);
+    CorrProto proto = (CorrProto) datum.get();
+    long nA = corrContext.count;
+    long nB = proto.getCount();
+
+    if (nA == 0) {
+      // Nothing accumulated yet: adopt the incoming partial as-is.
+      corrContext.count = nB;
+      corrContext.xavg = proto.getXavg();
+      corrContext.yavg = proto.getYavg();
+      corrContext.xvar = proto.getXvar();
+      corrContext.yvar = proto.getYvar();
+      corrContext.covar = proto.getCovar();
+    } else {
+      double xavgA = corrContext.xavg;
+      double yavgA = corrContext.yavg;
+      double deltaX = proto.getXavg() - xavgA;
+      double deltaY = proto.getYavg() - yavgA;
+      long n = nA + nB;
+      // n_A * n_B / n_X, computed in floating point so the product of the two
+      // counts cannot overflow a long.
+      double weight = (double) nA * nB / n;
+
+      corrContext.count = n;
+      // Delta form of the weighted mean: equal means stay exactly equal, which
+      // keeps the variance of a constant column at exactly zero.
+      corrContext.xavg = xavgA + deltaX * nB / n;
+      corrContext.yavg = yavgA + deltaY * nB / n;
+      corrContext.xvar += proto.getXvar() + deltaX * deltaX * weight;
+      corrContext.yvar += proto.getYvar() + deltaY * deltaY * weight;
+      corrContext.covar += proto.getCovar() + deltaX * deltaY * weight;
+    }
+  }
+
+  /**
+   * Serializes the running state into a {@link CorrProto} datum, or returns
+   * NULL when no pair has been accumulated.
+   */
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    CorrContext corrContext = (CorrContext) ctx;
+    if (corrContext.count == 0) {
+      return NullDatum.get();
+    }
+    CorrProto.Builder builder = CorrProto.newBuilder();
+    builder.setCount(corrContext.count);
+    builder.setXavg(corrContext.xavg);
+    builder.setYavg(corrContext.yavg);
+    builder.setXvar(corrContext.xvar);
+    builder.setYvar(corrContext.yvar);
+    builder.setCovar(corrContext.covar);
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public DataType getPartialResultType() {
+    return CatalogUtil.newDataType(Type.PROTOBUF, CorrProto.class.getName());
+  }
+
+  /**
+   * Returns the correlation coefficient as a FLOAT8 datum.
+   *
+   * @return NULL when fewer than two pairs were seen or when either input has
+   *         zero variance; otherwise covar / (sqrt(xvar) * sqrt(yvar))
+   */
+  @Override
+  public Datum terminate(FunctionContext ctx) {
+    CorrContext corrContext = (CorrContext) ctx;
+
+    // SQL standard: NULL for zero or one pair, and NULL when either variance is
+    // zero (the coefficient is undefined; the division would yield NaN).
+    if (corrContext.count < 2 || corrContext.xvar <= 0 || corrContext.yvar <= 0) {
+      return NullDatum.get();
+    }
+    return DatumFactory.createFloat8(corrContext.covar
+        / Math.sqrt(corrContext.xvar)
+        / Math.sqrt(corrContext.yvar));
+  }
+
+  /** Running state of one corr() aggregation. */
+  protected static class CorrContext implements FunctionContext {
+    long count = 0; // number n of elements
+    double xavg = 0; // average of x elements
+    double yavg = 0; // average of y elements
+    double xvar = 0; // n times the variance of x elements
+    double yvar = 0; // n times the variance of y elements
+    double covar = 0; // n times the covariance
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/proto/InternalTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/InternalTypes.proto b/tajo-core/src/main/proto/InternalTypes.proto
index 13dd107..d23e244 100644
--- a/tajo-core/src/main/proto/InternalTypes.proto
+++ b/tajo-core/src/main/proto/InternalTypes.proto
@@ -36,3 +36,12 @@ message VarianceProto {
required double avg = 2;
required int64 count = 3;
}
+
+// Serialized partial state of the corr() aggregate function. It is built from
+// Corr.CorrContext by Corr#getPartialResult and read back by Corr#merge.
+message CorrProto {
+ // number of (x, y) pairs accumulated (pairs containing a NULL are skipped)
+ required int64 count = 1;
+ // running mean of x
+ required double xavg = 2;
+ // running mean of y
+ required double yavg = 3;
+ // count times the variance of x
+ required double xvar = 4;
+ // count times the variance of y
+ required double yvar = 5;
+ // count times the covariance of x and y
+ required double covar = 6;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
index 5dae452..72fdd6f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
@@ -788,4 +788,37 @@ public class TestBuiltinFunctions extends QueryTestCaseBase {
executeString("DROP TABLE rank_table2 PURGE");
}
}
+
+  /**
+   * Checks corr() over the INT4/INT8, INT8/FLOAT4, FLOAT4/FLOAT8 and FLOAT8/INT4
+   * argument combinations. The data contains NULLs that must be skipped pairwise.
+   */
+  @Test
+  public void testCorr() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("value_int", TajoDataTypes.Type.INT4);
+    schema.addColumn("value_long", TajoDataTypes.Type.INT8);
+    schema.addColumn("value_float", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("value_double", TajoDataTypes.Type.FLOAT8);
+    String[] data = new String[]{
+        "1|\\N|-111|1.2|-50.5",
+        "2|1|\\N|\\N|52.5",
+        "3|2|-333|2.8|\\N",
+        "4|3|-555|2.8|43.2",
+        "5|4|-111|1.1|10.2",};
+    TajoTestingCluster.createTable("testbuiltin11", schema, tableOptions, data, 1);
+
+    try {
+      ResultSet res = executeString("select corr(value_int, value_long) as corr1, corr(value_long, value_float) as corr2, corr(value_float, value_double) as corr3, corr(value_double, value_int) as corr4 from testbuiltin11");
+      // Close the result set even if the assertion below fails.
+      try {
+        String ascExpected = "corr1,corr2,corr3,corr4\n" +
+            "-------------------------------\n" +
+            "0.5,-0.9037045658322675,0.7350290063698216,-0.8761489936497805\n";
+
+        assertEquals(ascExpected, resultSetToString(res));
+      } finally {
+        res.close();
+      }
+    } finally {
+      executeString("DROP TABLE testbuiltin11 PURGE");
+    }
+  }
}