Posted to commits@spark.apache.org by we...@apache.org on 2021/11/12 14:21:08 UTC

[spark] branch master updated: [SPARK-37702][SQL] Use AnalysisContext to track referred temp functions

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 68a0ab5  [SPARK-37702][SQL] Use AnalysisContext to track referred temp functions
68a0ab5 is described below

commit 68a0ab5960e847e0fa1a59da0316d0c111574af4
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Fri Nov 12 22:20:02 2021 +0800

    [SPARK-37702][SQL] Use AnalysisContext to track referred temp functions
    
    ### What changes were proposed in this pull request?
    This PR uses `AnalysisContext` to track the referred temp functions, in order to fix a
    resolution issue for temp functions that are registered with a `FunctionBuilder` and
    referred to by a temp view.
    
    During temporary view creation, we need to collect all the referred temp functions and save
    them to the view metadata, so that the functions can be resolved correctly the next time the
    view's SQL text is analyzed. But if a temp function is registered with a `FunctionBuilder`,
    it is not a `UserDefinedExpression`, so it cannot be collected by scanning the analyzed plan.
    As a result, the next time the analyzer resolves the temp view, the registered function
    cannot be found.
    
    Example:
    ```scala
    val func = CatalogFunction(FunctionIdentifier("tempFunc", None), ...)
    val builder = (e: Seq[Expression]) => e.head
    spark.sessionState.catalog.registerFunction(func, Some(builder))
    sql("create temp view tv as select tempFunc(a, b) from values (1, 2) t(a, b)")
    sql("select * from tv").collect()
    ```
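    
    As an illustration only (all class and method names below are invented for this sketch and
    are not Spark's actual API), the fix follows a thread-local context pattern: function lookups
    record the names of referred temp functions in the current analysis context, and the view
    command snapshots that set once analysis finishes so it can be stored in the view metadata:
    ```scala
    import scala.collection.mutable
    
    // Hypothetical names, for illustration only.
    final case class AnalysisContextSketch(
        referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty)
    
    object AnalysisContextSketch {
      private val value = new ThreadLocal[AnalysisContextSketch] {
        override def initialValue(): AnalysisContextSketch = AnalysisContextSketch()
      }
    
      def get: AnalysisContextSketch = value.get()
    
      // Run `f` with a fresh context and restore the previous one afterwards,
      // mirroring the `withNewAnalysisContext` helper added by this PR.
      def withNewAnalysisContext[A](f: => A): A = {
        val origin = value.get()
        value.set(AnalysisContextSketch())
        try f finally value.set(origin)
      }
    }
    
    object TrackingSketch {
      // Every lookup of a temporary function records its name in the current context,
      // regardless of whether the function is backed by a `UserDefinedExpression`.
      def lookupTempFunction(name: String): Unit =
        AnalysisContextSketch.get.referredTempFunctionNames.add(name)
    
      // A view command can then snapshot the collected names when it is marked as
      // analyzed, and persist them in the view metadata.
      def collectedTempFunctions(): Seq[String] =
        AnalysisContextSketch.withNewAnalysisContext {
          lookupTempFunction("tempFunc") // would happen inside function resolution
          AnalysisContextSketch.get.referredTempFunctionNames.toSeq
        }
    }
    ```
    In the actual change, the snapshot is taken via `markAsAnalyzed(AnalysisContext)`, as shown
    in the diff below.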
    
    ### Why are the changes needed?
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    Newly added test cases.
    
    Closes #34546 from linhongliu-db/SPARK-37702-ver3.
    
    Authored-by: Linhong Liu <li...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 21 ++++--
 .../sql/catalyst/catalog/SessionCatalog.scala      |  5 ++
 .../spark/sql/catalyst/plans/logical/Command.scala |  5 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |  8 +--
 .../apache/spark/sql/execution/command/views.scala | 82 ++++++++++++----------
 .../spark/sql/execution/datasources/ddl.scala      |  6 +-
 .../spark/sql/SparkSessionExtensionSuite.scala     | 10 +++
 .../spark/sql/execution/SQLViewTestSuite.scala     | 26 ++++++-
 .../execution/command/PlanResolutionSuite.scala    |  6 +-
 9 files changed, 111 insertions(+), 58 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 068886e..26d206f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -125,7 +125,11 @@ case class AnalysisContext(
     maxNestedViewDepth: Int = -1,
     relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
     referredTempViewNames: Seq[Seq[String]] = Seq.empty,
-    referredTempFunctionNames: Seq[String] = Seq.empty,
+    // 1. If we are resolving a view, this field will be restored from the view metadata,
+    //    by calling `AnalysisContext.withAnalysisContext(viewDesc)`.
+    // 2. If we are not resolving a view, this field will be updated every time the analyzer
+    //    looks up a temporary function, and will be exported to the view metadata.
+    referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty,
     outerPlan: Option[LogicalPlan] = None)
 
 object AnalysisContext {
@@ -152,11 +156,17 @@ object AnalysisContext {
       maxNestedViewDepth,
       originContext.relationCache,
       viewDesc.viewReferredTempViewNames,
-      viewDesc.viewReferredTempFunctionNames)
+      mutable.Set(viewDesc.viewReferredTempFunctionNames: _*))
     set(context)
     try f finally { set(originContext) }
   }
 
+  def withNewAnalysisContext[A](f: => A): A = {
+    val originContext = value.get()
+    reset()
+    try f finally { set(originContext) }
+  }
+
   def withOuterPlan[A](outerPlan: LogicalPlan)(f: => A): A = {
     val originContext = value.get()
     val context = originContext.copy(outerPlan = Some(outerPlan))
@@ -204,11 +214,8 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
-    AnalysisContext.reset()
-    try {
+    AnalysisContext.withNewAnalysisContext {
       executeSameContext(plan)
-    } finally {
-      AnalysisContext.reset()
     }
   }
 
@@ -3651,7 +3658,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       _.containsPattern(COMMAND)) {
       case c: AnalysisOnlyCommand if c.resolved =>
         checkAnalysis(c)
-        c.markAsAnalyzed()
+        c.markAsAnalyzed(AnalysisContext.get)
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 12a5cbc..471e52d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1605,6 +1605,11 @@ class SessionCatalog(
       if (!isResolvingView ||
           !isTemporaryFunction(name) ||
           referredTempFunctionNames.contains(name.funcName)) {
+        // If we are not resolving a view and the function is a temp one, add it to `AnalysisContext`
+        // so that during view creation all referred temp functions can be saved to the view metadata.
+        if (!isResolvingView && isTemporaryFunction(name)) {
+          AnalysisContext.get.referredTempFunctionNames.add(name.funcName)
+        }
         // This function has been already loaded into the function registry.
         return registry.lookupFunction(name, children)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 92b6b93..8e23c2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalyst.analysis.AnalysisContext
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
@@ -50,5 +51,7 @@ trait AnalysisOnlyCommand extends Command {
   def childrenToAnalyze: Seq[LogicalPlan]
   override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze
   override def innerChildren: Seq[QueryPlan[_]] = if (isAnalyzed) childrenToAnalyze else Nil
-  def markAsAnalyzed(): LogicalPlan
+  // After the analysis has finished, we give the command a chance to update its state based
+  // on the `AnalysisContext`.
+  def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 61d8a21..a2c2ae6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
@@ -1015,7 +1015,7 @@ case class CacheTable(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
 
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
 }
 
 /**
@@ -1036,7 +1036,7 @@ case class CacheTableAsSelect(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
 
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
 }
 
 /**
@@ -1054,7 +1054,7 @@ case class UncacheTable(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
 
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index d987a81..1452871 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -24,10 +24,10 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, ViewType}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
+import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
@@ -67,7 +67,9 @@ case class CreateViewCommand(
     allowExisting: Boolean,
     replace: Boolean,
     viewType: ViewType,
-    isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
+    isAnalyzed: Boolean = false,
+    referredTempFunctions: Seq[String] = Seq.empty)
+  extends RunnableCommand with AnalysisOnlyCommand {
 
   import ViewHelper._
 
@@ -80,7 +82,12 @@ case class CreateViewCommand(
   // `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
   override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
 
-  def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan = {
+    copy(
+      isAnalyzed = true,
+      // Collect the referred temporary functions from AnalysisContext
+      referredTempFunctions = analysisContext.referredTempFunctionNames.toSeq)
+  }
 
   private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView
 
@@ -100,7 +107,7 @@ case class CreateViewCommand(
 
     // When creating a permanent view, not allowed to reference temporary objects.
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
-    verifyTemporaryObjectsNotExists(catalog, isTemporary, name, analyzedPlan)
+    verifyTemporaryObjectsNotExists(isTemporary, name, analyzedPlan, referredTempFunctions)
     verifyAutoGeneratedAliasesNotExists(analyzedPlan, isTemporary, name)
 
     if (viewType == LocalTempView) {
@@ -112,7 +119,8 @@ case class CreateViewCommand(
         catalog.getRawTempView,
         originalText,
         analyzedPlan,
-        aliasedPlan)
+        aliasedPlan,
+        referredTempFunctions)
       catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
       val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
@@ -125,7 +133,8 @@ case class CreateViewCommand(
         catalog.getRawGlobalTempView,
         originalText,
         analyzedPlan,
-        aliasedPlan)
+        aliasedPlan,
+        referredTempFunctions)
       catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -220,7 +229,9 @@ case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
     query: LogicalPlan,
-    isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
+    isAnalyzed: Boolean = false,
+    referredTempFunctions: Seq[String] = Seq.empty)
+  extends RunnableCommand with AnalysisOnlyCommand {
 
   import ViewHelper._
 
@@ -232,11 +243,16 @@ case class AlterViewAsCommand(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil
 
-  def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan = {
+    copy(
+      isAnalyzed = true,
+      // Collect the referred temporary functions from AnalysisContext
+      referredTempFunctions = analysisContext.referredTempFunctionNames.toSeq)
+  }
 
   override def run(session: SparkSession): Seq[Row] = {
     val isTemporary = session.sessionState.catalog.isTempView(name)
-    verifyTemporaryObjectsNotExists(session.sessionState.catalog, isTemporary, name, query)
+    verifyTemporaryObjectsNotExists(isTemporary, name, query, referredTempFunctions)
     verifyAutoGeneratedAliasesNotExists(query, isTemporary, name)
     if (isTemporary) {
       alterTemporaryView(session, query)
@@ -260,7 +276,8 @@ case class AlterViewAsCommand(
       getRawTempView,
       Some(originalText),
       analyzedPlan,
-      aliasedPlan = analyzedPlan)
+      aliasedPlan = analyzedPlan,
+      referredTempFunctions)
     session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition)
   }
 
@@ -530,18 +547,18 @@ object ViewHelper extends SQLConfHelper with Logging {
    * Permanent views are not allowed to reference temp objects, including temp function and views
    */
   def verifyTemporaryObjectsNotExists(
-      catalog: SessionCatalog,
       isTemporary: Boolean,
       name: TableIdentifier,
-      child: LogicalPlan): Unit = {
+      child: LogicalPlan,
+      referredTempFunctions: Seq[String]): Unit = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     if (!isTemporary) {
-      val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, child)
+      val tempViews = collectTemporaryViews(child)
       tempViews.foreach { nameParts =>
         throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempViewError(
           name, nameParts.quoted)
       }
-      tempFunctions.foreach { funcName =>
+      referredTempFunctions.foreach { funcName =>
         throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempFuncError(
           name, funcName)
       }
@@ -549,10 +566,9 @@ object ViewHelper extends SQLConfHelper with Logging {
   }
 
   /**
-   * Collect all temporary views and functions and return the identifiers separately.
+   * Collect all temporary views and return the identifiers.
    */
-  private def collectTemporaryObjects(
-      catalog: SessionCatalog, child: LogicalPlan): (Seq[Seq[String]], Seq[String]) = {
+  private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
     def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
       child.flatMap {
         case view: View if view.isTempView =>
@@ -564,20 +580,7 @@ object ViewHelper extends SQLConfHelper with Logging {
         })
       }.distinct
     }
-
-    def collectTempFunctions(child: LogicalPlan): Seq[String] = {
-      child.flatMap {
-        case plan =>
-          plan.expressions.flatMap(_.flatMap {
-            case e: SubqueryExpression => collectTempFunctions(e.plan)
-            case e: UserDefinedExpression
-                if catalog.isTemporaryFunction(FunctionIdentifier(e.name)) =>
-              Seq(e.name)
-            case _ => Seq.empty
-          })
-      }.distinct
-    }
-    (collectTempViews(child), collectTempFunctions(child))
+    collectTempViews(child)
   }
 
   /**
@@ -604,7 +607,8 @@ object ViewHelper extends SQLConfHelper with Logging {
       getRawTempView: String => Option[TemporaryViewRelation],
       originalText: Option[String],
       analyzedPlan: LogicalPlan,
-      aliasedPlan: LogicalPlan): TemporaryViewRelation = {
+      aliasedPlan: LogicalPlan,
+      referredTempFunctions: Seq[String]): TemporaryViewRelation = {
     val uncache = getRawTempView(name.table).map { r =>
       needsToUncache(r, aliasedPlan)
     }.getOrElse(false)
@@ -620,7 +624,8 @@ object ViewHelper extends SQLConfHelper with Logging {
           session,
           analyzedPlan,
           aliasedPlan.schema,
-          originalText.get))
+          originalText.get,
+          referredTempFunctions))
     } else {
       TemporaryViewRelation(
         prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan),
@@ -652,10 +657,11 @@ object ViewHelper extends SQLConfHelper with Logging {
       session: SparkSession,
       analyzedPlan: LogicalPlan,
       viewSchema: StructType,
-      originalText: String): CatalogTable = {
+      originalText: String,
+      tempFunctions: Seq[String]): CatalogTable = {
 
     val catalog = session.sessionState.catalog
-    val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan)
+    val tempViews = collectTemporaryViews(analyzedPlan)
     // TBLPROPERTIES is not allowed for temporary view, so we don't use it for
     // generating temporary view properties
     val newProperties = generateViewProperties(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 90463c0..9a81f2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -115,7 +115,8 @@ case class CreateTempViewUsing(
         catalog.getRawGlobalTempView,
         originalText = None,
         analyzedPlan,
-        aliasedPlan = analyzedPlan)
+        aliasedPlan = analyzedPlan,
+        referredTempFunctions = Seq.empty)
       catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
     } else {
       val viewDefinition = createTemporaryViewRelation(
@@ -125,7 +126,8 @@ case class CreateTempViewUsing(
         catalog.getRawTempView,
         originalText = None,
         analyzedPlan,
-        aliasedPlan = analyzedPlan)
+        aliasedPlan = analyzedPlan,
+        referredTempFunctions = Seq.empty)
       catalog.createTempView(tableIdent.table, viewDefinition, replace)
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 74813b0..046714c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -396,6 +396,16 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
         Nil)
     }
   }
+
+  test("SPARK-37202: temp view refers a inject function") {
+    val extensions = create { extensions =>
+      extensions.injectFunction(MyExtensions.myFunction)
+    }
+    withSession(extensions) { session =>
+      session.sql("CREATE TEMP VIEW v AS SELECT myFunction(a) FROM VALUES(1), (2) t(a)")
+      session.sql("SELECT * FROM v")
+    }
+  }
 }
 
 case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 8383d44..6ed9798 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.Repartition
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.internal.SQLConf._
@@ -378,7 +380,25 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
   }
 }
 
-class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
+abstract class TempViewTestSuite extends SQLViewTestSuite {
+  test("SPARK-37202: temp view should capture the function registered by catalog API") {
+    val funcName = "tempFunc"
+    withUserDefinedFunction(funcName -> true) {
+      val catalogFunction = CatalogFunction(
+        FunctionIdentifier(funcName, None), "org.apache.spark.myFunc", Seq.empty)
+      val functionBuilder = (e: Seq[Expression]) => e.head
+      spark.sessionState.catalog.registerFunction(
+        catalogFunction, overrideIfExists = false, functionBuilder = Some(functionBuilder))
+      val query = s"SELECT $funcName(max(a), min(a)) FROM VALUES (1), (2), (3) t(a)"
+      val viewName = createView("tempView", query)
+      withView(viewName) {
+        checkViewOutput(viewName, sql(query).collect())
+      }
+    }
+  }
+}
+
+class LocalTempViewTestSuite extends TempViewTestSuite with SharedSparkSession {
   override protected def viewTypeString: String = "TEMPORARY VIEW"
   override protected def formattedViewName(viewName: String): String = viewName
   override protected def tableIdentifier(viewName: String): TableIdentifier = {
@@ -386,7 +406,7 @@ class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
   }
 }
 
-class GlobalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
+class GlobalTempViewTestSuite extends TempViewTestSuite with SharedSparkSession {
   private def db: String = spark.sharedState.globalTempViewManager.database
   override protected def viewTypeString: String = "GLOBAL TEMPORARY VIEW"
   override protected def formattedViewName(viewName: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 2c2f833..85ba14f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -26,7 +26,7 @@ import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
@@ -2307,7 +2307,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val cmdNotAnalyzed = DummyAnalysisOnlyCommand(isAnalyzed = false, childrenToAnalyze = Seq(null))
     assert(cmdNotAnalyzed.innerChildren.isEmpty)
     assert(cmdNotAnalyzed.children.length == 1)
-    val cmdAnalyzed = cmdNotAnalyzed.markAsAnalyzed()
+    val cmdAnalyzed = cmdNotAnalyzed.markAsAnalyzed(AnalysisContext.get)
     assert(cmdAnalyzed.innerChildren.length == 1)
     assert(cmdAnalyzed.children.isEmpty)
   }
@@ -2325,7 +2325,7 @@ object AsDataSourceV2Relation {
 case class DummyAnalysisOnlyCommand(
     isAnalyzed: Boolean,
     childrenToAnalyze: Seq[LogicalPlan]) extends AnalysisOnlyCommand {
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = {
     copy(childrenToAnalyze = newChildren)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org