You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:17:11 UTC

tajo git commit: TAJO-1661: Implement CORR function. (jihoon)

Repository: tajo
Updated Branches:
  refs/heads/master 2ec307d62 -> aa49dc4a8


TAJO-1661: Implement CORR function. (jihoon)

Closes #616


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/aa49dc4a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/aa49dc4a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/aa49dc4a

Branch: refs/heads/master
Commit: aa49dc4a8af9f836e44896c516d8b0cdb738e5dd
Parents: 2ec307d
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Jun 25 17:16:41 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Jun 25 17:16:41 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/engine/function/builtin/Corr.java      | 224 +++++++++++++++++++
 tajo-core/src/main/proto/InternalTypes.proto    |   9 +
 .../engine/function/TestBuiltinFunctions.java   |  33 +++
 4 files changed, 268 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 425ac5d..9412179 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.11.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-1661: Implement CORR function. (jihoon)
+
     TAJO-1537: Implement a virtual table for sessions. 
     (Contributed by Yongjin Choi, Committed by hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
new file mode 100644
index 0000000..310169f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.InternalTypes.CorrProto;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.plan.function.AggFunction;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Compute the Pearson correlation coefficient corr(x, y), using the following
+ * stable one-pass method, based on:
+ * "Formulas for Robust, One-Pass Parallel Computation of Covariances and
+ * Arbitrary-Order Statistical Moments", Philippe Pebay, Sandia Labs
+ * and "The Art of Computer Programming, volume 2: Seminumerical Algorithms",
+ * Donald Knuth.
+ *
+ *  Incremental:
+ *   n : <count>
+ *   mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg>
+ *   my_n = my_(n-1) + [y_n - my_(n-1)]/n : <yavg>
+ *   c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n - my_n) : <covariance * n>
+ *   vx_n = vx_(n-1) + (x_n - mx_n)(x_n - mx_(n-1)): <variance * n>
+ *   vy_n = vy_(n-1) + (y_n - my_n)(y_n - my_(n-1)): <variance * n>
+ *
+ *  Merge:
+ *   c_(A,B) = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ *   vx_(A,B) = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B)
+ *   vy_(A,B) = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ *
+ */
+@Description(
+    functionName = "corr",
+    example = "> SELECT corr(expr, expr);",
+    description = "Returns the Pearson coefficient of correlation between a set of number pairs.\n" +
+        "The function takes as arguments any pair of numeric types and returns a double.\n"
+        + "Any pair with a NULL is ignored. If the function is applied to an empty set or\n"
+        + "a singleton set, NULL will be returned. Otherwise, it computes the following:\n"
+        + "   COVAR_POP(x,y)/(STDDEV_POP(x)*STDDEV_POP(y))\n"
+        + "where neither x nor y is null,\n"
+        + "COVAR_POP is the population covariance,\n"
+        + "and STDDEV_POP is the population standard deviation.",
+    returnType = Type.FLOAT8,
+    paramTypes = {
+        @ParamTypes(paramTypes = {Type.INT8, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.INT8, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT4}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT8}),
+        @ParamTypes(paramTypes = {Type.FLOAT8, Type.FLOAT4}),
+        @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT4}),
+    }
+)
+public class Corr extends AggFunction<Datum> {
+
+  /**
+   * Evaluate the Pearson correlation coefficient using a stable one-pass
+   * algorithm, based on work by Philippe Pébay and Donald Knuth.
+   *
+   *  Incremental:
+   *   n : <count>
+   *   mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg>
+   *   my_n = my_(n-1) + [y_n - my_(n-1)]/n : <yavg>
+   *   c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n - my_n) : <covariance * n>
+   *   vx_n = vx_(n-1) + (x_n - mx_n)(x_n - mx_(n-1)): <variance * n>
+   *   vy_n = vy_(n-1) + (y_n - my_n)(y_n - my_(n-1)): <variance * n>
+   *
+   *  Merge:
+   *   c_X = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/n_X
+   *   vx_(A,B) = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B)
+   *   vy_(A,B) = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+   *
+   */
+  public Corr() {
+    super(new Column[] {
+        new Column("expr", Type.FLOAT8),
+        new Column("expr", Type.FLOAT8)
+    });
+  }
+
+  public Corr(Column[] definedArgs) {
+    super(definedArgs);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new CorrContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+    if (!params.isBlankOrNull(0) && !params.isBlankOrNull(1)) {
+      CorrContext corrContext = (CorrContext) ctx;
+      double vx = params.getFloat8(0);
+      double vy = params.getFloat8(1);
+      double deltaX = vx - corrContext.xavg;
+      double deltaY = vy - corrContext.yavg;
+      corrContext.count++;
+      corrContext.xavg += deltaX / corrContext.count;
+      corrContext.yavg += deltaY / corrContext.count;
+      if (corrContext.count > 1) {
+        corrContext.covar += deltaX * (vy - corrContext.yavg);
+        corrContext.xvar += deltaX * (vx - corrContext.xavg);
+        corrContext.yvar += deltaY * (vy - corrContext.yavg);
+      }
+    }
+  }
+
+  @Override
+  public void merge(FunctionContext ctx, Tuple part) {
+    CorrContext corrContext = (CorrContext) ctx;
+    if (part.isBlankOrNull(0)) {
+      return;
+    }
+    ProtobufDatum datum = (ProtobufDatum) part.getProtobufDatum(0);
+    CorrProto proto = (CorrProto) datum.get();
+    long nA = corrContext.count;
+    long nB = proto.getCount();
+
+    if (nA == 0) {
+      corrContext.count = proto.getCount();
+      corrContext.xavg = proto.getXavg();
+      corrContext.yavg = proto.getYavg();
+      corrContext.xvar = proto.getXvar();
+      corrContext.yvar = proto.getYvar();
+      corrContext.covar = proto.getCovar();
+    } else {
+      // Merge the two partials
+      double xavgA = corrContext.xavg;
+      double yavgA = corrContext.yavg;
+      double xavgB = proto.getXavg();
+      double yavgB = proto.getYavg();
+      double xvarB = proto.getXvar();
+      double yvarB = proto.getYvar();
+      double covarB = proto.getCovar();
+
+      corrContext.count += nB;
+      corrContext.xavg = (xavgA * nA + xavgB * nB) / corrContext.count;
+      corrContext.yavg = (yavgA * nA + yavgB * nB) / corrContext.count;
+      corrContext.xvar += xvarB + (xavgA - xavgB) * (xavgA - xavgB) * nA * nB / corrContext.count;
+      corrContext.yvar += yvarB + (yavgA - yavgB) * (yavgA - yavgB) * nA * nB / corrContext.count;
+      corrContext.covar +=
+          covarB + (xavgA - xavgB) * (yavgA - yavgB) * ((double) (nA * nB) / corrContext.count);
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    CorrContext corrContext = (CorrContext) ctx;
+    if (corrContext.count == 0) {
+      return NullDatum.get();
+    }
+    CorrProto.Builder builder = CorrProto.newBuilder();
+    builder.setCount(corrContext.count);
+    builder.setXavg(corrContext.xavg);
+    builder.setYavg(corrContext.yavg);
+    builder.setXvar(corrContext.xvar);
+    builder.setYvar(corrContext.yvar);
+    builder.setCovar(corrContext.covar);
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public DataType getPartialResultType() {
+    return CatalogUtil.newDataType(Type.PROTOBUF, CorrProto.class.getName());
+  }
+
+  @Override
+  public Datum terminate(FunctionContext ctx) {
+    CorrContext corrContext = (CorrContext) ctx;
+
+    if (corrContext.count < 2) { // SQL standard - return null for zero or one pair
+      return NullDatum.get();
+    } else {
+      return DatumFactory.createFloat8(corrContext.covar
+          / java.lang.Math.sqrt(corrContext.xvar)
+          / java.lang.Math.sqrt(corrContext.yvar));
+    }
+  }
+
+  protected static class CorrContext implements FunctionContext {
+    long count = 0; // number n of elements
+    double xavg = 0; // average of x elements
+    double yavg = 0; // average of y elements
+    double xvar = 0; // n times the variance of x elements
+    double yvar = 0; // n times the variance of y elements
+    double covar = 0; // n times the covariance
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/proto/InternalTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/InternalTypes.proto b/tajo-core/src/main/proto/InternalTypes.proto
index 13dd107..d23e244 100644
--- a/tajo-core/src/main/proto/InternalTypes.proto
+++ b/tajo-core/src/main/proto/InternalTypes.proto
@@ -36,3 +36,12 @@ message VarianceProto {
   required double avg = 2;
   required int64 count = 3;
 }
+
+message CorrProto {  // partial (per-task) state of the corr() aggregate in Corr.java
+  required int64 count = 1;   // number n of (x, y) pairs aggregated
+  required double xavg = 2;   // mean of x
+  required double yavg = 3;   // mean of y
+  required double xvar = 4;   // n times the population variance of x
+  required double yvar = 5;   // n times the population variance of y
+  required double covar = 6;  // n times the population covariance of x and y
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
index 5dae452..72fdd6f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
@@ -788,4 +788,37 @@ public class TestBuiltinFunctions extends QueryTestCaseBase {
       executeString("DROP TABLE rank_table2 PURGE");
     }
   }
+
+  @Test
+  public void testCorr() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("value_int", TajoDataTypes.Type.INT4);
+    schema.addColumn("value_long", TajoDataTypes.Type.INT8);
+    schema.addColumn("value_float", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("value_double", TajoDataTypes.Type.FLOAT8);
+    String[] data = new String[]{
+        "1|\\N|-111|1.2|-50.5",
+        "2|1|\\N|\\N|52.5",
+        "3|2|-333|2.8|\\N",
+        "4|3|-555|2.8|43.2",
+        "5|4|-111|1.1|10.2",};
+    TajoTestingCluster.createTable("testbuiltin11", schema, tableOptions, data, 1);
+    String ascExpected = "corr1,corr2,corr3,corr4\n" +
+        "-------------------------------\n" +
+        "0.5,-0.9037045658322675,0.7350290063698216,-0.8761489936497805\n";
+    try {
+      ResultSet res = executeString("select corr(value_int, value_long) as corr1, corr(value_long, value_float) as corr2, corr(value_float, value_double) as corr3, corr(value_double, value_int) as corr4 from testbuiltin11");
+      try {
+        assertEquals(ascExpected, resultSetToString(res));
+      } finally {
+        res.close(); // close even when the assertion fails
+      }
+    } finally {
+      executeString("DROP TABLE testbuiltin11 PURGE");
+    }
+  }
 }