Posted to commits@spark.apache.org by li...@apache.org on 2017/08/22 20:01:38 UTC

spark git commit: [SPARK-21499][SQL] Support creating persistent function for Spark UDAF (UserDefinedAggregateFunction)

Repository: spark
Updated Branches:
  refs/heads/master 3ed1ae100 -> 43d71d965


[SPARK-21499][SQL] Support creating persistent function for Spark UDAF (UserDefinedAggregateFunction)

## What changes were proposed in this pull request?
This PR enables users to create a persistent Scala UDAF (one that extends `UserDefinedAggregateFunction`):

```SQL
CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg'
```

Before this PR, a Spark UDAF could only be registered through the API `spark.udf.register(...)`.
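
For context, a Spark UDAF is a class that extends `UserDefinedAggregateFunction`. A minimal sketch of such a class is shown below; it is an illustrative plain-average implementation, not the actual `MyDoubleAvg` test class used in this patch.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Illustrative UDAF: a plain average over a double column.
class MyAverage extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType =
    StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0   // running sum
    buffer(1) = 0L    // running count
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getDouble(0)
      buffer(1) = buffer.getLong(1) + 1L
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  def evaluate(buffer: Row): Any =
    if (buffer.getLong(1) == 0L) null else buffer.getDouble(0) / buffer.getLong(1)
}
```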

## How was this patch tested?
Added test cases

Author: gatorsmile <ga...@gmail.com>

Closes #18700 from gatorsmile/javaUDFinScala.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43d71d96
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43d71d96
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43d71d96

Branch: refs/heads/master
Commit: 43d71d96596baa8d2111a4b20bf21c1c668ad793
Parents: 3ed1ae1
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Aug 22 13:01:35 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Aug 22 13:01:35 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  41 +++++++-
 .../test/resources/sql-tests/inputs/udaf.sql    |  13 +++
 .../resources/sql-tests/results/udaf.sql.out    |  54 ++++++++++
 .../spark/sql/hive/HiveSessionCatalog.scala     |  62 ++++++------
 .../sql/hive/execution/HiveUDAFSuite.scala      |  13 +++
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 101 +++++++++++--------
 .../sql/hive/execution/SQLQuerySuite.scala      |  42 +-------
 7 files changed, 204 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6030d90..0908d68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.lang.reflect.InvocationTargetException
 import java.net.URI
 import java.util.Locale
 import java.util.concurrent.Callable
@@ -24,6 +25,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
 
 import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
@@ -39,7 +41,9 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
@@ -1075,13 +1079,33 @@ class SessionCatalog(
   // ----------------------------------------------------------------
 
   /**
-   * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+   * Constructs a [[FunctionBuilder]] based on the provided class that represents a function.
+   */
+  private def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
+    val clazz = Utils.classForName(functionClassName)
+    (input: Seq[Expression]) => makeFunctionExpression(name, clazz, input)
+  }
+
+  /**
+   * Constructs an [[Expression]] based on the provided class that represents a function.
    *
    * This performs reflection to decide what type of [[Expression]] to return in the builder.
    */
-  protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
-    // TODO: at least support UDAFs here
-    throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
+  protected def makeFunctionExpression(
+      name: String,
+      clazz: Class[_],
+      input: Seq[Expression]): Expression = {
+    val clsForUDAF =
+      Utils.classForName("org.apache.spark.sql.expressions.UserDefinedAggregateFunction")
+    if (clsForUDAF.isAssignableFrom(clazz)) {
+      val cls = Utils.classForName("org.apache.spark.sql.execution.aggregate.ScalaUDAF")
+      cls.getConstructor(classOf[Seq[Expression]], clsForUDAF, classOf[Int], classOf[Int])
+        .newInstance(input, clazz.newInstance().asInstanceOf[Object], Int.box(1), Int.box(1))
+        .asInstanceOf[Expression]
+    } else {
+      throw new AnalysisException(s"No handler for UDAF '${clazz.getCanonicalName}'. " +
+        s"Use sparkSession.udf.register(...) instead.")
+    }
   }
 
   /**
@@ -1105,7 +1129,14 @@ class SessionCatalog(
     }
     val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
     val builder =
-      functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className))
+      functionBuilder.getOrElse {
+        val className = funcDefinition.className
+        if (!Utils.classIsLoadable(className)) {
+          throw new AnalysisException(s"Can not load class '$className' when registering " +
+            s"the function '$func', please make sure it is on the classpath")
+        }
+        makeFunctionBuilder(func.unquotedString, className)
+      }
     functionRegistry.registerFunction(func, info, builder)
   }
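
A note on the design: `SessionCatalog` lives in `sql/catalyst`, which cannot depend on `UserDefinedAggregateFunction` or `ScalaUDAF` in `sql/core` at compile time, hence the lookups by class name. The following is a condensed, standalone restatement of the reflective construction in the hunk above (a sketch, not code from the commit):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.util.Utils

object UdafReflection {
  // Build a ScalaUDAF expression for any class extending
  // UserDefinedAggregateFunction, resolving both classes by name.
  def buildScalaUDAF(clazz: Class[_], input: Seq[Expression]): Expression = {
    val udafBase =
      Utils.classForName("org.apache.spark.sql.expressions.UserDefinedAggregateFunction")
    require(udafBase.isAssignableFrom(clazz), s"${clazz.getName} is not a Spark UDAF")
    Utils.classForName("org.apache.spark.sql.execution.aggregate.ScalaUDAF")
      .getConstructor(classOf[Seq[Expression]], udafBase, classOf[Int], classOf[Int])
      // The boxed ints fill ScalaUDAF's two aggregation-buffer-offset
      // parameters, as in the diff; clazz needs a no-arg constructor.
      .newInstance(input, clazz.newInstance().asInstanceOf[Object], Int.box(1), Int.box(1))
      .asInstanceOf[Expression]
  }
}
```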
 

http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/core/src/test/resources/sql-tests/inputs/udaf.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf.sql
new file mode 100644
index 0000000..2183ba2
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/udaf.sql
@@ -0,0 +1,13 @@
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(1), (2), (3), (4)
+as t1(int_col1);
+
+CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg';
+
+SELECT default.myDoubleAvg(int_col1) as my_avg from t1;
+
+SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1;
+
+CREATE FUNCTION udaf1 AS 'test.non.existent.udaf';
+
+SELECT default.udaf1(int_col1) as udaf1 from t1;
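
The same flow can be driven programmatically; a minimal sketch, assuming a Hive-enabled `SparkSession` named `spark`, the view `t1` above, and the `MyDoubleAvg` test class on the classpath:

```scala
// CREATE FUNCTION without TEMPORARY persists the function in the current
// database (`default` here), hence the qualified name at the call site.
spark.sql("CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg'")
spark.sql("SELECT default.myDoubleAvg(int_col1) AS my_avg FROM t1").show()
```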

http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
new file mode 100644
index 0000000..4815a57
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
@@ -0,0 +1,54 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 6
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(1), (2), (3), (4)
+as t1(int_col1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg'
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT default.myDoubleAvg(int_col1) as my_avg from t1
+-- !query 2 schema
+struct<my_avg:double>
+-- !query 2 output
+102.5
+
+
+-- !query 3
+SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1
+-- !query 3 schema
+struct<>
+-- !query 3 output
+java.lang.AssertionError
+assertion failed: Incorrect number of children
+
+
+-- !query 4
+CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'
+-- !query 4 schema
+struct<>
+-- !query 4 output
+
+
+
+-- !query 5
+SELECT default.udaf1(int_col1) as udaf1 from t1
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 7

http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 0d0269f..b352bf6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
-import org.apache.spark.util.Utils
 
 
 private[sql] class HiveSessionCatalog(
@@ -58,55 +57,52 @@ private[sql] class HiveSessionCatalog(
       parser,
       functionResourceLoader) {
 
-  override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
-    makeFunctionBuilder(funcName, Utils.classForName(className))
-  }
-
   /**
-   * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+   * Constructs an [[Expression]] based on the provided class that represents a function.
+   *
+   * This performs reflection to decide what type of [[Expression]] to return in the builder.
    */
-  private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
-    // When we instantiate hive UDF wrapper class, we may throw exception if the input
-    // expressions don't satisfy the hive UDF, such as type mismatch, input number
-    // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
-    (children: Seq[Expression]) => {
+  override def makeFunctionExpression(
+      name: String,
+      clazz: Class[_],
+      input: Seq[Expression]): Expression = {
+
+    Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
+      var udfExpr: Option[Expression] = None
       try {
+        // Instantiating the Hive UDF wrapper class may throw an exception if the input
+        // expressions don't satisfy the Hive UDF (e.g., type mismatch or wrong number of
+        // arguments). Catch the exception and throw an AnalysisException instead.
         if (classOf[UDF].isAssignableFrom(clazz)) {
-          val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
-          udf.dataType // Force it to check input data types.
-          udf
+          udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
+          udfExpr.get.dataType // Force it to check input data types.
         } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
-          val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
-          udf.dataType // Force it to check input data types.
-          udf
+          udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input))
+          udfExpr.get.dataType // Force it to check input data types.
         } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
-          val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
-          udaf.dataType // Force it to check input data types.
-          udaf
+          udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input))
+          udfExpr.get.dataType // Force it to check input data types.
         } else if (classOf[UDAF].isAssignableFrom(clazz)) {
-          val udaf = HiveUDAFFunction(
+          udfExpr = Some(HiveUDAFFunction(
             name,
             new HiveFunctionWrapper(clazz.getName),
-            children,
-            isUDAFBridgeRequired = true)
-          udaf.dataType  // Force it to check input data types.
-          udaf
+            input,
+            isUDAFBridgeRequired = true))
+          udfExpr.get.dataType // Force it to check input data types.
         } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
-          val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
-          udtf.elementSchema // Force it to check input data types.
-          udtf
-        } else {
-          throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
+          udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
+          udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
         }
       } catch {
-        case ae: AnalysisException =>
-          throw ae
         case NonFatal(e) =>
           val analysisException =
-            new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+            new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e")
           analysisException.setStackTrace(e.getStackTrace)
           throw analysisException
       }
+      udfExpr.getOrElse {
+        throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'")
+      }
     }
   }
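
The dispatch order in this override matters: the parent (Spark UDAF) path runs first, and the Hive handlers are probed only when it throws. A schematic of that fallback with by-name parameters (a hypothetical helper, not code from the commit):

```scala
import scala.util.Try

import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical helper mirroring the Try(...).getOrElse pattern above:
// evaluate the Spark-native resolution first; only if it throws, run
// the Hive resolution chain.
def resolveWithFallback(
    sparkPath: => Expression,  // e.g. super.makeFunctionExpression(name, clazz, input)
    hivePath: => Expression): Expression = {
  Try(sparkPath).getOrElse(hivePath)
}
```

Note that `getOrElse` discards the original failure, which is why the fallback branch constructs its own `AnalysisException` with a fresh message.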
 

http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
index 479ca1e..8986fb5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
+import test.org.apache.spark.sql.MyDoubleAvg
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
@@ -86,6 +87,18 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     ))
   }
 
+  test("call JAVA UDAF") {
+    withTempView("temp") {
+      withUserDefinedFunction("myDoubleAvg" -> false) {
+        spark.range(1, 10).toDF("value").createOrReplaceTempView("temp")
+        sql(s"CREATE FUNCTION myDoubleAvg AS '${classOf[MyDoubleAvg].getName}'")
+        checkAnswer(
+          spark.sql("SELECT default.myDoubleAvg(value) as my_avg from temp"),
+          Row(105.0))
+      }
+    }
+  }
+
   test("non-deterministic children expressions of UDAF") {
     withTempView("view1") {
       spark.range(1).selectExpr("id as x", "id as y").createTempView("view1")

http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index cae338c..383d41f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -404,59 +404,34 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
-    Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
+    withTempView("testUDF") {
+      Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
+
+      def testErrorMsgForFunc(funcName: String, className: String): Unit = {
+        withUserDefinedFunction(funcName -> true) {
+          sql(s"CREATE TEMPORARY FUNCTION $funcName AS '$className'")
+          val message = intercept[AnalysisException] {
+            sql(s"SELECT $funcName() FROM testUDF")
+          }.getMessage
+          assert(message.contains(s"No handler for UDF/UDAF/UDTF '$className'"))
+        }
+      }
 
-    {
       // HiveSimpleUDF
-      sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
-      val message = intercept[AnalysisException] {
-        sql("SELECT testUDFTwoListList() FROM testUDF")
-      }.getMessage
-      assert(message.contains("No handler for Hive UDF"))
-      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
-    }
+      testErrorMsgForFunc("testUDFTwoListList", classOf[UDFTwoListList].getName)
 
-    {
       // HiveGenericUDF
-      sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'")
-      val message = intercept[AnalysisException] {
-        sql("SELECT testUDFAnd() FROM testUDF")
-      }.getMessage
-      assert(message.contains("No handler for Hive UDF"))
-      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
-    }
+      testErrorMsgForFunc("testUDFAnd", classOf[GenericUDFOPAnd].getName)
 
-    {
       // Hive UDAF
-      sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
-      val message = intercept[AnalysisException] {
-        sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
-      }.getMessage
-      assert(message.contains("No handler for Hive UDF"))
-      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
-    }
+      testErrorMsgForFunc("testUDAFPercentile", classOf[UDAFPercentile].getName)
 
-    {
       // AbstractGenericUDAFResolver
-      sql(s"CREATE TEMPORARY FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'")
-      val message = intercept[AnalysisException] {
-        sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
-      }.getMessage
-      assert(message.contains("No handler for Hive UDF"))
-      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
-    }
+      testErrorMsgForFunc("testUDAFAverage", classOf[GenericUDAFAverage].getName)
 
-    {
-      // Hive UDTF
-      sql(s"CREATE TEMPORARY FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'")
-      val message = intercept[AnalysisException] {
-        sql("SELECT testUDTFExplode() FROM testUDF")
-      }.getMessage
-      assert(message.contains("No handler for Hive UDF"))
-      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
+      // Hive UDTF
+      testErrorMsgForFunc("testUDTFExplode", classOf[GenericUDTFExplode].getName)
     }
-
-    spark.catalog.dropTempView("testUDF")
   }
 
   test("Hive UDF in group by") {
@@ -621,6 +596,46 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
       }
     }
   }
+
+  test("UDTF") {
+    withUserDefinedFunction("udtf_count2" -> true) {
+      sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath}")
+      // The function source code can be found at:
+      // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
+      sql(
+        """
+          |CREATE TEMPORARY FUNCTION udtf_count2
+          |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+        """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
+        Row(97, 500) :: Row(97, 500) :: Nil)
+
+      checkAnswer(
+        sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+        Row(3) :: Row(3) :: Nil)
+    }
+  }
+
+  test("permanent UDTF") {
+    withUserDefinedFunction("udtf_count_temp" -> false) {
+      sql(
+        s"""
+           |CREATE FUNCTION udtf_count_temp
+           |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+           |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
+        """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"),
+        Row(97, 500) :: Row(97, 500) :: Nil)
+
+      checkAnswer(
+        sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+        Row(3) :: Row(3) :: Nil)
+    }
+  }
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d0e0d20..02cfa02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -98,46 +98,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil)
   }
 
-  test("UDTF") {
-    withUserDefinedFunction("udtf_count2" -> true) {
-      sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
-      // The function source code can be found at:
-      // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
-      sql(
-        """
-          |CREATE TEMPORARY FUNCTION udtf_count2
-          |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-        """.stripMargin)
-
-      checkAnswer(
-        sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
-        Row(97, 500) :: Row(97, 500) :: Nil)
-
-      checkAnswer(
-        sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
-        Row(3) :: Row(3) :: Nil)
-    }
-  }
-
-  test("permanent UDTF") {
-    withUserDefinedFunction("udtf_count_temp" -> false) {
-      sql(
-        s"""
-          |CREATE FUNCTION udtf_count_temp
-          |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-          |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
-        """.stripMargin)
-
-      checkAnswer(
-        sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"),
-        Row(97, 500) :: Row(97, 500) :: Nil)
-
-      checkAnswer(
-        sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
-        Row(3) :: Row(3) :: Nil)
-    }
-  }
-
   test("SPARK-6835: udtf in lateral view") {
     val df = Seq((1, 1)).toDF("c1", "c2")
     df.createOrReplaceTempView("table1")

