You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/07/11 22:44:32 UTC
spark git commit: [SPARK-19285][SQL] Implement UDF0
Repository: spark
Updated Branches:
refs/heads/master 1cad31f00 -> d3e071658
[SPARK-19285][SQL] Implement UDF0
### What changes were proposed in this pull request?
This PR is to implement UDF0. `UDF0` is needed when users need to implement a JAVA UDF with no argument.
### How was this patch tested?
Added a test case
Author: gatorsmile <ga...@gmail.com>
Closes #18598 from gatorsmile/udf0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3e07165
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3e07165
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3e07165
Branch: refs/heads/master
Commit: d3e071658f931f601cd4caaf00997ae411593a44
Parents: 1cad31f
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jul 11 15:44:29 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jul 11 15:44:29 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/api/java/UDF0.java | 30 +++++++++++++
.../org/apache/spark/sql/UDFRegistration.scala | 44 ++++++++++++++------
.../test/org/apache/spark/sql/JavaUDFSuite.java | 8 ++++
3 files changed, 69 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e07165/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF0.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF0.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF0.java
new file mode 100644
index 0000000..4eeb7be
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF0.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A Spark SQL UDF that has 0 arguments.
+ */
+@InterfaceStability.Stable
+public interface UDF0<R> extends Serializable {
+ R call() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e07165/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index c4d0adb..c66d405 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -122,25 +122,27 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
}""")
}
- (1 to 22).foreach { i =>
- val extTypeArgs = (1 to i).map(_ => "_").mkString(", ")
- val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ")
- val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]"
+ (0 to 22).foreach { i =>
+ val extTypeArgs = (0 to i).map(_ => "_").mkString(", ")
+ val anyTypeArgs = (0 to i).map(_ => "Any").mkString(", ")
+ val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs]]"
val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
+ val version = if (i == 0) "2.3.0" else "1.3.0"
+ val funcCall = if (i == 0) "() => func" else "func"
println(s"""
|/**
| * Register a user-defined function with ${i} arguments.
- | * @since 1.3.0
+ | * @since $version
| */
- |def register(name: String, f: UDF$i[$extTypeArgs, _], returnType: DataType): Unit = {
+ |def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType): Unit = {
| val func = f$anyCast.call($anyParams)
- |def builder(e: Seq[Expression]) = if (e.length == $i) {
- | ScalaUDF(func, returnType, e)
- |} else {
- | throw new AnalysisException("Invalid number of arguments for function " + name +
- | ". Expected: $i; Found: " + e.length)
- |}
- |functionRegistry.createOrReplaceTempFunction(name, builder)
+ | def builder(e: Seq[Expression]) = if (e.length == $i) {
+ | ScalaUDF($funcCall, returnType, e)
+ | } else {
+ | throw new AnalysisException("Invalid number of arguments for function " + name +
+ | ". Expected: $i; Found: " + e.length)
+ | }
+ | functionRegistry.createOrReplaceTempFunction(name, builder)
|}""".stripMargin)
}
*/
@@ -592,6 +594,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
}
udfInterfaces(0).getActualTypeArguments.length match {
+ case 1 => register(name, udf.asInstanceOf[UDF0[_]], returnType)
case 2 => register(name, udf.asInstanceOf[UDF1[_, _]], returnType)
case 3 => register(name, udf.asInstanceOf[UDF2[_, _, _]], returnType)
case 4 => register(name, udf.asInstanceOf[UDF3[_, _, _, _]], returnType)
@@ -650,6 +653,21 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
}
/**
+ * Register a user-defined function with 0 arguments.
+ * @since 2.3.0
+ */
+ def register(name: String, f: UDF0[_], returnType: DataType): Unit = {
+ val func = f.asInstanceOf[UDF0[Any]].call()
+ def builder(e: Seq[Expression]) = if (e.length == 0) {
+ ScalaUDF(() => func, returnType, e)
+ } else {
+ throw new AnalysisException("Invalid number of arguments for function " + name +
+ ". Expected: 0; Found: " + e.length)
+ }
+ functionRegistry.createOrReplaceTempFunction(name, builder)
+ }
+
+ /**
* Register a user-defined function with 1 arguments.
* @since 1.3.0
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e07165/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
index 4fb2988..5bf1888 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
@@ -113,4 +113,12 @@ public class JavaUDFSuite implements Serializable {
spark.udf().register("inc", (Long i) -> i + 1, DataTypes.LongType);
List<Row> results = spark.sql("SELECT inc(1, 5)").collectAsList();
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void udf6Test() {
+ spark.udf().register("returnOne", () -> 1, DataTypes.IntegerType);
+ Row result = spark.sql("SELECT returnOne()").head();
+ Assert.assertEquals(1, result.getInt(0));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org