Posted to commits@spark.apache.org by we...@apache.org on 2020/03/24 15:28:19 UTC

[spark] branch branch-3.0 updated: [SPARK-30127][SQL] Support case class parameter for typed Scala UDF

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a934142  [SPARK-30127][SQL] Support case class parameter for typed Scala UDF
a934142 is described below

commit a934142f2405e085d0e1ef76b1bc6fbc6e3059c1
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Mar 24 23:03:57 2020 +0800

    [SPARK-30127][SQL] Support case class parameter for typed Scala UDF
    
    ### What changes were proposed in this pull request?
    
    To support case class parameters for typed Scala UDFs, e.g.
    
    ```
    case class TestData(key: Int, value: String)
    val f = (d: TestData) => d.key * d.value.toInt
    val myUdf = udf(f)
    val df = Seq(("data", TestData(50, "2"))).toDF("col1", "col2")
    checkAnswer(df.select(myUdf(Column("col2"))), Row(100) :: Nil)
    ```
    
    ### Why are the changes needed?
    
    Currently, Spark UDFs can only work on data types like java.lang.String, o.a.s.sql.Row, Seq[_], etc. This is inconvenient when a user wants to apply an operation to a single column and that column is of struct type: the data has to be pulled out of a Row object instead of a domain object, as it would be with Dataset operations. It would be great if UDFs could work on the types supported by Dataset, e.g. case classes.
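    
    For illustration, here is a minimal sketch of the difference (assuming the same `TestData` case class and shell context as the examples in this description; the names `rowUdf` and `typedUdf` are illustrative only):
    
    ```scala
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.types.IntegerType
    
    // Before this change: a UDF over a struct column has to take a Row, needs
    // spark.sql.legacy.allowUntypedScalaUDF=true, and requires an explicit return type.
    val rowUdf = udf((r: Row) => r.getInt(0) * r.getString(1).toInt, IntegerType)
    
    // With this change: the UDF can take the case class directly and the return type
    // is derived from the function itself.
    val typedUdf = udf((d: TestData) => d.key * d.value.toInt)
    ```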
    
    And here's a benchmark result comparing the case class version with the Row version:
    
    ```scala
    import org.apache.spark.sql.{Row, SaveMode}
    import org.apache.spark.sql.functions.{lit, struct, udf}
    import org.apache.spark.sql.types.StringType
    import spark.implicits._
    
    // case class:  58ms 65ms 59ms 64ms 61ms
    // row:         59ms 64ms 73ms 84ms 69ms
    val f1 = (d: TestData) => s"${d.key}, ${d.value}"
    val f2 = (r: Row) => s"${r.getInt(0)}, ${r.getString(1)}"
    val udf1 = udf(f1)
    // set spark.sql.legacy.allowUntypedScalaUDF=true
    val udf2 = udf(f2, StringType)
    
    val df = spark.range(100000).selectExpr("cast (id as int) as id")
        .select(struct('id, lit("str")).as("col"))
    df.cache().collect()
    
    // warm up first so one-off overhead doesn't skew the measurements
    df.select(udf1('col)).write.mode(SaveMode.Overwrite).format("noop").save()
    df.select(udf2('col)).write.mode(SaveMode.Overwrite).format("noop").save()
    
    var start = System.currentTimeMillis()
    df.select(udf1('col)).write.mode(SaveMode.Overwrite).format("noop").save()
    println(System.currentTimeMillis() - start)
    
    start = System.currentTimeMillis()
    df.select(udf2('col)).write.mode(SaveMode.Overwrite).format("noop").save()
    println(System.currentTimeMillis() - start)
    
    ```
    
    ### Does this PR introduce any user-facing change?
    
    Yes. Users can now use a typed Scala UDF with a case class as an input parameter.
    
    ### How was this patch tested?
    
    Added unit tests.
    
    Closes #27937 from Ngone51/udf_caseclass_support.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit f6ff7d0cf8c0e562f3b086180d5418e6996055bb)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   8 +-
 .../spark/sql/catalyst/expressions/ScalaUDF.scala  | 595 +++++++++++----------
 .../sql/catalyst/analysis/AnalysisSuite.scala      |  18 +-
 .../sql/catalyst/expressions/ScalaUDFSuite.scala   |  18 +-
 .../catalyst/optimizer/EliminateSortsSuite.scala   |   4 +-
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala   |   4 +-
 .../org/apache/spark/sql/UDFRegistration.scala     | 146 ++---
 .../datasources/FileFormatDataWriter.scala         |   3 +-
 .../sql/expressions/UserDefinedFunction.scala      |  11 +-
 .../scala/org/apache/spark/sql/functions.scala     |  76 +--
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  |   2 +-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |  44 +-
 12 files changed, 519 insertions(+), 410 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 975f023..67f6f49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2703,13 +2703,13 @@ class Analyzer(
 
       case p => p transformExpressionsUp {
 
-        case udf @ ScalaUDF(_, _, inputs, inputPrimitives, _, _, _, _)
-            if inputPrimitives.contains(true) =>
+        case udf @ ScalaUDF(_, _, inputs, _, _, _, _)
+            if udf.inputPrimitives.contains(true) =>
           // Otherwise, add special handling of null for fields that can't accept null.
           // The result of operations like this, when passed null, is generally to return null.
-          assert(inputPrimitives.length == inputs.length)
+          assert(udf.inputPrimitives.length == inputs.length)
 
-          val inputPrimitivesPair = inputPrimitives.zip(inputs)
+          val inputPrimitivesPair = udf.inputPrimitives.zip(inputs)
           val inputNullCheck = inputPrimitivesPair.collect {
             case (isPrimitive, input) if isPrimitive && input.nullable =>
               IsNull(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 10f8ec9..1ac7ca6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.collection.mutable
+
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.types.{AbstractDataType, DataType}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
 
 /**
  * User-defined function.
@@ -31,14 +34,9 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType}
  *                  null. Use boxed type or [[Option]] if you wanna do the null-handling yourself.
  * @param dataType  Return type of function.
  * @param children  The input expressions of this UDF.
- * @param inputPrimitives The analyzer should be aware of Scala primitive types so as to make the
- *                        UDF return null if there is any null input value of these types. On the
- *                        other hand, Java UDFs can only have boxed types, thus this parameter will
- *                        always be all false.
- * @param inputTypes  The expected input types of this UDF, used to perform type coercion. If we do
- *                    not want to perform coercion, simply use "Nil". Note that it would've been
- *                    better to use Option of Seq[DataType] so we can use "None" as the case for no
- *                    type coercion. However, that would require more refactoring of the codebase.
+ * @param inputEncoders ExpressionEncoder for each input parameter. For an input parameter that
+ *                      is serialized as a struct, the encoder is used instead of
+ *                      CatalystTypeConverters to convert the internal value to a Scala value.
  * @param udfName  The user-specified name of this UDF.
  * @param nullable  True if the UDF can return null value.
  * @param udfDeterministic  True if the UDF is deterministic. Deterministic UDF returns same result
@@ -48,8 +46,7 @@ case class ScalaUDF(
     function: AnyRef,
     dataType: DataType,
     children: Seq[Expression],
-    inputPrimitives: Seq[Boolean],
-    inputTypes: Seq[AbstractDataType] = Nil,
+    inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Nil,
     udfName: Option[String] = None,
     nullable: Boolean = true,
     udfDeterministic: Boolean = true)
@@ -59,6 +56,68 @@ case class ScalaUDF(
 
   override def toString: String = s"${udfName.getOrElse("UDF")}(${children.mkString(", ")})"
 
+  /**
+   * The analyzer should be aware of Scala primitive types so as to make the
+   * UDF return null if there is any null input value of these types. On the
+   * other hand, Java UDFs can only have boxed types, thus this will return
+   * Nil (which has the same effect as all false) and the analyzer will skip
+   * null-handling on them.
+   */
+  def inputPrimitives: Seq[Boolean] = {
+    inputEncoders.map { encoderOpt =>
+      // It's possible that some of the inputs don't have a specific encoder(e.g. `Any`)
+      if (encoderOpt.isDefined) {
+        val encoder = encoderOpt.get
+        if (encoder.isSerializedAsStruct) {
+          // struct type is not primitive
+          false
+        } else {
+          // `nullable` is false iff the type is primitive
+          !encoder.schema.head.nullable
+        }
+      } else {
+        // Any type is not primitive
+        false
+      }
+    }
+  }
+
+  /**
+   * The expected input types of this UDF, used to perform type coercion. If we do
+   * not want to perform coercion, simply use "Nil". Note that it would've been
+   * better to use Option of Seq[DataType] so we can use "None" as the case for no
+   * type coercion. However, that would require more refactoring of the codebase.
+   */
+  def inputTypes: Seq[AbstractDataType] = {
+    inputEncoders.map { encoderOpt =>
+      if (encoderOpt.isDefined) {
+        val encoder = encoderOpt.get
+        if (encoder.isSerializedAsStruct) {
+          encoder.schema
+        } else {
+          encoder.schema.head.dataType
+        }
+      } else {
+        AnyDataType
+      }
+    }
+  }
+
+  private def createToScalaConverter(i: Int, dataType: DataType): Any => Any = {
+    if (inputEncoders.isEmpty) {
+      // for untyped Scala UDF
+      CatalystTypeConverters.createToScalaConverter(dataType)
+    } else {
+      val encoder = inputEncoders(i)
+      if (encoder.isDefined && encoder.get.isSerializedAsStructForTopLevel) {
+        val enc = encoder.get.resolveAndBind()
+        row: Any => enc.fromRow(row.asInstanceOf[InternalRow])
+      } else {
+        CatalystTypeConverters.createToScalaConverter(dataType)
+      }
+    }
+  }
+
   // scalastyle:off line.size.limit
 
   /** This method has been generated by this script
@@ -66,7 +125,7 @@ case class ScalaUDF(
     (1 to 22).map { x =>
       val anys = (1 to x).map(x => "Any").reduce(_ + ", " + _)
       val childs = (0 to x - 1).map(x => s"val child$x = children($x)").reduce(_ + "\n  " + _)
-      val converters = (0 to x - 1).map(x => s"lazy val converter$x = CatalystTypeConverters.createToScalaConverter(child$x.dataType)").reduce(_ + "\n  " + _)
+      val converters = (0 to x - 1).map(x => s"lazy val converter$x = createToScalaConverter($x, child$x.dataType)").reduce(_ + "\n  " + _)
       val evals = (0 to x - 1).map(x => s"converter$x(child$x.eval(input))").reduce(_ + ",\n      " + _)
 
       s"""case $x =>
@@ -91,7 +150,7 @@ case class ScalaUDF(
     case 1 =>
       val func = function.asInstanceOf[(Any) => Any]
       val child0 = children(0)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)))
@@ -101,8 +160,8 @@ case class ScalaUDF(
       val func = function.asInstanceOf[(Any, Any) => Any]
       val child0 = children(0)
       val child1 = children(1)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -114,9 +173,9 @@ case class ScalaUDF(
       val child0 = children(0)
       val child1 = children(1)
       val child2 = children(2)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -130,10 +189,10 @@ case class ScalaUDF(
       val child1 = children(1)
       val child2 = children(2)
       val child3 = children(3)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -149,11 +208,11 @@ case class ScalaUDF(
       val child2 = children(2)
       val child3 = children(3)
       val child4 = children(4)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -171,12 +230,12 @@ case class ScalaUDF(
       val child3 = children(3)
       val child4 = children(4)
       val child5 = children(5)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -196,13 +255,13 @@ case class ScalaUDF(
       val child4 = children(4)
       val child5 = children(5)
       val child6 = children(6)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -224,14 +283,14 @@ case class ScalaUDF(
       val child5 = children(5)
       val child6 = children(6)
       val child7 = children(7)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -255,15 +314,15 @@ case class ScalaUDF(
       val child6 = children(6)
       val child7 = children(7)
       val child8 = children(8)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -289,16 +348,16 @@ case class ScalaUDF(
       val child7 = children(7)
       val child8 = children(8)
       val child9 = children(9)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -326,17 +385,17 @@ case class ScalaUDF(
       val child8 = children(8)
       val child9 = children(9)
       val child10 = children(10)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -366,18 +425,18 @@ case class ScalaUDF(
       val child9 = children(9)
       val child10 = children(10)
       val child11 = children(11)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -409,19 +468,19 @@ case class ScalaUDF(
       val child10 = children(10)
       val child11 = children(11)
       val child12 = children(12)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -455,20 +514,20 @@ case class ScalaUDF(
       val child11 = children(11)
       val child12 = children(12)
       val child13 = children(13)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -504,21 +563,21 @@ case class ScalaUDF(
       val child12 = children(12)
       val child13 = children(13)
       val child14 = children(14)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -556,22 +615,22 @@ case class ScalaUDF(
       val child13 = children(13)
       val child14 = children(14)
       val child15 = children(15)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
+      lazy val converter15 = createToScalaConverter(15, child15.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -611,23 +670,23 @@ case class ScalaUDF(
       val child14 = children(14)
       val child15 = children(15)
       val child16 = children(16)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
-      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
+      lazy val converter15 = createToScalaConverter(15, child15.dataType)
+      lazy val converter16 = createToScalaConverter(16, child16.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -669,24 +728,24 @@ case class ScalaUDF(
       val child15 = children(15)
       val child16 = children(16)
       val child17 = children(17)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
-      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
-      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
+      lazy val converter15 = createToScalaConverter(15, child15.dataType)
+      lazy val converter16 = createToScalaConverter(16, child16.dataType)
+      lazy val converter17 = createToScalaConverter(17, child17.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -730,25 +789,25 @@ case class ScalaUDF(
       val child16 = children(16)
       val child17 = children(17)
       val child18 = children(18)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
-      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
-      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
-      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
+      lazy val converter15 = createToScalaConverter(15, child15.dataType)
+      lazy val converter16 = createToScalaConverter(16, child16.dataType)
+      lazy val converter17 = createToScalaConverter(17, child17.dataType)
+      lazy val converter18 = createToScalaConverter(18, child18.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -794,26 +853,26 @@ case class ScalaUDF(
       val child17 = children(17)
       val child18 = children(18)
       val child19 = children(19)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
-      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
-      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
-      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
-      lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
+      lazy val converter15 = createToScalaConverter(15, child15.dataType)
+      lazy val converter16 = createToScalaConverter(16, child16.dataType)
+      lazy val converter17 = createToScalaConverter(17, child17.dataType)
+      lazy val converter18 = createToScalaConverter(18, child18.dataType)
+      lazy val converter19 = createToScalaConverter(19, child19.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -861,27 +920,27 @@ case class ScalaUDF(
       val child18 = children(18)
       val child19 = children(19)
       val child20 = children(20)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
-      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
-      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
-      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
-      lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
-      lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
+      lazy val converter15 = createToScalaConverter(15, child15.dataType)
+      lazy val converter16 = createToScalaConverter(16, child16.dataType)
+      lazy val converter17 = createToScalaConverter(17, child17.dataType)
+      lazy val converter18 = createToScalaConverter(18, child18.dataType)
+      lazy val converter19 = createToScalaConverter(19, child19.dataType)
+      lazy val converter20 = createToScalaConverter(20, child20.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -931,28 +990,28 @@ case class ScalaUDF(
       val child19 = children(19)
       val child20 = children(20)
       val child21 = children(21)
-      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
-      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
-      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
-      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
-      lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
-      lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
-      lazy val converter21 = CatalystTypeConverters.createToScalaConverter(child21.dataType)
+      lazy val converter0 = createToScalaConverter(0, child0.dataType)
+      lazy val converter1 = createToScalaConverter(1, child1.dataType)
+      lazy val converter2 = createToScalaConverter(2, child2.dataType)
+      lazy val converter3 = createToScalaConverter(3, child3.dataType)
+      lazy val converter4 = createToScalaConverter(4, child4.dataType)
+      lazy val converter5 = createToScalaConverter(5, child5.dataType)
+      lazy val converter6 = createToScalaConverter(6, child6.dataType)
+      lazy val converter7 = createToScalaConverter(7, child7.dataType)
+      lazy val converter8 = createToScalaConverter(8, child8.dataType)
+      lazy val converter9 = createToScalaConverter(9, child9.dataType)
+      lazy val converter10 = createToScalaConverter(10, child10.dataType)
+      lazy val converter11 = createToScalaConverter(11, child11.dataType)
+      lazy val converter12 = createToScalaConverter(12, child12.dataType)
+      lazy val converter13 = createToScalaConverter(13, child13.dataType)
+      lazy val converter14 = createToScalaConverter(14, child14.dataType)
+      lazy val converter15 = createToScalaConverter(15, child15.dataType)
+      lazy val converter16 = createToScalaConverter(16, child16.dataType)
+      lazy val converter17 = createToScalaConverter(17, child17.dataType)
+      lazy val converter18 = createToScalaConverter(18, child18.dataType)
+      lazy val converter19 = createToScalaConverter(19, child19.dataType)
+      lazy val converter20 = createToScalaConverter(20, child20.dataType)
+      lazy val converter21 = createToScalaConverter(21, child21.dataType)
       (input: InternalRow) => {
         func(
           converter0(child0.eval(input)),
@@ -987,8 +1046,8 @@ case class ScalaUDF(
     val converterClassName = classOf[Any => Any].getName
 
     // The type converters for inputs and the result.
-    val converters: Array[Any => Any] = children.map { c =>
-      CatalystTypeConverters.createToScalaConverter(c.dataType)
+    val converters: Array[Any => Any] = children.zipWithIndex.map { case (c, i) =>
+      createToScalaConverter(i, c.dataType)
     }.toArray :+ CatalystTypeConverters.createToCatalystConverter(dataType)
     val convertersTerm = ctx.addReferenceObj("converters", converters, s"$converterClassName[]")
     val errorMsgTerm = ctx.addReferenceObj("errMsg", udfErrorMessage)
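
For readers following the ScalaUDF.scala hunks above without the full file: the repeated `createToScalaConverter(i, child.dataType)` calls replace the old one-argument `CatalystTypeConverters.createToScalaConverter(dataType)` with a per-argument helper that can consult the new `inputEncoders`. The sketch below is illustrative only, not the implementation in the patch; `rowDeserializer` is a hypothetical stand-in for the encoder's row-to-object deserializer, and it assumes `isSerializedAsStructForTopLevel` identifies struct-backed (e.g. case class) inputs.

```scala
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.DataType

// Plausible shape of the per-argument converter: decode struct-backed inputs through
// the encoder, fall back to the classic Catalyst-to-Scala conversion otherwise.
def createToScalaConverter(
    inputEncoders: Seq[Option[ExpressionEncoder[_]]],
    rowDeserializer: ExpressionEncoder[_] => InternalRow => Any)(
    i: Int,
    dataType: DataType): Any => Any = {
  inputEncoders.lift(i).flatten match {
    case Some(enc) if enc.isSerializedAsStructForTopLevel =>
      // typed struct input (e.g. a case class): turn the InternalRow into the domain object
      val fromRow = rowDeserializer(enc)
      (value: Any) => if (value == null) null else fromRow(value.asInstanceOf[InternalRow])
    case _ =>
      // primitives, and untyped UDFs that carry no encoders, keep the old conversion path
      CatalystTypeConverters.createToScalaConverter(dataType)
  }
}
```
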
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 8451b9b..02472e1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}
@@ -326,20 +327,21 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     }
 
     // non-primitive parameters do not need special null handling
-    val udf1 = ScalaUDF((s: String) => "x", StringType, string :: Nil, false :: Nil)
+    val udf1 = ScalaUDF((s: String) => "x", StringType, string :: Nil,
+      Option(ExpressionEncoder[String]()) :: Nil)
     val expected1 = udf1
     checkUDF(udf1, expected1)
 
     // only primitive parameter needs special null handling
     val udf2 = ScalaUDF((s: String, d: Double) => "x", StringType, string :: double :: Nil,
-      false :: true :: Nil)
+      Option(ExpressionEncoder[String]()) :: Option(ExpressionEncoder[Double]()) :: Nil)
     val expected2 =
       If(IsNull(double), nullResult, udf2.copy(children = string :: KnownNotNull(double) :: Nil))
     checkUDF(udf2, expected2)
 
     // special null handling should apply to all primitive parameters
     val udf3 = ScalaUDF((s: Short, d: Double) => "x", StringType, short :: double :: Nil,
-      true :: true :: Nil)
+      Option(ExpressionEncoder[Short]()) :: Option(ExpressionEncoder[Double]()) :: Nil)
     val expected3 = If(
       IsNull(short) || IsNull(double),
       nullResult,
@@ -351,7 +353,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       (s: Short, d: Double) => "x",
       StringType,
       short :: nonNullableDouble :: Nil,
-      true :: true :: Nil)
+      Option(ExpressionEncoder[Short]()) :: Option(ExpressionEncoder[Double]()) :: Nil)
     val expected4 = If(
       IsNull(short),
       nullResult,
@@ -362,8 +364,12 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
     val a = testRelation.output(0)
     val func = (x: Int, y: Int) => x + y
-    val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, false :: false :: Nil)
-    val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, false :: false :: Nil)
+    val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil,
+      Option(ExpressionEncoder[java.lang.Integer]()) ::
+        Option(ExpressionEncoder[java.lang.Integer]()) :: Nil)
+    val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil,
+      Option(ExpressionEncoder[java.lang.Integer]()) ::
+        Option(ExpressionEncoder[java.lang.Integer]()) :: Nil)
     val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
     comparePlans(plan.analyze, plan.analyze.analyze)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDFSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDFSuite.scala
index c5ffc38..836b2ea 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDFSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDFSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.util.Locale
 
 import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType}
@@ -27,10 +28,12 @@ import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType}
 class ScalaUDFSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   test("basic") {
-    val intUdf = ScalaUDF((i: Int) => i + 1, IntegerType, Literal(1) :: Nil, true :: Nil)
+    val intUdf = ScalaUDF((i: Int) => i + 1, IntegerType, Literal(1) :: Nil,
+      Option(ExpressionEncoder[Int]()) :: Nil)
     checkEvaluation(intUdf, 2)
 
-    val stringUdf = ScalaUDF((s: String) => s + "x", StringType, Literal("a") :: Nil, false :: Nil)
+    val stringUdf = ScalaUDF((s: String) => s + "x", StringType, Literal("a") :: Nil,
+      Option(ExpressionEncoder[String]()) :: Nil)
     checkEvaluation(stringUdf, "ax")
   }
 
@@ -39,7 +42,7 @@ class ScalaUDFSuite extends SparkFunSuite with ExpressionEvalHelper {
       (s: String) => s.toLowerCase(Locale.ROOT),
       StringType,
       Literal.create(null, StringType) :: Nil,
-      false :: Nil)
+      Option(ExpressionEncoder[String]()) :: Nil)
 
     val e1 = intercept[SparkException](udf.eval())
     assert(e1.getMessage.contains("Failed to execute user defined function"))
@@ -52,7 +55,8 @@ class ScalaUDFSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   test("SPARK-22695: ScalaUDF should not use global variables") {
     val ctx = new CodegenContext
-    ScalaUDF((s: String) => s + "x", StringType, Literal("a") :: Nil, false :: Nil).genCode(ctx)
+    ScalaUDF((s: String) => s + "x", StringType, Literal("a") :: Nil,
+      Option(ExpressionEncoder[String]()) :: Nil).genCode(ctx)
     assert(ctx.inlinedMutableStates.isEmpty)
   }
 
@@ -61,7 +65,8 @@ class ScalaUDFSuite extends SparkFunSuite with ExpressionEvalHelper {
       val udf = ScalaUDF(
         (a: java.math.BigDecimal) => a.multiply(new java.math.BigDecimal(100)),
         DecimalType.SYSTEM_DEFAULT,
-        Literal(BigDecimal("12345678901234567890.123")) :: Nil, false :: Nil)
+        Literal(BigDecimal("12345678901234567890.123")) :: Nil,
+        Option(ExpressionEncoder[java.math.BigDecimal]()) :: Nil)
       val e1 = intercept[ArithmeticException](udf.eval())
       assert(e1.getMessage.contains("cannot be represented as Decimal"))
       val e2 = intercept[SparkException] {
@@ -73,7 +78,8 @@ class ScalaUDFSuite extends SparkFunSuite with ExpressionEvalHelper {
       val udf = ScalaUDF(
         (a: java.math.BigDecimal) => a.multiply(new java.math.BigDecimal(100)),
         DecimalType.SYSTEM_DEFAULT,
-        Literal(BigDecimal("12345678901234567890.123")) :: Nil, false :: Nil)
+        Literal(BigDecimal("12345678901234567890.123")) :: Nil,
+        Option(ExpressionEncoder[java.math.BigDecimal]()) :: Nil)
       checkEvaluation(udf, null)
     }
   }
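
Taken together, the test updates above follow one mechanical pattern: the fourth `ScalaUDF` argument changes from a list of per-input Boolean "needs special null handling" flags to a list of per-input optional encoders. A minimal before/after, mirroring the constructor calls in the suites above:

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Literal, ScalaUDF}
import org.apache.spark.sql.types.IntegerType

// Old shape (pre-patch): Seq[Boolean] flags marking primitive inputs.
// ScalaUDF((i: Int) => i + 1, IntegerType, Literal(1) :: Nil, true :: Nil)

// New shape: Seq[Option[ExpressionEncoder[_]]]; null handling for primitive inputs
// is derived from the encoder instead of a hand-maintained flag.
val intUdf = ScalaUDF((i: Int) => i + 1, IntegerType, Literal(1) :: Nil,
  Option(ExpressionEncoder[Int]()) :: Nil)
```
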
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index d9a6fbf..d7eb048 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -244,7 +245,8 @@ class EliminateSortsSuite extends PlanTest {
   }
 
   test("should not remove orderBy in groupBy clause with ScalaUDF as aggs") {
-    val scalaUdf = ScalaUDF((s: Int) => s, IntegerType, 'a :: Nil, true :: Nil)
+    val scalaUdf = ScalaUDF((s: Int) => s, IntegerType, 'a :: Nil,
+      Option(ExpressionEncoder[Int]()) :: Nil)
     val projectPlan = testRelation.select('a, 'b)
     val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
     val groupByPlan = orderByPlan.groupBy('a)(scalaUdf)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index e72b2e9..f525970 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions.DslString
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
@@ -594,7 +595,8 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("toJSON should not throws java.lang.StackOverflowError") {
-    val udf = ScalaUDF(SelfReferenceUDF(), BooleanType, Seq("col1".attr), false :: Nil)
+    val udf = ScalaUDF(SelfReferenceUDF(), BooleanType, Seq("col1".attr),
+      Option(ExpressionEncoder[String]()) :: Nil)
     // Should not throw java.lang.StackOverflowError
     udf.toJSON
   }
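
The UDFRegistration.scala changes that follow apply the same idea to the registration APIs: the code generator and the generated `register` overloads build `inputEncoders` with `Try(ExpressionEncoder[Ai]()).toOption`, while the untyped Java `UDFn` overloads pass `Nil` since no Scala type information is available there. The `Try(...).toOption` wrapper matters because deriving an encoder can fail for unsupported types; the small illustrative snippet below (not part of the patch, and the converter fallback is an assumption based on the hunks above) shows how registration keeps working by recording "no encoder" for such arguments.

```scala
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// If ExpressionEncoder[T]() throws (no encoder derivable for T), record None instead
// of failing registration; such inputs presumably fall back to the classic converter path.
def tryEncoder[T: TypeTag]: Option[ExpressionEncoder[_]] =
  Try(ExpressionEncoder[T]()).toOption

val inputEncoders: Seq[Option[ExpressionEncoder[_]]] =
  tryEncoder[Int] :: tryEncoder[String] :: Nil
```
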
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 0f08e10..ced4af4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -125,7 +125,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
     (0 to 22).foreach { x =>
       val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
       val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
-      val inputSchemas = (1 to x).foldRight("Nil")((i, s) => {s"Try(ScalaReflection.schemaFor[A$i]).toOption :: $s"})
+      val inputEncoders = (1 to x).foldRight("Nil")((i, s) => {s"Try(ExpressionEncoder[A$i]()).toOption :: $s"})
       println(s"""
         |/**
         | * Registers a deterministic Scala closure of $x arguments as user-defined function (UDF).
@@ -134,8 +134,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
         | */
         |def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = {
         |  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-        |  val inputSchemas: Seq[Option[ScalaReflection.Schema]] = $inputSchemas
-        |  val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+        |  val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = $inputEncoders
+        |  val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
         |  val finalUdf = if (nullable) udf else udf.asNonNullable()
         |  def builder(e: Seq[Expression]) = if (e.length == $x) {
         |    finalUdf.createScalaUDF(e)
@@ -163,7 +163,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
         |def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType): Unit = {
         |  val func = $funcCall
         |  def builder(e: Seq[Expression]) = if (e.length == $i) {
-        |    ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+        |    ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
         |  } else {
         |    throw new AnalysisException("Invalid number of arguments for function " + name +
         |      ". Expected: $i; Found: " + e.length)
@@ -180,8 +180,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 0) {
       finalUdf.createScalaUDF(e)
@@ -200,8 +200,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 1) {
       finalUdf.createScalaUDF(e)
@@ -220,8 +220,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 2) {
       finalUdf.createScalaUDF(e)
@@ -240,8 +240,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 3) {
       finalUdf.createScalaUDF(e)
@@ -260,8 +260,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 4) {
       finalUdf.createScalaUDF(e)
@@ -280,8 +280,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](name: String, func: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 5) {
       finalUdf.createScalaUDF(e)
@@ -300,8 +300,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](name: String, func: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 6) {
       finalUdf.createScalaUDF(e)
@@ -320,8 +320,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](name: String, func: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 7) {
       finalUdf.createScalaUDF(e)
@@ -340,8 +340,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](name: String, func: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 8) {
       finalUdf.createScalaUDF(e)
@@ -360,8 +360,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](name: String, func: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Nil
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 9) {
       finalUdf.createScalaUDF(e)
@@ -380,8 +380,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](name: String, func: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 10) {
       finalUdf.createScalaUDF(e)
@@ -400,8 +400,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](name: String, func: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 11) {
       finalUdf.createScalaUDF(e)
@@ -420,8 +420,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](name: String, func: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 12) {
       finalUdf.createScalaUDF(e)
@@ -440,8 +440,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](name: String, func: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 13) {
       finalUdf.createScalaUDF(e)
@@ -460,8 +460,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](name: String, func: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 14) {
       finalUdf.createScalaUDF(e)
@@ -480,8 +480,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](name: String, func: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 15) {
       finalUdf.createScalaUDF(e)
@@ -500,8 +500,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](name: String, func: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 16) {
       finalUdf.createScalaUDF(e)
@@ -520,8 +520,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](name: String, func: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 17) {
       finalUdf.createScalaUDF(e)
@@ -540,8 +540,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](name: String, func: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 18) {
       finalUdf.createScalaUDF(e)
@@ -560,8 +560,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](name: String, func: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 19) {
       finalUdf.createScalaUDF(e)
@@ -580,8 +580,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](name: String, func: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 20) {
       finalUdf.createScalaUDF(e)
@@ -600,8 +600,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](name: String, func: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 21) {
       finalUdf.createScalaUDF(e)
@@ -620,8 +620,8 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](name: String, func: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1]).toOption :: Try(ScalaReflection.schemaFor[A2]).toOption :: Try(ScalaReflection.schemaFor[A3]).toOption :: Try(ScalaReflection.schemaFor[A4]).toOption :: Try(ScalaReflection.schemaFor[A5]).toOption :: Try(ScalaReflection.schemaFor[A6]).toOption :: Try(ScalaReflection.schemaFor[A7]).toOption :: Try(ScalaReflection.schemaFor[A8]).toOption :: Try(ScalaReflection.schemaFor[A9]).toOption :: Try(Scala [...]
-    val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
+    val inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Try(ExpressionEncoder[ [...]
+    val udf = SparkUserDefinedFunction(func, dataType, inputEncoders).withName(name)
     val finalUdf = if (nullable) udf else udf.asNonNullable()
     def builder(e: Seq[Expression]) = if (e.length == 22) {
       finalUdf.createScalaUDF(e)
@@ -731,7 +731,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF0[_], returnType: DataType): Unit = {
     val func = () => f.asInstanceOf[UDF0[Any]].call()
     def builder(e: Seq[Expression]) = if (e.length == 0) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 0; Found: " + e.length)
@@ -746,7 +746,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
     def builder(e: Seq[Expression]) = if (e.length == 1) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 1; Found: " + e.length)
@@ -761,7 +761,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF2[_, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 2) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 2; Found: " + e.length)
@@ -776,7 +776,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 3) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 3; Found: " + e.length)
@@ -791,7 +791,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF4[_, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 4) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 4; Found: " + e.length)
@@ -806,7 +806,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF5[_, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 5) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 5; Found: " + e.length)
@@ -821,7 +821,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF6[_, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 6) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 6; Found: " + e.length)
@@ -836,7 +836,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 7) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 7; Found: " + e.length)
@@ -851,7 +851,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 8) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 8; Found: " + e.length)
@@ -866,7 +866,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 9) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 9; Found: " + e.length)
@@ -881,7 +881,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 10) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 10; Found: " + e.length)
@@ -896,7 +896,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 11) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 11; Found: " + e.length)
@@ -911,7 +911,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 12) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 12; Found: " + e.length)
@@ -926,7 +926,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 13) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 13; Found: " + e.length)
@@ -941,7 +941,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 14) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 14; Found: " + e.length)
@@ -956,7 +956,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 15) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 15; Found: " + e.length)
@@ -971,7 +971,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 16) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 16; Found: " + e.length)
@@ -986,7 +986,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 17) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 17; Found: " + e.length)
@@ -1001,7 +1001,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 18) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 18; Found: " + e.length)
@@ -1016,7 +1016,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 19) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 19; Found: " + e.length)
@@ -1031,7 +1031,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 20) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 20; Found: " + e.length)
@@ -1046,7 +1046,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 21) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 21; Found: " + e.length)
@@ -1061,7 +1061,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   def register(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
     val func = f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
     def builder(e: Seq[Expression]) = if (e.length == 22) {
-      ScalaUDF(func, returnType, e, e.map(_ => false), udfName = Some(name))
+      ScalaUDF(func, returnType, e, Nil, udfName = Some(name))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 22; Found: " + e.length)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 50c4f6c..edb49d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -182,8 +182,7 @@ class DynamicPartitionDataWriter(
       val partitionName = ScalaUDF(
         ExternalCatalogUtils.getPartitionPathString _,
         StringType,
-        Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))),
-        Seq(false, false))
+        Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))
       if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
     })
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index c50168c..2ef6e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -93,7 +93,7 @@ sealed abstract class UserDefinedFunction {
 private[spark] case class SparkUserDefinedFunction(
     f: AnyRef,
     dataType: DataType,
-    inputSchemas: Seq[Option[ScalaReflection.Schema]],
+    inputEncoders: Seq[Option[ExpressionEncoder[_]]] = Nil,
     name: Option[String] = None,
     nullable: Boolean = true,
     deterministic: Boolean = true) extends UserDefinedFunction {
@@ -104,18 +104,11 @@ private[spark] case class SparkUserDefinedFunction(
   }
 
   private[sql] def createScalaUDF(exprs: Seq[Expression]): ScalaUDF = {
-    // It's possible that some of the inputs don't have a specific type(e.g. `Any`),  skip type
-    // check.
-    val inputTypes = inputSchemas.map(_.map(_.dataType).getOrElse(AnyDataType))
-    // `ScalaReflection.Schema.nullable` is false iff the type is primitive. Also `Any` is not
-    // primitive.
-    val inputsPrimitive = inputSchemas.map(_.map(!_.nullable).getOrElse(false))
     ScalaUDF(
       f,
       dataType,
       exprs,
-      inputsPrimitive,
-      inputTypes,
+      inputEncoders,
       udfName = name,
       nullable = nullable,
       udfDeterministic = deterministic)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 11c04e5..cad7916 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4282,7 +4282,7 @@ object functions {
   (0 to 10).foreach { x =>
     val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
     val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
-    val inputSchemas = (1 to x).foldRight("Nil")((i, s) => {s"Try(ScalaReflection.schemaFor(typeTag[A$i])).toOption :: $s"})
+    val inputEncoders = (1 to x).foldRight("Nil")((i, s) => {s"Try(ExpressionEncoder[A$i]()).toOption :: $s"})
     println(s"""
       |/**
       | * Defines a Scala closure of $x arguments as user-defined function (UDF).
@@ -4295,8 +4295,8 @@ object functions {
       | */
       |def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
       |  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-      |  val inputSchemas = $inputSchemas
-      |  val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+      |  val inputEncoders = $inputEncoders
+      |  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
       |  if (nullable) udf else udf.asNonNullable()
       |}""".stripMargin)
   }
@@ -4319,7 +4319,7 @@ object functions {
       | */
       |def udf(f: UDF$i[$extTypeArgs], returnType: DataType): UserDefinedFunction = {
       |  val func = $funcCall
-      |  SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill($i)(None))
+      |  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill($i)(None))
       |}""".stripMargin)
   }
 
@@ -4401,8 +4401,8 @@ object functions {
    */
   def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4417,8 +4417,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4433,8 +4433,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4449,8 +4449,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4465,8 +4465,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4481,8 +4481,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4497,8 +4497,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4513,8 +4513,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4529,8 +4529,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A8])).toOption :: Nil
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4545,8 +4545,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A8])).toOption :: Try(ScalaReflection.s [...]
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4561,8 +4561,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A8])).toOption :: Try(ScalaReflection.s [...]
-    val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
+    val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Try(ExpressionEncoder[A2]()).toOption :: Try(ExpressionEncoder[A3]()).toOption :: Try(ExpressionEncoder[A4]()).toOption :: Try(ExpressionEncoder[A5]()).toOption :: Try(ExpressionEncoder[A6]()).toOption :: Try(ExpressionEncoder[A7]()).toOption :: Try(ExpressionEncoder[A8]()).toOption :: Try(ExpressionEncoder[A9]()).toOption :: Try(ExpressionEncoder[A10]()).toOption :: Nil
+    val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4581,7 +4581,7 @@ object functions {
    */
   def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
     val func = () => f.asInstanceOf[UDF0[Any]].call()
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(0)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(0)(None))
   }
 
   /**
@@ -4595,7 +4595,7 @@ object functions {
    */
   def udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(1)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(1)(None))
   }
 
   /**
@@ -4609,7 +4609,7 @@ object functions {
    */
   def udf(f: UDF2[_, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(2)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(2)(None))
   }
 
   /**
@@ -4623,7 +4623,7 @@ object functions {
    */
   def udf(f: UDF3[_, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(3)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(3)(None))
   }
 
   /**
@@ -4637,7 +4637,7 @@ object functions {
    */
   def udf(f: UDF4[_, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(4)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(4)(None))
   }
 
   /**
@@ -4651,7 +4651,7 @@ object functions {
    */
   def udf(f: UDF5[_, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(5)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(5)(None))
   }
 
   /**
@@ -4665,7 +4665,7 @@ object functions {
    */
   def udf(f: UDF6[_, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(6)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(6)(None))
   }
 
   /**
@@ -4679,7 +4679,7 @@ object functions {
    */
   def udf(f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(7)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(7)(None))
   }
 
   /**
@@ -4693,7 +4693,7 @@ object functions {
    */
   def udf(f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(8)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(8)(None))
   }
 
   /**
@@ -4707,7 +4707,7 @@ object functions {
    */
   def udf(f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(9)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(9)(None))
   }
 
   /**
@@ -4721,7 +4721,7 @@ object functions {
    */
   def udf(f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    SparkUserDefinedFunction(func, returnType, inputSchemas = Seq.fill(10)(None))
+    SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(10)(None))
   }
 
   // scalastyle:on parameter.number
@@ -4759,7 +4759,7 @@ object functions {
         s"caution."
       throw new AnalysisException(errorMsg)
     }
-    SparkUserDefinedFunction(f, dataType, inputSchemas = Nil)
+    SparkUserDefinedFunction(f, dataType, inputEncoders = Nil)
   }
 
   /**
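
For context on the `Try(ExpressionEncoder[Ai]()).toOption` pattern that the udf() overloads above now use: encoder derivation succeeds for types Spark can encode (case classes, primitives, and so on) and throws for types such as `Any`, so each input is wrapped individually and falls back to `None` when no encoder exists. The resulting `Option` is exactly what `SparkUserDefinedFunction.inputEncoders` carries per argument. A minimal sketch of that behaviour, assuming Spark 3.0 with this patch on the classpath; the object and value names are illustrative only:

```scala
import scala.util.Try

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Same shape as the TestData fixture used by the tests in this patch.
case class TestData(key: Int, value: String)

object EncoderDerivationSketch {
  def main(args: Array[String]): Unit = {
    // Derivation succeeds: a case class maps to a struct encoder.
    val forCaseClass = Try(ExpressionEncoder[TestData]()).toOption

    // Derivation is expected to fail for Any (no encoder exists), so the
    // generated udf() overloads end up with None for such a parameter.
    val forAny = Try(ExpressionEncoder[Any]()).toOption

    println(s"TestData encoder derived: ${forCaseClass.isDefined}") // true
    println(s"Any encoder derived:      ${forAny.isDefined}")       // false
  }
}
```
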
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 51150a1..4a4504a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -337,7 +337,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
         input.toString
       },
       StringType,
-      inputSchemas = Seq.fill(1)(None),
+      inputEncoders = Seq.fill(1)(None),
       name = Some(name)) {
 
       override def apply(exprs: Column*): Column = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index e0857ed..08f41f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.QueryExecutionListener
 
-
 private case class FunctionResult(f1: String, f2: String)
 
 class UDFSuite extends QueryTest with SharedSparkSession {
@@ -551,4 +550,47 @@ class UDFSuite extends QueryTest with SharedSparkSession {
     }
     assert(e.getMessage.contains("Invalid arguments for function cast"))
   }
+
+  test("only one case class parameter") {
+    val f = (d: TestData) => d.key * d.value.toInt
+    val myUdf = udf(f)
+    val df = Seq(("data", TestData(50, "2"))).toDF("col1", "col2")
+    checkAnswer(df.select(myUdf(Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("one case class with primitive parameter") {
+    val f = (i: Int, p: TestData) => p.key * i
+    val myUdf = udf(f)
+    val df = Seq((2, TestData(50, "data"))).toDF("col1", "col2")
+    checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("multiple case class parameters") {
+    val f = (d1: TestData, d2: TestData) => d1.key * d2.key
+    val myUdf = udf(f)
+    val df = Seq((TestData(10, "d1"), TestData(50, "d2"))).toDF("col1", "col2")
+    checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(500) :: Nil)
+  }
+
+  test("input case class parameter and return case class") {
+    val f = (d: TestData) => TestData(d.key * 2, "copy")
+    val myUdf = udf(f)
+    val df = Seq(("data", TestData(50, "d2"))).toDF("col1", "col2")
+    checkAnswer(df.select(myUdf(Column("col2"))), Row(Row(100, "copy")) :: Nil)
+  }
+
+  test("any and case class parameter") {
+    val f = (any: Any, d: TestData) => s"${any.toString}, ${d.value}"
+    val myUdf = udf(f)
+    val df = Seq(("Hello", TestData(50, "World"))).toDF("col1", "col2")
+    checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row("Hello, World") :: Nil)
+  }
+
+  test("nested case class parameter") {
+    val f = (y: Int, training: TrainingSales) => training.sales.year + y
+    val myUdf = udf(f)
+    val df = Seq((20, TrainingSales("training", CourseSales("course", 2000, 3.14))))
+      .toDF("col1", "col2")
+    checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(2020) :: Nil)
+  }
 }
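
For reference, a minimal end-to-end sketch of the user-facing behaviour exercised by the tests above, assuming a local Spark 3.0 build that includes this patch; the object name, master URL and column names are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Same shape as the TestData fixture used in UDFSuite.
case class TestData(key: Int, value: String)

object CaseClassUdfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("case-class-udf")
      .getOrCreate()
    import spark.implicits._

    // The typed udf() overload now derives an ExpressionEncoder for TestData,
    // so the struct column "col2" is deserialized into the case class directly.
    val multiply = udf((d: TestData) => d.key * d.value.toInt)

    val df = Seq(("data", TestData(50, "2"))).toDF("col1", "col2")
    df.select(multiply(col("col2"))).show() // expected: a single row containing 100

    spark.stop()
  }
}
```
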

