Posted to commits@spark.apache.org by we...@apache.org on 2021/10/14 02:22:18 UTC

[spark] branch master updated: [SPARK-36868][SQL][FOLLOWUP] Move parsing Create Function from catalyst to core

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9b4cc6  [SPARK-36868][SQL][FOLLOWUP] Move parsing Create Function from catalyst to core
d9b4cc6 is described below

commit d9b4cc65b89d8497dc4e315395f2c98cc4ac9327
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Thu Oct 14 10:21:29 2021 +0800

    [SPARK-36868][SQL][FOLLOWUP] Move parsing Create Function from catalyst to core
    
    ### What changes were proposed in this pull request?
    Move the parsing of CREATE FUNCTION from `AstBuilder` (catalyst) to `SparkSqlParser` (core).
    
    ### Why are the changes needed?
    Code simplification: a temporary function is session-scoped and always ends up as the
    v1 `CreateFunctionCommand`, so `SparkSqlParser` can build that command directly. This
    removes the intermediate `CreateTempFunction` logical plan from catalyst, along with the
    `convertToV1CreateFunction` helper (and the unused `parseV1Table`) from
    `ResolveSessionCatalog`.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests; the CREATE FUNCTION parser tests move from the catalyst `DDLParserSuite`
    to the core `DDLParserSuite`.
    
    Closes #34274 from huaxingao/create_function_followup.
    
    Authored-by: Huaxin Gao <hu...@apple.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 42 +-----------------
 .../sql/catalyst/plans/logical/v2Commands.scala    | 11 -----
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 40 +----------------
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 51 +++++++---------------
 .../spark/sql/execution/SparkSqlParser.scala       | 50 ++++++++++++++++++++-
 .../spark/sql/execution/command/functions.scala    |  5 ---
 .../sql/execution/command/DDLParserSuite.scala     | 40 +++++++++++++++++
 7 files changed, 107 insertions(+), 132 deletions(-)
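
For readers skimming the diff: the visible effect is which plan the parser now returns
for each flavor of CREATE FUNCTION. A minimal REPL-style sketch of the new behavior,
modeled on the tests moved into core's DDLParserSuite below (the class name
'com.example.Fun' is a placeholder):

    import org.apache.spark.sql.execution.SparkSqlParser
    import org.apache.spark.sql.execution.command.CreateFunctionCommand

    val parser = new SparkSqlParser()

    // Permanent functions still parse into the v2 CreateFunction logical plan,
    // which ResolveSessionCatalog later converts for the session catalog:
    parser.parsePlan("CREATE FUNCTION a.b.c AS 'com.example.Fun'")

    // Temporary functions now parse directly into the v1 command:
    parser.parsePlan("CREATE TEMPORARY FUNCTION f AS 'com.example.Fun'")
    // == CreateFunctionCommand(None, "f", "com.example.Fun", Seq(), true, false, false)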

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3bddc8e..b4232fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, FunctionResource, FunctionResourceType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -4384,46 +4384,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       ctx.TEMPORARY != null)
   }
 
-  /**
-   * Create a [[CreateFunction]] command.
-   *
-   * For example:
-   * {{{
-   *   CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
-   *   AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
-   * }}}
-   */
-  override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
-    val resources = ctx.resource.asScala.map { resource =>
-      val resourceType = resource.identifier.getText.toLowerCase(Locale.ROOT)
-      resourceType match {
-        case "jar" | "file" | "archive" =>
-          FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
-        case other =>
-          operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
-      }
-    }
-
-    val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
-    if (ctx.TEMPORARY != null) {
-      CreateTempFunction(
-        functionIdentifier,
-        string(ctx.className),
-        resources.toSeq,
-        ctx.EXISTS != null,
-        ctx.REPLACE != null)
-    } else {
-      CreateFunction(
-        UnresolvedDBObjectName(
-          functionIdentifier,
-          isNamespace = false),
-        string(ctx.className),
-        resources.toSeq,
-        ctx.EXISTS != null,
-        ctx.REPLACE != null)
-    }
-  }
-
   override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) {
     val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
     RefreshFunction(UnresolvedFunc(functionIdentifier))
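
With the override above deleted, catalyst's AstBuilder no longer produces a plan for
CREATE FUNCTION; the statement is handled only by SparkSqlAstBuilder in core, re-added
later in this diff. A hedged illustration of the split (REPL sketch; what the
catalyst-only parser does with the statement afterwards is not shown by this patch):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.execution.SparkSqlParser

    // core parser: handles the statement via the visitCreateFunction override
    // added further down in this diff
    new SparkSqlParser().parsePlan("CREATE FUNCTION f AS 'com.example.Fun'")

    // catalyst-only parser: the override is gone, so this no longer yields a
    // CreateFunction plan from catalyst alone
    CatalystSqlParser.parsePlan("CREATE FUNCTION f AS 'com.example.Fun'")
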
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 91f5b7c..31fdb6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -688,17 +688,6 @@ case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends Una
 }
 
 /**
- * The logical plan of the CREATE TEMPORARY FUNCTION command.
- */
-case class CreateTempFunction(
-    nameParts: Seq[String],
-    className: String,
-    resources: Seq[FunctionResource],
-    ifExists: Boolean,
-    replace: Boolean) extends LeafCommand {
-}
-
-/**
  * The logical plan of the CREATE FUNCTION command.
  */
 case class CreateFunction(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index f8bd221..7bcc2b7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -21,7 +21,7 @@ import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
@@ -2273,44 +2273,6 @@ class DDLParserSuite extends AnalysisTest {
       DropFunction(UnresolvedFunc(Seq("a", "b", "c")), true, true))
   }
 
-  test("CREATE FUNCTION") {
-    parseCompare("CREATE FUNCTION a as 'fun'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun", Seq(), false, false))
-
-    parseCompare("CREATE FUNCTION a.b.c as 'fun'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, false))
-
-    parseCompare("CREATE OR REPLACE FUNCTION a.b.c as 'fun'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, true))
-
-    parseCompare("CREATE TEMPORARY FUNCTION a.b.c as 'fun'",
-      CreateTempFunction(Seq("a", "b", "c"), "fun", Seq(), false, false))
-
-    parseCompare("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), true, false))
-
-    parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
-        Seq(FunctionResource(JarResource, "j")), false, false))
-
-    parseCompare("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
-        Seq(FunctionResource(ArchiveResource, "a")), false, false))
-
-    parseCompare("CREATE FUNCTION a as 'fun' USING FILE 'f'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
-        Seq(FunctionResource(FileResource, "f")), false, false))
-
-    parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'",
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
-        Seq(FunctionResource(JarResource, "j"),
-        FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
-        false, false))
-
-    intercept("CREATE FUNCTION a as 'fun' USING OTHER 'o'",
-      "Operation not allowed: CREATE FUNCTION with resource type 'other'")
-  }
-
   test("REFRESH FUNCTION") {
     parseCompare("REFRESH FUNCTION c",
       RefreshFunction(UnresolvedFunc(Seq("c"))))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index c77be4e..171e476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, FunctionResource}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -433,14 +433,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       val funcIdentifier = identifier.asFunctionIdentifier
       DropFunctionCommand(funcIdentifier.database, funcIdentifier.funcName, ifExists, isTemp)
 
-    case CreateTempFunction(nameParts, className, resources, ignoreIfExists, replace) =>
-      // temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
-      convertToV1CreateFunction(nameParts, className, resources, true, ignoreIfExists, replace)
-
     case CreateFunction(ResolvedDBObjectName(catalog, nameParts),
         className, resources, ignoreIfExists, replace) =>
       if (isSessionCatalog(catalog)) {
-        convertToV1CreateFunction(nameParts, className, resources, false, ignoreIfExists, replace)
+        val database = if (nameParts.length > 2) {
+          throw QueryCompilationErrors.unsupportedFunctionNameError(nameParts.quoted)
+        } else if (nameParts.length == 2) {
+          Some(nameParts.head)
+        } else {
+          None
+        }
+        CreateFunctionCommand(
+          database,
+          nameParts.last,
+          className,
+          resources,
+          false,
+          ignoreIfExists,
+          replace)
       } else {
         throw QueryCompilationErrors.functionUnsupportedInV2CatalogError()
       }
@@ -451,35 +461,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
   }
 
-  private def convertToV1CreateFunction(
-      nameParts: Seq[String],
-      className: String,
-      resources: Seq[FunctionResource],
-      isTemp: Boolean,
-      ignoreIfExists: Boolean,
-      replace: Boolean) = {
-    val database = if (nameParts.length > 2) {
-      throw QueryCompilationErrors.unsupportedFunctionNameError(nameParts.quoted)
-    } else if (nameParts.length == 2) {
-      Some(nameParts.head)
-    } else {
-      None
-    }
-    CreateFunctionCommand(
-      database,
-      nameParts.last,
-      className,
-      resources,
-      isTemp,
-      ignoreIfExists,
-      replace)
-  }
-
-  private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {
-    case SessionCatalogAndTable(_, tbl) => tbl
-    case _ => throw QueryCompilationErrors.sqlOnlySupportedWithV1TablesError(sql)
-  }
-
   private def getStorageFormatAndProvider(
       provider: Option[String],
       options: Map[String, String],
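
With convertToV1CreateFunction inlined above, the only logic left is mapping name parts
to an optional database. Distilled into a standalone helper for clarity (illustrative;
this exact function is not part of the patch):

    import org.apache.spark.sql.AnalysisException

    // "f"     -> (None, "f")        resolved against the current database
    // "db.f"  -> (Some("db"), "f")
    // "a.b.c" -> error: session-catalog function names have at most two parts
    def toV1FunctionName(nameParts: Seq[String]): (Option[String], String) =
      nameParts match {
        case Seq(f)     => (None, f)
        case Seq(db, f) => (Some(db), f)
        case _ =>
          throw new AnalysisException(
            s"Unsupported function name '${nameParts.mkString(".")}'")
      }
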
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index bdf15f5b..50d99a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.errors.QueryParsingErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
@@ -492,6 +492,54 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
   }
 
+  /**
+   * Create a [[CreateFunctionCommand]].
+   *
+   * For example:
+   * {{{
+   *   CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
+   *   AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
+   * }}}
+   */
+  override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val resources = ctx.resource.asScala.map { resource =>
+      val resourceType = resource.identifier.getText.toLowerCase(Locale.ROOT)
+      resourceType match {
+        case "jar" | "file" | "archive" =>
+          FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
+        case other =>
+          operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
+      }
+    }
+
+    val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
+    if (ctx.TEMPORARY == null) {
+      CreateFunction(
+        UnresolvedDBObjectName(
+          functionIdentifier,
+          isNamespace = false),
+        string(ctx.className),
+        resources.toSeq,
+        ctx.EXISTS != null,
+        ctx.REPLACE != null)
+    } else {
+      if (functionIdentifier.length > 2) {
+        throw QueryCompilationErrors.unsupportedFunctionNameError(functionIdentifier.quoted)
+      } else if (functionIdentifier.length == 2) {
+        // Temporary function names should not contain database prefix like "database.function"
+        throw QueryCompilationErrors.specifyingDBInCreateTempFuncError(functionIdentifier.head)
+      }
+      CreateFunctionCommand(
+        None,
+        functionIdentifier.last,
+        string(ctx.className),
+        resources.toSeq,
+        true,
+        ctx.EXISTS != null,
+        ctx.REPLACE != null)
+    }
+  }
+
   private def toStorageFormat(
       location: Option[String],
       maybeSerdeInfo: Option[SerdeInfo],
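
One nuance of the inlined branch above: the name checks for temporary functions now fire
at parse time instead of when CreateFunctionCommand is constructed (the old constructor
check is deleted from functions.scala just below). A sketch, assuming an active
SparkSession named `spark`:

    // Both errors are now raised inside SparkSqlAstBuilder.visitCreateFunction:

    // two name parts -> specifyingDBInCreateTempFuncError (no database prefix allowed)
    spark.sql("CREATE TEMPORARY FUNCTION db1.f AS 'com.example.Fun'")

    // more than two name parts -> unsupportedFunctionNameError
    spark.sql("CREATE TEMPORARY FUNCTION a.b.c AS 'com.example.Fun'")
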
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index ae9a77d..f39df38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -66,11 +66,6 @@ case class CreateFunctionCommand(
     throw QueryCompilationErrors.defineTempFuncWithIfNotExistsError()
   }
 
-  // Temporary function names should not contain database prefix like "database.function"
-  if (databaseName.isDefined && isTemp) {
-    throw QueryCompilationErrors.specifyingDBInCreateTempFuncError(databaseName.get)
-  }
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
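
The constructor check removed here is preserved as the parse-time validation added in
SparkSqlParser above, which, per this diff, is the only place left that builds a
temporary CreateFunctionCommand (ResolveSessionCatalog now always passes false for
isTemp). An illustrative assertion of the resulting invariant (REPL sketch, not part of
the patch):

    import org.apache.spark.sql.execution.SparkSqlParser
    import org.apache.spark.sql.execution.command.CreateFunctionCommand

    val cmd = new SparkSqlParser()
      .parsePlan("CREATE TEMPORARY FUNCTION f AS 'com.example.Fun'")
      .asInstanceOf[CreateFunctionCommand]

    // a temporary function can no longer carry a database prefix past the parser
    assert(cmd.isTemp && cmd.databaseName.isEmpty)
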
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 43557df..91aeb96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedDBObjectName}
+import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -425,6 +426,45 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     intercept(sql2, "Found duplicate clauses: TBLPROPERTIES")
   }
 
+  test("CREATE FUNCTION") {
+    comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun", Seq(), false, false))
+
+    comparePlans(parser.parsePlan("CREATE FUNCTION a.b.c as 'fun'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, false))
+
+    comparePlans(parser.parsePlan("CREATE OR REPLACE FUNCTION a.b.c as 'fun'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, true))
+
+    comparePlans(parser.parsePlan("CREATE TEMPORARY FUNCTION a as 'fun'"),
+      CreateFunctionCommand(None, "a", "fun", Seq(), true, false, false))
+
+    comparePlans(parser.parsePlan("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), true, false))
+
+    comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+        Seq(FunctionResource(JarResource, "j")), false, false))
+
+    comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+        Seq(FunctionResource(ArchiveResource, "a")), false, false))
+
+    comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING FILE 'f'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+        Seq(FunctionResource(FileResource, "f")), false, false))
+
+    comparePlans(
+      parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'"),
+      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+        Seq(FunctionResource(JarResource, "j"),
+          FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
+        false, false))
+
+    intercept("CREATE FUNCTION a as 'fun' USING OTHER 'o'",
+      "Operation not allowed: CREATE FUNCTION with resource type 'other'")
+  }
+
   test("SPARK-32374: create temporary view with properties not allowed") {
     assertUnsupported(
       sql = """
