You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by my...@apache.org on 2016/12/02 07:04:27 UTC

[25/50] [abbrv] incubator-hivemall git commit: integrate chi2 and SNR into hivemall.spark

integrate chi2, SNR, select_k_best and transpose_and_dot into hivemall.spark



Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/a1f8f958
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/a1f8f958
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/a1f8f958

Branch: refs/heads/JIRA-22/pr-385
Commit: a1f8f958c99f3cde9e48b6d80d364004f6d98cc2
Parents: 22a608e
Author: amaya <gi...@sapphire.in.net>
Authored: Tue Sep 27 15:58:33 2016 +0900
Committer: amaya <gi...@sapphire.in.net>
Committed: Tue Sep 27 15:58:33 2016 +0900

----------------------------------------------------------------------
 .../apache/spark/sql/hive/GroupedDataEx.scala   | 24 ++++++++
 .../org/apache/spark/sql/hive/HivemallOps.scala | 19 ++++++
 .../spark/sql/hive/HivemallOpsSuite.scala       | 63 ++++++++++++++++++-
 .../org/apache/spark/sql/hive/HivemallOps.scala | 20 ++++++
 .../sql/hive/RelationalGroupedDatasetEx.scala   | 26 ++++++++
 .../spark/sql/hive/HivemallOpsSuite.scala       | 65 +++++++++++++++++++-
 6 files changed, 212 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
index 37d5423..2482c62 100644
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
+++ b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
@@ -264,4 +264,28 @@ final class GroupedDataEx protected[sql](
       .toAggregateExpression()
     toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
   }
+
+  /**
+   * Aggregates per-feature signal-to-noise ratios over each group.
+   *
+   * @param X name of the feature-array column
+   * @param Y name of the label column (a one-hot class vector in the test suite)
+   * @return the aggregated result per group; the suite reads it at column index 1,
+   *         presumably right after the grouping key
+   * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
+   */
+  def snr(X: String, Y: String): DataFrame = {
+    val wrapper = new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF")
+    val inputs = Seq(X, Y).map(name => df.col(name).expr)
+    val aggregate = HiveUDAFFunction(wrapper, inputs, isUDAFBridgeRequired = false)
+      .toAggregateExpression()
+    toDF(Seq(Alias(aggregate, aggregate.prettyString)()))
+  }
+
+  /**
+   * Aggregates the product X^T * Y over the rows of each group, where each row
+   * contributes one row vector of X and one of Y.
+   *
+   * @param X name of the left array column (transposed)
+   * @param Y name of the right array column
+   * @return the aggregated matrix per group
+   * @see hivemall.tools.matrix.TransposeAndDotUDAF
+   */
+  def transpose_and_dot(X: String, Y: String): DataFrame = {
+    val wrapper = new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF")
+    val inputs = Seq(X, Y).map(name => df.col(name).expr)
+    val aggregate = HiveUDAFFunction(wrapper, inputs, isUDAFBridgeRequired = false)
+      .toAggregateExpression()
+    toDF(Seq(Alias(aggregate, aggregate.prettyString)()))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index 133f1d5..5970b83 100644
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -1006,6 +1006,15 @@ object HivemallOps {
   }
 
   /**
+   * Chi-squared test between observed and expected values for each feature.
+   *
+   * @param exprs presumably (observed, expected), each an array<array<double>> matrix;
+   *              this is how the test suite calls it — confirm against ChiSquareUDF
+   * @return per the suite, a struct of two per-feature arrays: the chi-squared
+   *         statistics and (presumably) their p-values
+   * @see hivemall.ftvec.selection.ChiSquareUDF
+   * @group ftvec.selection
+   */
+  @scala.annotation.varargs
+  def chi2(exprs: Column*): Column = {
+    HiveGenericUDF(new HiveFunctionWrapper(
+      "hivemall.ftvec.selection.ChiSquareUDF"), exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
    * @group ftvec.conv
    */
@@ -1078,6 +1087,16 @@ object HivemallOps {
   }
 
   /**
+   * Keeps the `k` entries of a feature array whose importance scores are highest.
+   *
+   * @param exprs presumably (features, importance, k), the order the test suite
+   *              uses — confirm against SelectKBestUDF
+   * @see hivemall.tools.array.SelectKBestUDF
+   * @group tools.array
+   */
+  @scala.annotation.varargs
+  def select_k_best(exprs: Column*): Column = {
+    val wrapper = new HiveFunctionWrapper("hivemall.tools.array.SelectKBestUDF")
+    HiveGenericUDF(wrapper, exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.tools.math.SigmoidUDF
    * @group misc
    */

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 4be1e5e..148e5a2 100644
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.Seq
-
 import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.hive.HivemallOps._
 import org.apache.spark.sql.hive.HivemallUtils._
@@ -188,6 +186,22 @@ final class HivemallOpsSuite extends HivemallQueryTest {
         Row(Seq("1:1.0"))))
   }
 
+  test("ftvec.selection - chi2") {
+    import hiveContext.implicits._
+
+    // Observed matrix (presumably one row per class, one column per feature).
+    val observed = Seq(
+      Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
+      Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3),
+      Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998))
+    // Expected matrix: the same row repeated once per observed row.
+    val expected = Seq.fill(3)(
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))
+    val df = Seq(observed -> expected).toDF("arg0", "arg1")
+
+    val statistics =
+      Seq(10.817820878493995, 3.5944990176817315, 116.16984746363957, 67.24482558215503)
+    // Second array of the result struct; presumably p-values.
+    val pValues = Seq(0.004476514990225833, 0.16575416718561453, 0d, 2.55351295663786e-15)
+    assert(df.select(chi2(df("arg0"), df("arg1"))).collect.toSet ===
+      Set(Row(Row(statistics, pValues))))
+  }
+
   test("ftvec.conv - quantify") {
     import hiveContext.implicits._
     val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", false)).toDF
@@ -340,6 +354,18 @@ final class HivemallOpsSuite extends HivemallQueryTest {
     checkAnswer(predicted, Seq(Row(0), Row(1)))
   }
 
+  test("tools.array - select_k_best") {
+    import hiveContext.implicits._
+
+    val data = Seq(Tuple1(Seq(0, 1, 3)), Tuple1(Seq(2, 4, 1)), Tuple1(Seq(5, 4, 9)))
+    // The two highest scores sit at indices 0 (score 3) and 2 (score 2).
+    val importance = Seq(3, 1, 2)
+    val k = 2
+    val df = data.toDF("features")
+
+    val expected = data.map { case Tuple1(features) =>
+      Row(Seq(features(0).toDouble, features(2).toDouble))
+    }
+    assert(df.select(select_k_best(df("features"), importance, k)).collect.toSeq === expected)
+  }
+
   test("misc - sigmoid") {
     import hiveContext.implicits._
     /**
@@ -536,4 +562,37 @@ final class HivemallOpsSuite extends HivemallQueryTest {
     val row4 = df4.groupby($"c0").f1score("c1", "c2").collect
     assert(row4(0).getDouble(1) ~== 0.25)
   }
+
+  test("user-defined aggregators for ftvec.selection") {
+    import hiveContext.implicits._
+
+    // The class of each row is passed to snr as a one-hot vector (arg1).
+    // +-----------------+-------+
+    // |     features    | class |
+    // +-----------------+-------+
+    // | 5.1,3.5,1.4,0.2 |     0 |
+    // | 4.9,3.0,1.4,0.2 |     0 |
+    // | 7.0,3.2,4.7,1.4 |     1 |
+    // | 6.4,3.2,4.5,1.5 |     1 |
+    // | 6.3,3.3,6.0,2.5 |     2 |
+    // | 5.8,2.7,5.1,1.9 |     2 |
+    // +-----------------+-------+
+    // Name the columns via toDF: DataFrame.as only takes a single alias, and the
+    // grouping below must be able to resolve "c0".
+    val df0 = Seq(
+      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)),
+      (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)),
+      (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect
+    assert(row0(0).getAs[Seq[Double]](1) ===
+      Seq(8.431818181818192, 1.3212121212121217, 42.94949494949499, 33.80952380952378))
+  }
+
+  test("user-defined aggregators for tools.matrix") {
+    import hiveContext.implicits._
+
+    // | 1  2  3 |T    | 5  6  7 |
+    // | 3  4  5 |  *  | 7  8  9 |
+    // Name the columns via toDF so that "c0" resolves in the grouping below.
+    val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect
+    // The result is a 3x3 matrix, i.e. a nested array.
+    assert(row0(0).getAs[Seq[Seq[Double]]](1) ===
+      Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index 4a583db..e9a1aeb 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -1228,6 +1228,16 @@ object HivemallOps {
   }
 
   /**
+   * Chi-squared test between observed and expected values for each feature.
+   *
+   * @param exprs presumably (observed, expected), each an array<array<double>> matrix;
+   *              this is how the test suite calls it — confirm against ChiSquareUDF
+   * @return per the suite, a struct of two per-feature arrays: the chi-squared
+   *         statistics and (presumably) their p-values
+   * @see hivemall.ftvec.selection.ChiSquareUDF
+   * @group ftvec.selection
+   */
+  @scala.annotation.varargs
+  def chi2(exprs: Column*): Column = withExpr {
+    HiveGenericUDF("chi2",
+      new HiveFunctionWrapper("hivemall.ftvec.selection.ChiSquareUDF"),
+      exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
    * @group ftvec.conv
    */
@@ -1307,6 +1317,16 @@ object HivemallOps {
   }
 
   /**
+   * Keeps the `k` entries of a feature array whose importance scores are highest.
+   *
+   * @param exprs presumably (features, importance, k), the order the test suite
+   *              uses — confirm against SelectKBestUDF
+   * @see hivemall.tools.array.SelectKBestUDF
+   * @group tools.array
+   */
+  @scala.annotation.varargs
+  def select_k_best(exprs: Column*): Column = withExpr {
+    HiveGenericUDF("select_k_best",
+      new HiveFunctionWrapper("hivemall.tools.array.SelectKBestUDF"),
+      exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.tools.math.SigmoidUDF
    * @group misc
    */

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
index e365197..be0673f 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
@@ -274,4 +274,30 @@ final class RelationalGroupedDatasetEx protected[sql](
       .toAggregateExpression()
     toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq)
   }
+
+  /**
+   * Aggregates per-feature signal-to-noise ratios over each group.
+   *
+   * @param X name of the feature-array column
+   * @param Y name of the label column (a one-hot class vector in the test suite)
+   * @return the aggregated result per group; the suite reads it at column index 1,
+   *         presumably right after the grouping key
+   * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
+   */
+  def snr(X: String, Y: String): DataFrame = {
+    val wrapper = new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF")
+    val inputs = Seq(X, Y).map(name => df.col(name).expr)
+    val aggregate = HiveUDAFFunction("snr", wrapper, inputs, isUDAFBridgeRequired = false)
+      .toAggregateExpression()
+    toDF(Seq(Alias(aggregate, aggregate.prettyName)()))
+  }
+
+  /**
+   * Aggregates the product X^T * Y over the rows of each group, where each row
+   * contributes one row vector of X and one of Y.
+   *
+   * @param X name of the left array column (transposed)
+   * @param Y name of the right array column
+   * @return the aggregated matrix per group
+   * @see hivemall.tools.matrix.TransposeAndDotUDAF
+   */
+  def transpose_and_dot(X: String, Y: String): DataFrame = {
+    val wrapper = new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF")
+    val inputs = Seq(X, Y).map(name => df.col(name).expr)
+    val aggregate =
+      HiveUDAFFunction("transpose_and_dot", wrapper, inputs, isUDAFBridgeRequired = false)
+        .toAggregateExpression()
+    toDF(Seq(Alias(aggregate, aggregate.prettyName)()))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 99cb1a7..039a492 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.Seq
-
-import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.{AnalysisException, Column, Row}
 import org.apache.spark.sql.functions
 import org.apache.spark.sql.hive.HivemallOps._
 import org.apache.spark.sql.hive.HivemallUtils._
@@ -189,6 +187,22 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
         Row(Seq("1:1.0"))))
   }
 
+  test("ftvec.selection - chi2") {
+    import hiveContext.implicits._
+
+    // Observed matrix (presumably one row per class, one column per feature).
+    val observed = Seq(
+      Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
+      Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3),
+      Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998))
+    // Expected matrix: the same row repeated once per observed row.
+    val expected = Seq.fill(3)(
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))
+    val df = Seq(observed -> expected).toDF("arg0", "arg1")
+
+    val statistics =
+      Seq(10.817820878493995, 3.5944990176817315, 116.16984746363957, 67.24482558215503)
+    // Second array of the result struct; presumably p-values.
+    val pValues = Seq(0.004476514990225833, 0.16575416718561453, 0d, 2.55351295663786e-15)
+    assert(df.select(chi2(df("arg0"), df("arg1"))).collect.toSet ===
+      Set(Row(Row(statistics, pValues))))
+  }
+
   test("ftvec.conv - quantify") {
     import hiveContext.implicits._
     val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", false)).toDF
@@ -342,6 +356,18 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
     checkAnswer(predicted, Seq(Row(0), Row(1)))
   }
 
+  test("tools.array - select_k_best") {
+    import hiveContext.implicits._
+
+    val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9))
+    // The two highest scores sit at indices 0 (score 3) and 2 (score 2).
+    val importance = Seq(3, 1, 2)
+    val k = 2
+    val df = data.toDF("features")
+
+    val expected = data.map(features => Row(Seq(features(0).toDouble, features(2).toDouble)))
+    assert(df.select(select_k_best(df("features"), importance, k)).collect.toSeq === expected)
+  }
+
   test("misc - sigmoid") {
     import hiveContext.implicits._
     /**
@@ -631,6 +657,39 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
     val row4 = df4.groupby($"c0").f1score("c1", "c2").collect
     assert(row4(0).getDouble(1) ~== 0.25)
   }
+
+  test("user-defined aggregators for ftvec.selection") {
+    import hiveContext.implicits._
+
+    // The class of each row is passed to snr as a one-hot vector (arg1).
+    // +-----------------+-------+
+    // |     features    | class |
+    // +-----------------+-------+
+    // | 5.1,3.5,1.4,0.2 |     0 |
+    // | 4.9,3.0,1.4,0.2 |     0 |
+    // | 7.0,3.2,4.7,1.4 |     1 |
+    // | 6.4,3.2,4.5,1.5 |     1 |
+    // | 6.3,3.3,6.0,2.5 |     2 |
+    // | 5.8,2.7,5.1,1.9 |     2 |
+    // +-----------------+-------+
+    // Name the columns via toDF: Dataset.as only takes a single alias, and the
+    // grouping below must be able to resolve "c0".
+    val df0 = Seq(
+      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)),
+      (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)),
+      (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect
+    assert(row0(0).getAs[Seq[Double]](1) ===
+      Seq(8.431818181818192, 1.3212121212121217, 42.94949494949499, 33.80952380952378))
+  }
+
+  test("user-defined aggregators for tools.matrix") {
+    import hiveContext.implicits._
+
+    // | 1  2  3 |T    | 5  6  7 |
+    // | 3  4  5 |  *  | 7  8  9 |
+    // Name the columns via toDF so that "c0" resolves in the grouping below.
+    val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect
+    // The result is a 3x3 matrix, i.e. a nested array.
+    assert(row0(0).getAs[Seq[Seq[Double]]](1) ===
+      Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))
+  }
 }
 
 final class HivemallOpsWithVectorSuite extends VectorQueryTest {