Posted to commits@spark.apache.org by we...@apache.org on 2018/09/05 13:13:28 UTC

spark git commit: [SPARK-25044][SQL][FOLLOWUP] add back UserDefinedFunction.inputTypes

Repository: spark
Updated Branches:
  refs/heads/master 2119e518d -> 341b55a58


[SPARK-25044][SQL][FOLLOWUP] add back UserDefinedFunction.inputTypes

## What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/22259.

A Scala case class exposes a wide public surface: `apply`, `unapply`, field accessors, `copy`, etc.

In https://github.com/apache/spark/pull/22259 we changed the type of `UserDefinedFunction.inputTypes` from `Option[Seq[DataType]]` to `Option[Seq[ScalaReflection.Schema]]`. Because the constructor parameter type feeds all of those generated signatures, this breaks backward compatibility.

This PR changes the type back, and uses a `var` to carry the new nullability info.
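
For illustration, a minimal sketch of the pattern (simplified from the diff below; the `SparkUserDefinedFunction.create` body is inferred from its call sites in this patch, not copied from the actual source):

```scala
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.DataType

// The constructor keeps the pre-#22259 parameter types, so the
// generated apply/unapply/copy/accessor signatures are unchanged.
case class UserDefinedFunction(
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  // The new nullability info rides along in a var rather than a
  // constructor field (in Spark it is `private[sql]`), which is what
  // preserves binary compatibility of the case class.
  var nullableTypes: Option[Seq[Boolean]] = None
}

// Hypothetical sketch of the internal factory: it splits each
// ScalaReflection.Schema into the legacy DataType for the constructor
// plus the nullability flag for the var.
object SparkUserDefinedFunction {
  def create(
      f: AnyRef,
      dataType: DataType,
      inputSchemas: Option[Seq[ScalaReflection.Schema]]): UserDefinedFunction = {
    val udf = UserDefinedFunction(f, dataType, inputSchemas.map(_.map(_.dataType)))
    udf.nullableTypes = inputSchemas.map(_.map(_.nullable))
    udf
  }
}
```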

## How was this patch tested?

N/A

Closes #22319 from cloud-fan/revert.

Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/341b55a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/341b55a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/341b55a5

Branch: refs/heads/master
Commit: 341b55a58964b1966a1919ac0774c8be5d5e7251
Parents: 2119e51
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Sep 5 21:13:16 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 5 21:13:16 2018 +0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   5 -
 .../org/apache/spark/sql/UDFRegistration.scala  | 196 +++++++++----------
 .../sql/expressions/UserDefinedFunction.scala   |  29 ++-
 .../scala/org/apache/spark/sql/functions.scala  |  78 ++++----
 4 files changed, 163 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/341b55a5/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 45cc5cc..7ff783d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -47,11 +47,6 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),
 
-    // [SPARK-25044] Address translation of LMF closure primitive args to Object in Scala 2.12
-    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.apply"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy"),
-
     // [SPARK-24296][CORE] Replicate large blocks as a stream.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.this"),
     // [SPARK-23528] Add numIter to ClusteringSummary

http://git-wip-us.apache.org/repos/asf/spark/blob/341b55a5/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 24ee46d..c37ba0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
 import org.apache.spark.sql.execution.aggregate.ScalaUDAF
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
-import org.apache.spark.sql.expressions.{UserDefinedAggregateFunction, UserDefinedFunction}
+import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedAggregateFunction, UserDefinedFunction}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
 
@@ -113,7 +113,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
     (0 to 22).foreach { x =>
       val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
       val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
-      val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i] :: $s"})
+      val inputSchemas = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i] :: $s"})
       println(s"""
         |/**
         | * Registers a deterministic Scala closure of $x arguments as user-defined function (UDF).
@@ -122,16 +122,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
         | */
         |def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = {
         |  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-        |  val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try($inputTypes).toOption
+        |  val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try($inputSchemas).toOption
         |  def builder(e: Seq[Expression]) = if (e.length == $x) {
-        |    ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        |      udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+        |    ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        |      udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
         |  } else {
         |    throw new AnalysisException("Invalid number of arguments for function " + name +
         |      ". Expected: $x; Found: " + e.length)
         |  }
         |  functionRegistry.createOrReplaceTempFunction(name, builder)
-        |  val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+        |  val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
         |  if (nullable) udf else udf.asNonNullable()
         |}""".stripMargin)
     }
@@ -168,16 +168,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 0) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 0; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -188,16 +188,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 1) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 1; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -208,16 +208,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 2) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 2; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -228,16 +228,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 3) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 3; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -248,16 +248,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 4) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 4; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -268,16 +268,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](name: String, func: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 5) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 5; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -288,16 +288,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](name: String, func: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 6) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 6; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -308,16 +308,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](name: String, func: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 7) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 7; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -328,16 +328,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](name: String, func: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 8) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 8; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -348,16 +348,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](name: String, func: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 9) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 9; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -368,16 +368,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](name: String, func: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 10) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 10; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -388,16 +388,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](name: String, func: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 11) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 11; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -408,16 +408,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](name: String, func: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 12) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 12; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -428,16 +428,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](name: String, func: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 13) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 13; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -448,16 +448,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](name: String, func: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 14) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 14; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -468,16 +468,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](name: String, func: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 15) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 15; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -488,16 +488,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](name: String, func: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 16) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 16; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -508,16 +508,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](name: String, func: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 17) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 17; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -528,16 +528,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](name: String, func: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 18) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 18; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -548,16 +548,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](name: String, func: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 19) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 19; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -568,16 +568,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](name: String, func: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 20) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 20; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -588,16 +588,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](name: String, func: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 21) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 21; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -608,16 +608,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](name: String, func: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: ScalaReflection.schemaFor[A22] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: ScalaReflection.schemaFor[A22] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 22) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 22; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }
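
For readers tracing the registration path above: each generated `register` overload reflects one `ScalaReflection.Schema` per argument, then splits that sequence into the `DataType` list and the nullability list that `ScalaUDF` takes as separate parameters. A minimal sketch of the split, assuming a Spark 2.4-era classpath (the two argument types are illustrative, not from the commit):

    import scala.util.Try
    import org.apache.spark.sql.catalyst.ScalaReflection

    // One Schema(dataType, nullable) per argument; None if reflection fails.
    val inputSchemas: Option[Seq[ScalaReflection.Schema]] =
      Try(ScalaReflection.schemaFor[Int] :: ScalaReflection.schemaFor[String] :: Nil).toOption

    // The two halves that ScalaUDF consumes:
    val dataTypes = inputSchemas.map(_.map(_.dataType)).getOrElse(Nil)  // IntegerType :: StringType :: Nil
    val nullables = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil)  // false :: true :: Nil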
 

http://git-wip-us.apache.org/repos/asf/spark/blob/341b55a5/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index 7bd20db..697757f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -41,12 +41,16 @@ import org.apache.spark.sql.types.DataType
 case class UserDefinedFunction protected[sql] (
     f: AnyRef,
     dataType: DataType,
-    inputTypes: Option[Seq[ScalaReflection.Schema]]) {
+    inputTypes: Option[Seq[DataType]]) {
 
   private var _nameOption: Option[String] = None
   private var _nullable: Boolean = true
   private var _deterministic: Boolean = true
 
+  // This is a `var` rather than a constructor parameter, for backward compatibility of this case class.
+  // TODO: revisit this case class in Spark 3.0, and narrow down the public surface.
+  private[sql] var nullableTypes: Option[Seq[Boolean]] = None
+
   /**
    * Returns true when the UDF can return a nullable value.
    *
@@ -69,15 +73,19 @@ case class UserDefinedFunction protected[sql] (
    */
   @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
+    if (inputTypes.isDefined && nullableTypes.isDefined) {
+      require(inputTypes.get.length == nullableTypes.get.length)
+    }
+
     Column(ScalaUDF(
       f,
       dataType,
       exprs.map(_.expr),
-      inputTypes.map(_.map(_.dataType)).getOrElse(Nil),
+      inputTypes.getOrElse(Nil),
       udfName = _nameOption,
       nullable = _nullable,
       udfDeterministic = _deterministic,
-      nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil)))
+      nullableTypes = nullableTypes.getOrElse(Nil)))
   }
 
   private def copyAll(): UserDefinedFunction = {
@@ -85,6 +93,7 @@ case class UserDefinedFunction protected[sql] (
     udf._nameOption = _nameOption
     udf._nullable = _nullable
     udf._deterministic = _deterministic
+    udf.nullableTypes = nullableTypes
     udf
   }
 
@@ -129,3 +138,17 @@ case class UserDefinedFunction protected[sql] (
     }
   }
 }
+
+// We have to use a name different from `UserDefinedFunction` here, to avoid breaking the binary
+// compatibility of the auto-generated `UserDefinedFunction` companion object.
+private[sql] object SparkUserDefinedFunction {
+
+  def create(
+      f: AnyRef,
+      dataType: DataType,
+      inputSchemas: Option[Seq[ScalaReflection.Schema]]): UserDefinedFunction = {
+    val udf = new UserDefinedFunction(f, dataType, inputSchemas.map(_.map(_.dataType)))
+    udf.nullableTypes = inputSchemas.map(_.map(_.nullable))
+    udf
+  }
+}
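
The shape of the fix is worth spelling out: the case-class constructor keeps its old `Option[Seq[DataType]]` parameter (so the generated `apply`, `unapply`, and `copy` stay binary compatible), the new nullability info rides in a `private[sql] var`, and only the factory with the new name knows how to populate it. Below is a simplified standalone sketch of the same pattern; `Widget`, `Widgets`, and the `example` package are hypothetical, not Spark code:

    package example

    // Stand-in for UserDefinedFunction: the constructor is unchanged, so the
    // compiler-generated apply/unapply/copy keep their old binary signatures.
    case class Widget(name: String, size: Int) {
      // Field added after the fact as a var, outside the constructor.
      private[example] var tag: Option[String] = None
    }

    object Widgets {
      // Factory with a name other than `Widget`, so the auto-generated Widget
      // companion object is left untouched.
      def create(name: String, size: Int, tag: Option[String]): Widget = {
        val w = Widget(name, size)
        w.tag = tag
        w
      }
    }

One caveat the diff also handles: `copy` on such a case class silently drops the `var`, which is why `copyAll()` above re-assigns `nullableTypes` explicitly.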

http://git-wip-us.apache.org/repos/asf/spark/blob/341b55a5/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index a261a7c..c120be4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint}
 import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedFunction}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -3819,7 +3819,7 @@ object functions {
   (0 to 10).foreach { x =>
     val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
     val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
-    val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor(typeTag[A$i]) :: $s"})
+    val inputSchemas = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor(typeTag[A$i]) :: $s"})
     println(s"""
       |/**
       | * Defines a Scala closure of $x arguments as user-defined function (UDF).
@@ -3832,8 +3832,8 @@ object functions {
       | */
       |def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
       |  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-      |  val inputTypes = Try($inputTypes).toOption
-      |  val udf = UserDefinedFunction(f, dataType, inputTypes)
+      |  val inputSchemas = Try($inputSchemas).toOption
+      |  val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
       |  if (nullable) udf else udf.asNonNullable()
       |}""".stripMargin)
   }
@@ -3856,7 +3856,7 @@ object functions {
       | */
       |def udf(f: UDF$i[$extTypeArgs], returnType: DataType): UserDefinedFunction = {
       |  val func = f$anyCast.call($anyParams)
-      |  UserDefinedFunction($funcCall, returnType, inputTypes = None)
+      |  SparkUserDefinedFunction.create($funcCall, returnType, inputSchemas = None)
       |}""".stripMargin)
   }
 
@@ -3877,8 +3877,8 @@ object functions {
    */
   def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3893,8 +3893,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3909,8 +3909,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3925,8 +3925,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3941,8 +3941,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3957,8 +3957,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3973,8 +3973,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3989,8 +3989,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4005,8 +4005,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4021,8 +4021,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4037,8 +4037,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: ScalaReflection.schemaFor(typeTag[A10]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: ScalaReflection.schemaFor(typeTag[A10]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4057,7 +4057,7 @@ object functions {
    */
   def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF0[Any]].call()
-    UserDefinedFunction(() => func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4071,7 +4071,7 @@ object functions {
    */
   def udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4085,7 +4085,7 @@ object functions {
    */
   def udf(f: UDF2[_, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4099,7 +4099,7 @@ object functions {
    */
   def udf(f: UDF3[_, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4113,7 +4113,7 @@ object functions {
    */
   def udf(f: UDF4[_, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4127,7 +4127,7 @@ object functions {
    */
   def udf(f: UDF5[_, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4141,7 +4141,7 @@ object functions {
    */
   def udf(f: UDF6[_, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4155,7 +4155,7 @@ object functions {
    */
   def udf(f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4169,7 +4169,7 @@ object functions {
    */
   def udf(f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4183,7 +4183,7 @@ object functions {
    */
   def udf(f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4197,7 +4197,7 @@ object functions {
    */
   def udf(f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   // scalastyle:on parameter.number
@@ -4216,7 +4216,7 @@ object functions {
    * @since 2.0.0
    */
   def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = {
-    UserDefinedFunction(f, dataType, None)
+    SparkUserDefinedFunction.create(f, dataType, inputSchemas = None)
   }
 
   /**


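None of the renames change user-facing behavior: the typed `udf` overloads still infer the return type and argument nullability by reflection, and the untyped `udf(f, returnType)` overloads still pass no input type info. A brief usage sketch (the DataFrame and its column name are hypothetical):

    import org.apache.spark.sql.functions.{col, udf}

    // Int is primitive, so the result column is inferred non-nullable;
    // the String argument is inferred nullable.
    val strLen = udf((s: String) => s.length)

    // df.select(strLen(col("name")))  // df: any DataFrame with a string column "name"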