You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/22 20:01:38 UTC
spark git commit: [SPARK-21499][SQL] Support creating persistent
function for Spark UDAF(UserDefinedAggregateFunction)
Repository: spark
Updated Branches:
refs/heads/master 3ed1ae100 -> 43d71d965
[SPARK-21499][SQL] Support creating persistent function for Spark UDAF(UserDefinedAggregateFunction)
## What changes were proposed in this pull request?
This PR is to enable users to create persistent Scala UDAF (that extends UserDefinedAggregateFunction).
```SQL
CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg'
```
Before this PR, Spark UDAF only can be registered through the API `spark.udf.register(...)`
## How was this patch tested?
Added test cases
Author: gatorsmile <ga...@gmail.com>
Closes #18700 from gatorsmile/javaUDFinScala.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43d71d96
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43d71d96
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43d71d96
Branch: refs/heads/master
Commit: 43d71d96596baa8d2111a4b20bf21c1c668ad793
Parents: 3ed1ae1
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Aug 22 13:01:35 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Aug 22 13:01:35 2017 -0700
----------------------------------------------------------------------
.../sql/catalyst/catalog/SessionCatalog.scala | 41 +++++++-
.../test/resources/sql-tests/inputs/udaf.sql | 13 +++
.../resources/sql-tests/results/udaf.sql.out | 54 ++++++++++
.../spark/sql/hive/HiveSessionCatalog.scala | 62 ++++++------
.../sql/hive/execution/HiveUDAFSuite.scala | 13 +++
.../spark/sql/hive/execution/HiveUDFSuite.scala | 101 +++++++++++--------
.../sql/hive/execution/SQLQuerySuite.scala | 42 +-------
7 files changed, 204 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6030d90..0908d68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
+import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.util.Locale
import java.util.concurrent.Callable
@@ -24,6 +25,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.conf.Configuration
@@ -39,7 +41,9 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
object SessionCatalog {
val DEFAULT_DATABASE = "default"
@@ -1075,13 +1079,33 @@ class SessionCatalog(
// ----------------------------------------------------------------
/**
- * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ * Constructs a [[FunctionBuilder]] based on the provided class that represents a function.
+ */
+ private def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
+ val clazz = Utils.classForName(functionClassName)
+ (input: Seq[Expression]) => makeFunctionExpression(name, clazz, input)
+ }
+
+ /**
+ * Constructs a [[Expression]] based on the provided class that represents a function.
*
* This performs reflection to decide what type of [[Expression]] to return in the builder.
*/
- protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
- // TODO: at least support UDAFs here
- throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
+ protected def makeFunctionExpression(
+ name: String,
+ clazz: Class[_],
+ input: Seq[Expression]): Expression = {
+ val clsForUDAF =
+ Utils.classForName("org.apache.spark.sql.expressions.UserDefinedAggregateFunction")
+ if (clsForUDAF.isAssignableFrom(clazz)) {
+ val cls = Utils.classForName("org.apache.spark.sql.execution.aggregate.ScalaUDAF")
+ cls.getConstructor(classOf[Seq[Expression]], clsForUDAF, classOf[Int], classOf[Int])
+ .newInstance(input, clazz.newInstance().asInstanceOf[Object], Int.box(1), Int.box(1))
+ .asInstanceOf[Expression]
+ } else {
+ throw new AnalysisException(s"No handler for UDAF '${clazz.getCanonicalName}'. " +
+ s"Use sparkSession.udf.register(...) instead.")
+ }
}
/**
@@ -1105,7 +1129,14 @@ class SessionCatalog(
}
val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
val builder =
- functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className))
+ functionBuilder.getOrElse {
+ val className = funcDefinition.className
+ if (!Utils.classIsLoadable(className)) {
+ throw new AnalysisException(s"Can not load class '$className' when registering " +
+ s"the function '$func', please make sure it is on the classpath")
+ }
+ makeFunctionBuilder(func.unquotedString, className)
+ }
functionRegistry.registerFunction(func, info, builder)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/core/src/test/resources/sql-tests/inputs/udaf.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf.sql
new file mode 100644
index 0000000..2183ba2
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/udaf.sql
@@ -0,0 +1,13 @@
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(1), (2), (3), (4)
+as t1(int_col1);
+
+CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg';
+
+SELECT default.myDoubleAvg(int_col1) as my_avg from t1;
+
+SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1;
+
+CREATE FUNCTION udaf1 AS 'test.non.existent.udaf';
+
+SELECT default.udaf1(int_col1) as udaf1 from t1;
http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
new file mode 100644
index 0000000..4815a57
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
@@ -0,0 +1,54 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 6
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(1), (2), (3), (4)
+as t1(int_col1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg'
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT default.myDoubleAvg(int_col1) as my_avg from t1
+-- !query 2 schema
+struct<my_avg:double>
+-- !query 2 output
+102.5
+
+
+-- !query 3
+SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1
+-- !query 3 schema
+struct<>
+-- !query 3 output
+java.lang.AssertionError
+assertion failed: Incorrect number of children
+
+
+-- !query 4
+CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'
+-- !query 4 schema
+struct<>
+-- !query 4 output
+
+
+
+-- !query 5
+SELECT default.udaf1(int_col1) as udaf1 from t1
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 7
http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 0d0269f..b352bf6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, DoubleType}
-import org.apache.spark.util.Utils
private[sql] class HiveSessionCatalog(
@@ -58,55 +57,52 @@ private[sql] class HiveSessionCatalog(
parser,
functionResourceLoader) {
- override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
- makeFunctionBuilder(funcName, Utils.classForName(className))
- }
-
/**
- * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ * Constructs a [[Expression]] based on the provided class that represents a function.
+ *
+ * This performs reflection to decide what type of [[Expression]] to return in the builder.
*/
- private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
- // When we instantiate hive UDF wrapper class, we may throw exception if the input
- // expressions don't satisfy the hive UDF, such as type mismatch, input number
- // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
- (children: Seq[Expression]) => {
+ override def makeFunctionExpression(
+ name: String,
+ clazz: Class[_],
+ input: Seq[Expression]): Expression = {
+
+ Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
+ var udfExpr: Option[Expression] = None
try {
+ // When we instantiate hive UDF wrapper class, we may throw exception if the input
+ // expressions don't satisfy the hive UDF, such as type mismatch, input number
+ // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
if (classOf[UDF].isAssignableFrom(clazz)) {
- val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
- udf.dataType // Force it to check input data types.
- udf
+ udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
+ udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
- val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
- udf.dataType // Force it to check input data types.
- udf
+ udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input))
+ udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
- val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
- udaf.dataType // Force it to check input data types.
- udaf
+ udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input))
+ udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[UDAF].isAssignableFrom(clazz)) {
- val udaf = HiveUDAFFunction(
+ udfExpr = Some(HiveUDAFFunction(
name,
new HiveFunctionWrapper(clazz.getName),
- children,
- isUDAFBridgeRequired = true)
- udaf.dataType // Force it to check input data types.
- udaf
+ input,
+ isUDAFBridgeRequired = true))
+ udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
- val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
- udtf.elementSchema // Force it to check input data types.
- udtf
- } else {
- throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
+ udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
+ udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
}
} catch {
- case ae: AnalysisException =>
- throw ae
case NonFatal(e) =>
val analysisException =
- new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+ new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e")
analysisException.setStackTrace(e.getStackTrace)
throw analysisException
}
+ udfExpr.getOrElse {
+ throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'")
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
index 479ca1e..8986fb5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
+import test.org.apache.spark.sql.MyDoubleAvg
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
@@ -86,6 +87,18 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
))
}
+ test("call JAVA UDAF") {
+ withTempView("temp") {
+ withUserDefinedFunction("myDoubleAvg" -> false) {
+ spark.range(1, 10).toDF("value").createOrReplaceTempView("temp")
+ sql(s"CREATE FUNCTION myDoubleAvg AS '${classOf[MyDoubleAvg].getName}'")
+ checkAnswer(
+ spark.sql("SELECT default.myDoubleAvg(value) as my_avg from temp"),
+ Row(105.0))
+ }
+ }
+ }
+
test("non-deterministic children expressions of UDAF") {
withTempView("view1") {
spark.range(1).selectExpr("id as x", "id as y").createTempView("view1")
http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index cae338c..383d41f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -404,59 +404,34 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
}
test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
- Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
+ withTempView("testUDF") {
+ Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
+
+ def testErrorMsgForFunc(funcName: String, className: String): Unit = {
+ withUserDefinedFunction(funcName -> true) {
+ sql(s"CREATE TEMPORARY FUNCTION $funcName AS '$className'")
+ val message = intercept[AnalysisException] {
+ sql(s"SELECT $funcName() FROM testUDF")
+ }.getMessage
+ assert(message.contains(s"No handler for UDF/UDAF/UDTF '$className'"))
+ }
+ }
- {
// HiveSimpleUDF
- sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
- val message = intercept[AnalysisException] {
- sql("SELECT testUDFTwoListList() FROM testUDF")
- }.getMessage
- assert(message.contains("No handler for Hive UDF"))
- sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
- }
+ testErrorMsgForFunc("testUDFTwoListList", classOf[UDFTwoListList].getName)
- {
// HiveGenericUDF
- sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'")
- val message = intercept[AnalysisException] {
- sql("SELECT testUDFAnd() FROM testUDF")
- }.getMessage
- assert(message.contains("No handler for Hive UDF"))
- sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
- }
+ testErrorMsgForFunc("testUDFAnd", classOf[GenericUDFOPAnd].getName)
- {
// Hive UDAF
- sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
- val message = intercept[AnalysisException] {
- sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
- }.getMessage
- assert(message.contains("No handler for Hive UDF"))
- sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
- }
+ testErrorMsgForFunc("testUDAFPercentile", classOf[UDAFPercentile].getName)
- {
// AbstractGenericUDAFResolver
- sql(s"CREATE TEMPORARY FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'")
- val message = intercept[AnalysisException] {
- sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
- }.getMessage
- assert(message.contains("No handler for Hive UDF"))
- sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
- }
+ testErrorMsgForFunc("testUDAFAverage", classOf[GenericUDAFAverage].getName)
- {
- // Hive UDTF
- sql(s"CREATE TEMPORARY FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'")
- val message = intercept[AnalysisException] {
- sql("SELECT testUDTFExplode() FROM testUDF")
- }.getMessage
- assert(message.contains("No handler for Hive UDF"))
- sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
+ // AbstractGenericUDAFResolver
+ testErrorMsgForFunc("testUDTFExplode", classOf[GenericUDTFExplode].getName)
}
-
- spark.catalog.dropTempView("testUDF")
}
test("Hive UDF in group by") {
@@ -621,6 +596,46 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
}
}
}
+
+ test("UDTF") {
+ withUserDefinedFunction("udtf_count2" -> true) {
+ sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath}")
+ // The function source code can be found at:
+ // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
+ sql(
+ """
+ |CREATE TEMPORARY FUNCTION udtf_count2
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
+ Row(97, 500) :: Row(97, 500) :: Nil)
+
+ checkAnswer(
+ sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+ Row(3) :: Row(3) :: Nil)
+ }
+ }
+
+ test("permanent UDTF") {
+ withUserDefinedFunction("udtf_count_temp" -> false) {
+ sql(
+ s"""
+ |CREATE FUNCTION udtf_count_temp
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"),
+ Row(97, 500) :: Row(97, 500) :: Nil)
+
+ checkAnswer(
+ sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+ Row(3) :: Row(3) :: Nil)
+ }
+ }
}
class TestPair(x: Int, y: Int) extends Writable with Serializable {
http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d0e0d20..02cfa02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.TestUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -98,46 +98,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil)
}
- test("UDTF") {
- withUserDefinedFunction("udtf_count2" -> true) {
- sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
- // The function source code can be found at:
- // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
- sql(
- """
- |CREATE TEMPORARY FUNCTION udtf_count2
- |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
- """.stripMargin)
-
- checkAnswer(
- sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
- Row(97, 500) :: Row(97, 500) :: Nil)
-
- checkAnswer(
- sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
- Row(3) :: Row(3) :: Nil)
- }
- }
-
- test("permanent UDTF") {
- withUserDefinedFunction("udtf_count_temp" -> false) {
- sql(
- s"""
- |CREATE FUNCTION udtf_count_temp
- |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
- |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
- """.stripMargin)
-
- checkAnswer(
- sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"),
- Row(97, 500) :: Row(97, 500) :: Nil)
-
- checkAnswer(
- sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
- Row(3) :: Row(3) :: Nil)
- }
- }
-
test("SPARK-6835: udtf in lateral view") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.createOrReplaceTempView("table1")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org