You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/12 03:41:16 UTC

spark git commit: [SPARK-6379][SQL] Support a function to call user-defined functions registered in SQLContext

Repository: spark
Updated Branches:
  refs/heads/master 48cc84002 -> 352a5da42


[SPARK-6379][SQL] Support a function to call user-defined functions registered in SQLContext

This is useful for calling, by name, UDFs that have already been registered in a SQLContext. For example:

val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
val sqlctx = df.sqlContext
sqlctx.udf.register("simpleUdf", (v: Int) => v * v)
df.select($"id", callUdf("simpleUdf", $"value"))

Author: Takeshi YAMAMURO <li...@gmail.com>

Closes #5061 from maropu/SupportUDFConversionInSparkContext and squashes the following commits:

f858aff [Takeshi YAMAMURO] Move the function into functions.scala
afd0380 [Takeshi YAMAMURO] Add a return type of callUDF
599b76c [Takeshi YAMAMURO] Remove the implicit conversion and add SqlContext#callUdf
8b56f10 [Takeshi YAMAMURO] Support an implicit conversion from udf"name" to an UDF defined in SQLContext


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/352a5da4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/352a5da4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/352a5da4

Branch: refs/heads/master
Commit: 352a5da421d61379f2a8bcd7548ccc5d2647120a
Parents: 48cc840
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Sat Apr 11 18:41:12 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sat Apr 11 18:41:12 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/functions.scala  | 21 +++++++++++++++++++-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  9 +++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/352a5da4/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 111e751..ff91e1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -22,7 +22,7 @@ import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.Star
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, Star}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
 
@@ -605,4 +605,23 @@ object functions {
   }
 
   // scalastyle:on
+
+  /**
+   * Call an user-defined function.
+   * Example:
+   * {{{
+   *  import org.apache.spark.sql._
+   *
+   *  val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
+   *  val sqlContext = df.sqlContext
+   *  sqlContext.udf.register("simpleUdf", (v: Int) => v * v)
+   *  df.select($"id", callUdf("simpleUdf", $"value"))
+   * }}}
+   *
+   * @group udf_funcs
+   */
+  def callUdf(udfName: String, cols: Column*): Column = {
+     UnresolvedFunction(udfName, cols.map(_.expr))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/352a5da4/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f5df8c6..b26e22f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -440,6 +440,15 @@ class DataFrameSuite extends QueryTest {
     )
   }
 
+  test("call udf in SQLContext") {
+    val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
+    val sqlctx = df.sqlContext
+    sqlctx.udf.register("simpleUdf", (v: Int) => v * v)
+    checkAnswer(
+      df.select($"id", callUdf("simpleUdf", $"value")),
+      Row("id1", 1) :: Row("id2", 16) :: Row("id3", 25) :: Nil)
+  }
+
   test("withColumn") {
     val df = testData.toDF().withColumn("newCol", col("key") + 1)
     checkAnswer(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org