You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/07/26 00:19:48 UTC
spark git commit: [SPARK-20586][SQL] Add deterministic to ScalaUDF
Repository: spark
Updated Branches:
refs/heads/master 9b4da7b79 -> ebc24a9b7
[SPARK-20586][SQL] Add deterministic to ScalaUDF
### What changes were proposed in this pull request?
Like [Hive UDFType](https://hive.apache.org/javadocs/r2.0.1/api/org/apache/hadoop/hive/ql/udf/UDFType.html), we should allow users to add the extra flags for ScalaUDF and JavaUDF too. _stateful_/_impliesOrder_ are not applicable to our Scala UDF. Thus, we only add the following flag.
- deterministic: Certain optimizations should not be applied if a UDF is not deterministic. A deterministic UDF returns the same result each time it is invoked with a particular input. This determinism only needs to hold within the context of a query.
When the deterministic flag is not correctly set, the results could be wrong.
For ScalaUDF in Dataset APIs, users can call the following extra APIs for `UserDefinedFunction` to make the corresponding changes.
- `asNondeterministic`: Updates the UserDefinedFunction to be non-deterministic.
Also fixed the Java UDF name loss issue.
Will submit a separate PR for `distinctLike` for UDAF
### How was this patch tested?
Added test cases for ScalaUDF.
Author: gatorsmile <ga...@gmail.com>
Author: Wenchen Fan <cl...@gmail.com>
Closes #17848 from gatorsmile/udfRegister.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebc24a9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebc24a9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebc24a9b
Branch: refs/heads/master
Commit: ebc24a9b7fde273ee4912f9bc1c5059703f7b31e
Parents: 9b4da7b
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jul 25 17:19:44 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jul 25 17:19:44 2017 -0700
----------------------------------------------------------------------
python/pyspark/sql/context.py | 4 +-
.../spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
.../sql/catalyst/expressions/ScalaUDF.scala | 10 +-
.../org/apache/spark/sql/UDFRegistration.scala | 243 +++++++++++--------
.../sql/expressions/UserDefinedFunction.scala | 48 +++-
.../scala/org/apache/spark/sql/functions.scala | 113 ++++++---
.../scala/org/apache/spark/sql/UDFSuite.scala | 22 +-
7 files changed, 278 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ebc24a9b/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index c44ab24..b1e723c 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -220,11 +220,11 @@ class SQLContext(object):
>>> sqlContext.registerJavaFunction("javaStringLength",
... "test.org.apache.spark.sql.JavaStringLength", IntegerType())
>>> sqlContext.sql("SELECT javaStringLength('test')").collect()
- [Row(UDF(test)=4)]
+ [Row(UDF:javaStringLength(test)=4)]
>>> sqlContext.registerJavaFunction("javaStringLength2",
... "test.org.apache.spark.sql.JavaStringLength")
>>> sqlContext.sql("SELECT javaStringLength2('test')").collect()
- [Row(UDF(test)=4)]
+ [Row(UDF:javaStringLength2(test)=4)]
"""
jdt = None
http://git-wip-us.apache.org/repos/asf/spark/blob/ebc24a9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 501e7e3..913d846 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1950,7 +1950,7 @@ class Analyzer(
case p => p transformExpressionsUp {
- case udf @ ScalaUDF(func, _, inputs, _, _, _) =>
+ case udf @ ScalaUDF(func, _, inputs, _, _, _, _) =>
val parameterTypes = ScalaReflection.getParameterTypes(func)
assert(parameterTypes.length == inputs.length)
http://git-wip-us.apache.org/repos/asf/spark/blob/ebc24a9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index a54f6d0..9df0e2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.types.DataType
/**
* User-defined function.
- * Note that the user-defined functions must be deterministic.
* @param function The user defined scala function to run.
* Note that if you use primitive parameters, you are not able to check if it is
* null or not, and the UDF will return null for you if the primitive input is
@@ -35,8 +34,10 @@ import org.apache.spark.sql.types.DataType
* not want to perform coercion, simply use "Nil". Note that it would've been
* better to use Option of Seq[DataType] so we can use "None" as the case for no
* type coercion. However, that would require more refactoring of the codebase.
- * @param udfName The user-specified name of this UDF.
+ * @param udfName The user-specified name of this UDF.
* @param nullable True if the UDF can return null value.
+ * @param udfDeterministic True if the UDF is deterministic. Deterministic UDF returns same result
+ * each time it is invoked with a particular input.
*/
case class ScalaUDF(
function: AnyRef,
@@ -44,9 +45,12 @@ case class ScalaUDF(
children: Seq[Expression],
inputTypes: Seq[DataType] = Nil,
udfName: Option[String] = None,
- nullable: Boolean = true)
+ nullable: Boolean = true,
+ udfDeterministic: Boolean = true)
extends Expression with ImplicitCastInputTypes with NonSQLExpression {
+ override def deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)
+
override def toString: String =
s"${udfName.map(name => s"UDF:$name").getOrElse("UDF")}(${children.mkString(", ")})"
http://git-wip-us.apache.org/repos/asf/spark/blob/ebc24a9b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index c66d405..737afb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -64,7 +64,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
}
/**
- * Register a user-defined aggregate function (UDAF).
+ * Registers a user-defined aggregate function (UDAF).
*
* @param name the name of the UDAF.
* @param udaf the UDAF needs to be registered.
@@ -79,8 +79,19 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
}
/**
- * Register a user-defined function (UDF), for a UDF that's already defined using the DataFrame
- * API (i.e. of type UserDefinedFunction).
+ * Registers a user-defined function (UDF), for a UDF that's already defined using the Dataset
+ * API (i.e. of type UserDefinedFunction). To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`. To change a UDF to nonNullable, call the API
+ * `UserDefinedFunction.asNonNullabe()`.
+ *
+ * Example:
+ * {{{
+ * val foo = udf(() => Math.random())
+ * spark.udf.register("random", foo.asNondeterministic())
+ *
+ * val bar = udf(() => "bar")
+ * spark.udf.register("stringLit", bar.asNonNullabe())
+ * }}}
*
* @param name the name of the UDF.
* @param udf the UDF needs to be registered.
@@ -104,7 +115,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i].dataType :: $s"})
println(s"""
/**
- * Register a Scala closure of ${x} arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of ${x} arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -112,13 +123,14 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try($inputTypes).toOption
def builder(e: Seq[Expression]) = if (e.length == $x) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: $x; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}""")
}
@@ -137,7 +149,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
|def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType): Unit = {
| val func = f$anyCast.call($anyParams)
| def builder(e: Seq[Expression]) = if (e.length == $i) {
- | ScalaUDF($funcCall, returnType, e)
+ | ScalaUDF($funcCall, returnType, e, udfName = Some(name))
| } else {
| throw new AnalysisException("Invalid number of arguments for function " + name +
| ". Expected: $i; Found: " + e.length)
@@ -148,7 +160,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
*/
/**
- * Register a Scala closure of 0 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 0 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -156,17 +168,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 0) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 0; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 1 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 1 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -174,17 +187,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 1) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 1; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 2 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 2 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -192,17 +206,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 2) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 2; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 3 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 3 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -210,17 +225,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 3) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 3; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 4 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 4 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -228,17 +244,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 4) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 4; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 5 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 5 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -246,17 +263,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 5) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 5; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 6 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 6 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -264,17 +282,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 6) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 6; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 7 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 7 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -282,17 +301,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 7) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 7; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 8 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 8 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -300,17 +320,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 8) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 8; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 9 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 9 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -318,17 +339,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 9) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 9; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 10 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 10 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -336,17 +358,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 10) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 10; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 11 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 11 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -354,17 +377,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 11) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 11; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 12 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 12 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -372,17 +396,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 12) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 12; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 13 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 13 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -390,17 +415,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 13) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 13; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 14 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 14 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -408,17 +434,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 14) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 14; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 15 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 15 arguments as user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -426,17 +453,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 15) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 15; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 16 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 16 arguments as a user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -444,17 +472,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 16) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 16; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 17 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 17 arguments as a user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -462,17 +491,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 17) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 17; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 18 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 18 arguments as a user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -480,17 +510,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 18) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 18; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 19 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 19 arguments as a user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -498,17 +529,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 19) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 19; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 20 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 20 arguments as a user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -516,17 +548,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: ScalaReflection.schemaFor[A20].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 20) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 20; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 21 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 21 arguments as a user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -534,17 +567,18 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: ScalaReflection.schemaFor[A20].dataType :: ScalaReflection.schemaFor[A21].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 21) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 21; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Register a Scala closure of 22 arguments as user-defined function (UDF).
+ * Registers a deterministic Scala closure of 22 arguments as a user-defined function (UDF).
* @tparam RT return type of UDF.
* @since 1.3.0
*/
@@ -552,13 +586,14 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: ScalaReflection.schemaFor[A20].dataType :: ScalaReflection.schemaFor[A21].dataType :: ScalaReflection.schemaFor[A22].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 22) {
- ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
+ ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 22; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
- UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
+ val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+ if (nullable) udf else udf.asNonNullabe()
}
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -581,9 +616,9 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
.map(_.asInstanceOf[ParameterizedType])
.filter(e => e.getRawType.isInstanceOf[Class[_]] && e.getRawType.asInstanceOf[Class[_]].getCanonicalName.startsWith("org.apache.spark.sql.api.java.UDF"))
if (udfInterfaces.length == 0) {
- throw new AnalysisException(s"UDF class ${className} doesn't implement any UDF interface")
+ throw new AnalysisException(s"UDF class $className doesn't implement any UDF interface")
} else if (udfInterfaces.length > 1) {
- throw new AnalysisException(s"It is invalid to implement multiple UDF interfaces, UDF class ${className}")
+ throw new AnalysisException(s"It is invalid to implement multiple UDF interfaces, UDF class $className")
} else {
try {
val udf = clazz.newInstance()
@@ -618,15 +653,15 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
case 22 => register(name, udf.asInstanceOf[UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 23 => register(name, udf.asInstanceOf[UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case n =>
- throw new AnalysisException(s"UDF class with ${n} type arguments is not supported.")
+ throw new AnalysisException(s"UDF class with $n type arguments is not supported.")
}
} catch {
case e @ (_: InstantiationException | _: IllegalArgumentException) =>
- throw new AnalysisException(s"Can not instantiate class ${className}, please make sure it has public non argument constructor")
+ throw new AnalysisException(s"Can not instantiate class $className, please make sure it has public non argument constructor")
}
}
} catch {
- case e: ClassNotFoundException => throw new AnalysisException(s"Can not load class ${className}, please make sure it is on the classpath")
+ case e: ClassNotFoundException => throw new AnalysisException(s"Can not load class $className, please make sure it is on the classpath")
}
}
@@ -659,7 +694,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF0[_], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF0[Any]].call()
def builder(e: Seq[Expression]) = if (e.length == 0) {
- ScalaUDF(() => func, returnType, e)
+ ScalaUDF(() => func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 0; Found: " + e.length)
@@ -674,7 +709,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
def builder(e: Seq[Expression]) = if (e.length == 1) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 1; Found: " + e.length)
@@ -689,7 +724,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF2[_, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 2) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 2; Found: " + e.length)
@@ -704,7 +739,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 3) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 3; Found: " + e.length)
@@ -719,7 +754,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF4[_, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 4) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 4; Found: " + e.length)
@@ -734,7 +769,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF5[_, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 5) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 5; Found: " + e.length)
@@ -749,7 +784,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF6[_, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 6) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 6; Found: " + e.length)
@@ -764,7 +799,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 7) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 7; Found: " + e.length)
@@ -779,7 +814,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 8) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 8; Found: " + e.length)
@@ -794,7 +829,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 9) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 9; Found: " + e.length)
@@ -809,7 +844,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 10) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 10; Found: " + e.length)
@@ -824,7 +859,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 11) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 11; Found: " + e.length)
@@ -839,7 +874,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 12) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 12; Found: " + e.length)
@@ -854,7 +889,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 13) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 13; Found: " + e.length)
@@ -869,7 +904,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 14) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 14; Found: " + e.length)
@@ -884,7 +919,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 15) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 15; Found: " + e.length)
@@ -899,7 +934,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 16) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 16; Found: " + e.length)
@@ -914,7 +949,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 17) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 17; Found: " + e.length)
@@ -929,7 +964,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 18) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 18; Found: " + e.length)
@@ -944,7 +979,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 19) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 19; Found: " + e.length)
@@ -959,7 +994,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 20) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 20; Found: " + e.length)
@@ -974,7 +1009,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 21) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 21; Found: " + e.length)
@@ -989,7 +1024,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
def register(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 22) {
- ScalaUDF(func, returnType, e)
+ ScalaUDF(func, returnType, e, udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 22; Found: " + e.length)
http://git-wip-us.apache.org/repos/asf/spark/blob/ebc24a9b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index 0c5f1b4..97b921a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.expressions
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.Column
-import org.apache.spark.sql.functions
import org.apache.spark.sql.types.DataType
/**
@@ -35,10 +34,6 @@ import org.apache.spark.sql.types.DataType
* df.select( predict(df("score")) )
* }}}
*
- * @note The user-defined functions must be deterministic. Due to optimization,
- * duplicate invocations may be eliminated or the function may even be invoked more times than
- * it is present in the query.
- *
* @since 1.3.0
*/
@InterfaceStability.Stable
@@ -49,6 +44,7 @@ case class UserDefinedFunction protected[sql] (
private var _nameOption: Option[String] = None
private var _nullable: Boolean = true
+ private var _deterministic: Boolean = true
/**
* Returns true when the UDF can return a nullable value.
@@ -58,6 +54,14 @@ case class UserDefinedFunction protected[sql] (
def nullable: Boolean = _nullable
/**
+ * Returns true iff the UDF is deterministic, i.e. the UDF produces the same output given the same
+ * input.
+ *
+ * @since 2.3.0
+ */
+ def deterministic: Boolean = _deterministic
+
+ /**
* Returns an expression that invokes the UDF, using the given arguments.
*
* @since 1.3.0
@@ -69,13 +73,15 @@ case class UserDefinedFunction protected[sql] (
exprs.map(_.expr),
inputTypes.getOrElse(Nil),
udfName = _nameOption,
- nullable = _nullable))
+ nullable = _nullable,
+ udfDeterministic = _deterministic))
}
private def copyAll(): UserDefinedFunction = {
val udf = copy()
udf._nameOption = _nameOption
udf._nullable = _nullable
+ udf._deterministic = _deterministic
udf
}
@@ -84,22 +90,38 @@ case class UserDefinedFunction protected[sql] (
*
* @since 2.3.0
*/
- def withName(name: String): this.type = {
- this._nameOption = Option(name)
- this
+ def withName(name: String): UserDefinedFunction = {
+ val udf = copyAll()
+ udf._nameOption = Option(name)
+ udf
+ }
+
+ /**
+ * Updates UserDefinedFunction to non-nullable.
+ *
+ * @since 2.3.0
+ */
+ def asNonNullabe(): UserDefinedFunction = {
+ if (!nullable) {
+ this
+ } else {
+ val udf = copyAll()
+ udf._nullable = false
+ udf
+ }
}
/**
- * Updates UserDefinedFunction with a given nullability.
+ * Updates UserDefinedFunction to nondeterministic.
*
* @since 2.3.0
*/
- def withNullability(nullable: Boolean): UserDefinedFunction = {
- if (nullable == _nullable) {
+ def asNondeterministic(): UserDefinedFunction = {
+ if (!_deterministic) {
this
} else {
val udf = copyAll()
- udf._nullable = nullable
+ udf._deterministic = false
udf
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ebc24a9b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index ebdeb42..ccff00e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3185,8 +3185,10 @@ object functions {
val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor(typeTag[A$i]).dataType :: $s"})
println(s"""
/**
- * Defines a user-defined function of ${x} arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of ${x} arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3194,15 +3196,18 @@ object functions {
def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try($inputTypes).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}""")
}
*/
/**
- * Defines a user-defined function of 0 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 0 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3210,12 +3215,15 @@ object functions {
def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 1 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 1 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3223,12 +3231,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 2 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 2 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3236,12 +3247,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 3 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 3 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3249,12 +3263,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 4 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 4 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3262,12 +3279,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 5 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 5 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3275,12 +3295,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 6 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 6 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3288,12 +3311,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 7 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 7 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3301,12 +3327,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 8 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 8 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3314,12 +3343,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: ScalaReflection.schemaFor(typeTag[A8]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 9 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 9 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3327,12 +3359,15 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: ScalaReflection.schemaFor(typeTag[A8]).dataType :: ScalaReflection.schemaFor(typeTag[A9]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
/**
- * Defines a user-defined function of 10 arguments as user-defined function (UDF).
- * The data types are automatically inferred based on the function's signature.
+ * Defines a deterministic user-defined function of 10 arguments as user-defined
+ * function (UDF). The data types are automatically inferred based on the function's
+ * signature. To change a UDF to nondeterministic, call the API
+ * `UserDefinedFunction.asNondeterministic()`.
*
* @group udf_funcs
* @since 1.3.0
@@ -3340,15 +3375,17 @@ object functions {
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: ScalaReflection.schemaFor(typeTag[A8]).dataType :: ScalaReflection.schemaFor(typeTag[A9]).dataType :: ScalaReflection.schemaFor(typeTag[A10]).dataType :: Nil).toOption
- UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
+ val udf = UserDefinedFunction(f, dataType, inputTypes)
+ if (nullable) udf else udf.asNonNullabe()
}
// scalastyle:on parameter.number
// scalastyle:on line.size.limit
/**
- * Defines a user-defined function (UDF) using a Scala closure. For this variant, the caller must
- * specify the output data type, and there is no automatic input type coercion.
+ * Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant,
+ * the caller must specify the output data type, and there is no automatic input type coercion.
+ * To change a UDF to nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
*
* @param f A closure in Scala
* @param dataType The output data type of the UDF
http://git-wip-us.apache.org/repos/asf/spark/blob/ebc24a9b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 335b882..7f1c009 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
+import org.apache.spark.sql.types.DataTypes
private case class FunctionResult(f1: String, f2: String)
@@ -109,9 +112,22 @@ class UDFSuite extends QueryTest with SharedSQLContext {
assert(sql("select foo(5)").head().getInt(0) == 6)
}
- test("ZeroArgument UDF") {
- spark.udf.register("random0", () => { Math.random()})
- assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)
+ test("ZeroArgument non-deterministic UDF") {
+ val foo = udf(() => Math.random())
+ spark.udf.register("random0", foo.asNondeterministic())
+ val df = sql("SELECT random0()")
+ assert(df.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic))
+ assert(df.head().getDouble(0) >= 0.0)
+
+ val foo1 = foo.asNondeterministic()
+ val df1 = testData.select(foo1())
+ assert(df1.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic))
+ assert(df1.head().getDouble(0) >= 0.0)
+
+ val bar = udf(() => Math.random(), DataTypes.DoubleType).asNondeterministic()
+ val df2 = testData.select(bar())
+ assert(df2.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic))
+ assert(df2.head().getDouble(0) >= 0.0)
}
test("TwoArgument UDF") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org