You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/10/14 02:22:18 UTC
[spark] branch master updated: [SPARK-36868][SQL][FOLLOWUP] Move
parsing Create Function from catalyst to core
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d9b4cc6 [SPARK-36868][SQL][FOLLOWUP] Move parsing Create Function from catalyst to core
d9b4cc6 is described below
commit d9b4cc65b89d8497dc4e315395f2c98cc4ac9327
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Thu Oct 14 10:21:29 2021 +0800
[SPARK-36868][SQL][FOLLOWUP] Move parsing Create Function from catalyst to core
### What changes were proposed in this pull request?
Move the parsing of `CREATE FUNCTION` from `AstBuilder` (sql/catalyst) to `SparkSqlAstBuilder` in `SparkSqlParser.scala` (sql/core).
### Why are the changes needed?
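Code simplification: `CREATE TEMPORARY FUNCTION` is now parsed directly into `CreateFunctionCommand`, so the intermediate `CreateTempFunction` logical plan and the `convertToV1CreateFunction` helper in `ResolveSessionCatalog` are removed.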
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
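Existing tests; the `CREATE FUNCTION` parser test was moved from the catalyst `DDLParserSuite` to the core `DDLParserSuite`.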
Closes #34274 from huaxingao/create_function_followup.
Authored-by: Huaxin Gao <hu...@apple.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/parser/AstBuilder.scala | 42 +-----------------
.../sql/catalyst/plans/logical/v2Commands.scala | 11 -----
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 40 +----------------
.../catalyst/analysis/ResolveSessionCatalog.scala | 51 +++++++---------------
.../spark/sql/execution/SparkSqlParser.scala | 50 ++++++++++++++++++++-
.../spark/sql/execution/command/functions.scala | 5 ---
.../sql/execution/command/DDLParserSuite.scala | 40 +++++++++++++++++
7 files changed, 107 insertions(+), 132 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3bddc8e..b4232fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, FunctionResource, FunctionResourceType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -4384,46 +4384,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ctx.TEMPORARY != null)
}
- /**
- * Create a [[CreateFunction]] command.
- *
- * For example:
- * {{{
- * CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
- * AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
- * }}}
- */
- override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
- val resources = ctx.resource.asScala.map { resource =>
- val resourceType = resource.identifier.getText.toLowerCase(Locale.ROOT)
- resourceType match {
- case "jar" | "file" | "archive" =>
- FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
- case other =>
- operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
- }
- }
-
- val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
- if (ctx.TEMPORARY != null) {
- CreateTempFunction(
- functionIdentifier,
- string(ctx.className),
- resources.toSeq,
- ctx.EXISTS != null,
- ctx.REPLACE != null)
- } else {
- CreateFunction(
- UnresolvedDBObjectName(
- functionIdentifier,
- isNamespace = false),
- string(ctx.className),
- resources.toSeq,
- ctx.EXISTS != null,
- ctx.REPLACE != null)
- }
- }
-
override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) {
val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
RefreshFunction(UnresolvedFunc(functionIdentifier))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 91f5b7c..31fdb6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -688,17 +688,6 @@ case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends Una
}
/**
- * The logical plan of the CREATE TEMPORARY FUNCTION command.
- */
-case class CreateTempFunction(
- nameParts: Seq[String],
- className: String,
- resources: Seq[FunctionResource],
- ifExists: Boolean,
- replace: Boolean) extends LeafCommand {
-}
-
-/**
* The logical plan of the CREATE FUNCTION command.
*/
case class CreateFunction(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index f8bd221..7bcc2b7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -21,7 +21,7 @@ import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
@@ -2273,44 +2273,6 @@ class DDLParserSuite extends AnalysisTest {
DropFunction(UnresolvedFunc(Seq("a", "b", "c")), true, true))
}
- test("CREATE FUNCTION") {
- parseCompare("CREATE FUNCTION a as 'fun'",
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun", Seq(), false, false))
-
- parseCompare("CREATE FUNCTION a.b.c as 'fun'",
- CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, false))
-
- parseCompare("CREATE OR REPLACE FUNCTION a.b.c as 'fun'",
- CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, true))
-
- parseCompare("CREATE TEMPORARY FUNCTION a.b.c as 'fun'",
- CreateTempFunction(Seq("a", "b", "c"), "fun", Seq(), false, false))
-
- parseCompare("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'",
- CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), true, false))
-
- parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j'",
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
- Seq(FunctionResource(JarResource, "j")), false, false))
-
- parseCompare("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'",
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
- Seq(FunctionResource(ArchiveResource, "a")), false, false))
-
- parseCompare("CREATE FUNCTION a as 'fun' USING FILE 'f'",
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
- Seq(FunctionResource(FileResource, "f")), false, false))
-
- parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'",
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
- Seq(FunctionResource(JarResource, "j"),
- FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
- false, false))
-
- intercept("CREATE FUNCTION a as 'fun' USING OTHER 'o'",
- "Operation not allowed: CREATE FUNCTION with resource type 'other'")
- }
-
test("REFRESH FUNCTION") {
parseCompare("REFRESH FUNCTION c",
RefreshFunction(UnresolvedFunc(Seq("c"))))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index c77be4e..171e476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, FunctionResource}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -433,14 +433,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
val funcIdentifier = identifier.asFunctionIdentifier
DropFunctionCommand(funcIdentifier.database, funcIdentifier.funcName, ifExists, isTemp)
- case CreateTempFunction(nameParts, className, resources, ignoreIfExists, replace) =>
- // temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
- convertToV1CreateFunction(nameParts, className, resources, true, ignoreIfExists, replace)
-
case CreateFunction(ResolvedDBObjectName(catalog, nameParts),
className, resources, ignoreIfExists, replace) =>
if (isSessionCatalog(catalog)) {
- convertToV1CreateFunction(nameParts, className, resources, false, ignoreIfExists, replace)
+ val database = if (nameParts.length > 2) {
+ throw QueryCompilationErrors.unsupportedFunctionNameError(nameParts.quoted)
+ } else if (nameParts.length == 2) {
+ Some(nameParts.head)
+ } else {
+ None
+ }
+ CreateFunctionCommand(
+ database,
+ nameParts.last,
+ className,
+ resources,
+ false,
+ ignoreIfExists,
+ replace)
} else {
throw QueryCompilationErrors.functionUnsupportedInV2CatalogError()
}
@@ -451,35 +461,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
}
- private def convertToV1CreateFunction(
- nameParts: Seq[String],
- className: String,
- resources: Seq[FunctionResource],
- isTemp: Boolean,
- ignoreIfExists: Boolean,
- replace: Boolean) = {
- val database = if (nameParts.length > 2) {
- throw QueryCompilationErrors.unsupportedFunctionNameError(nameParts.quoted)
- } else if (nameParts.length == 2) {
- Some(nameParts.head)
- } else {
- None
- }
- CreateFunctionCommand(
- database,
- nameParts.last,
- className,
- resources,
- isTemp,
- ignoreIfExists,
- replace)
- }
-
- private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {
- case SessionCatalogAndTable(_, tbl) => tbl
- case _ => throw QueryCompilationErrors.sqlOnlySupportedWithV1TablesError(sql)
- }
-
private def getStorageFormatAndProvider(
provider: Option[String],
options: Map[String, String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index bdf15f5b..50d99a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.errors.QueryParsingErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
@@ -492,6 +492,54 @@ class SparkSqlAstBuilder extends AstBuilder {
}
}
+ /**
+ * Create a [[CreateFunctionCommand]].
+ *
+ * For example:
+ * {{{
+ * CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
+ * AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
+ * }}}
+ */
+ override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
+ val resources = ctx.resource.asScala.map { resource =>
+ val resourceType = resource.identifier.getText.toLowerCase(Locale.ROOT)
+ resourceType match {
+ case "jar" | "file" | "archive" =>
+ FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
+ case other =>
+ operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
+ }
+ }
+
+ val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
+ if (ctx.TEMPORARY == null) {
+ CreateFunction(
+ UnresolvedDBObjectName(
+ functionIdentifier,
+ isNamespace = false),
+ string(ctx.className),
+ resources.toSeq,
+ ctx.EXISTS != null,
+ ctx.REPLACE != null)
+ } else {
+ if (functionIdentifier.length > 2) {
+ throw QueryCompilationErrors.unsupportedFunctionNameError(functionIdentifier.quoted)
+ } else if (functionIdentifier.length == 2) {
+ // Temporary function names should not contain database prefix like "database.function"
+ throw QueryCompilationErrors.specifyingDBInCreateTempFuncError(functionIdentifier.head)
+ }
+ CreateFunctionCommand(
+ None,
+ functionIdentifier.last,
+ string(ctx.className),
+ resources.toSeq,
+ true,
+ ctx.EXISTS != null,
+ ctx.REPLACE != null)
+ }
+ }
+
private def toStorageFormat(
location: Option[String],
maybeSerdeInfo: Option[SerdeInfo],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index ae9a77d..f39df38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -66,11 +66,6 @@ case class CreateFunctionCommand(
throw QueryCompilationErrors.defineTempFuncWithIfNotExistsError()
}
- // Temporary function names should not contain database prefix like "database.function"
- if (databaseName.isDefined && isTemp) {
- throw QueryCompilationErrors.specifyingDBInCreateTempFuncError(databaseName.get)
- }
-
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 43557df..91aeb96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -21,6 +21,7 @@ import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedDBObjectName}
+import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans
import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -425,6 +426,45 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
intercept(sql2, "Found duplicate clauses: TBLPROPERTIES")
}
+ test("CREATE FUNCTION") {
+ comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun", Seq(), false, false))
+
+ comparePlans(parser.parsePlan("CREATE FUNCTION a.b.c as 'fun'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, false))
+
+ comparePlans(parser.parsePlan("CREATE OR REPLACE FUNCTION a.b.c as 'fun'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, true))
+
+ comparePlans(parser.parsePlan("CREATE TEMPORARY FUNCTION a as 'fun'"),
+ CreateFunctionCommand(None, "a", "fun", Seq(), true, false, false))
+
+ comparePlans(parser.parsePlan("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), true, false))
+
+ comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ Seq(FunctionResource(JarResource, "j")), false, false))
+
+ comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ Seq(FunctionResource(ArchiveResource, "a")), false, false))
+
+ comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING FILE 'f'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ Seq(FunctionResource(FileResource, "f")), false, false))
+
+ comparePlans(
+ parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'"),
+ CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ Seq(FunctionResource(JarResource, "j"),
+ FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
+ false, false))
+
+ intercept("CREATE FUNCTION a as 'fun' USING OTHER 'o'",
+ "Operation not allowed: CREATE FUNCTION with resource type 'other'")
+ }
+
test("SPARK-32374: create temporary view with properties not allowed") {
assertUnsupported(
sql = """
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org