Posted to commits@spark.apache.org by we...@apache.org on 2021/11/12 14:31:15 UTC

[spark] branch branch-3.2 updated: [SPARK-37702][SQL] Use AnalysisContext to track referred temp functions

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b7ec10a  [SPARK-37702][SQL] Use AnalysisContext to track referred temp functions
b7ec10a is described below

commit b7ec10afec4bd8a53e0ec06f90be1216b44e9538
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Fri Nov 12 22:20:02 2021 +0800

    [SPARK-37702][SQL] Use AnalysisContext to track referred temp functions
    
    This PR uses `AnalysisContext` to track the temp functions referred to by a query, in order to
    fix a resolution issue that occurs when a temp function is registered with a `FunctionBuilder`
    and referred to by a temp view.
    
    During temporary view creation, we need to collect all the referred temp functions and save them
    to the view metadata, so that the next time the view's SQL text is resolved, the functions can be
    resolved correctly. But if a temp function is registered with a `FunctionBuilder`, the expression
    it produces is not a `UserDefinedExpression`, so it cannot be collected as a temp function. As a
    result, the next time the analyzer resolves the temp view, the registered function cannot be found.
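
    For context, the collection logic removed by this patch (from `collectTemporaryObjects` in
    views.scala; see the diff below) only recognized `UserDefinedExpression` nodes, so a
    builder-registered function was never collected:
    ```scala
    def collectTempFunctions(child: LogicalPlan): Seq[String] = {
      child.flatMap {
        case plan =>
          plan.expressions.flatMap(_.flatMap {
            case e: SubqueryExpression => collectTempFunctions(e.plan)
            case e: UserDefinedExpression
                if catalog.isTemporaryFunction(FunctionIdentifier(e.name)) =>
              Seq(e.name)
            case _ => Seq.empty
          })
      }.distinct
    }
    ```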
    
    Example:
    ```scala
    val func = CatalogFunction(FunctionIdentifier("tempFunc", None), "org.apache.spark.myFunc", Seq.empty)
    val builder = (e: Seq[Expression]) => e.head
    spark.sessionState.catalog.registerFunction(func, overrideIfExists = false, functionBuilder = Some(builder))
    sql("create temp view tv as select tempFunc(a, b) from values (1, 2) t(a, b)")
    sql("select * from tv").collect()
    ```
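
    Here the CREATE TEMP VIEW succeeds, but the later SELECT fails to resolve `tempFunc` when the
    view text is re-analyzed. Conceptually, the fix records temp-function lookups in the per-thread
    `AnalysisContext` during analysis and snapshots them into the command once analysis finishes.
    A minimal sketch of that thread-local pattern (simplified; the real `AnalysisContext` carries
    many more fields, and `MiniContext` here is hypothetical):
    ```scala
    import scala.collection.mutable

    case class MiniContext(referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty)

    object MiniContext {
      private val value = new ThreadLocal[MiniContext] {
        override def initialValue(): MiniContext = MiniContext()
      }
      def get: MiniContext = value.get()

      // Run `f` with a fresh context and restore the previous one afterwards,
      // mirroring `AnalysisContext.withNewAnalysisContext` in the diff below.
      def withNewContext[A](f: => A): A = {
        val origin = value.get()
        value.set(MiniContext())
        try f finally value.set(origin)
      }
    }

    // The analyzer records each temp-function lookup it performs...
    def recordTempFunction(name: String): Unit =
      MiniContext.get.referredTempFunctionNames.add(name)

    // ...and the view command snapshots the set when it is marked as analyzed.
    def snapshotTempFunctions(): Seq[String] =
      MiniContext.get.referredTempFunctionNames.toSeq
    ```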
    
    Why are the changes needed?
    Bug fix.
    
    Does this PR introduce any user-facing change?
    No.
    
    How was this patch tested?
    Newly added test cases.
    
    Closes #34546 from linhongliu-db/SPARK-37702-ver3.
    
    Authored-by: Linhong Liu <li...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 68a0ab5960e847e0fa1a59da0316d0c111574af4)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 21 ++++--
 .../sql/catalyst/catalog/SessionCatalog.scala      |  5 ++
 .../spark/sql/catalyst/plans/logical/Command.scala |  5 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |  8 +--
 .../apache/spark/sql/execution/command/views.scala | 82 ++++++++++++----------
 .../spark/sql/execution/datasources/ddl.scala      |  6 +-
 .../spark/sql/SparkSessionExtensionSuite.scala     | 10 +++
 .../spark/sql/execution/SQLViewTestSuite.scala     | 26 ++++++-
 .../execution/command/PlanResolutionSuite.scala    |  6 +-
 9 files changed, 111 insertions(+), 58 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fa6b247..89c7b5f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -125,7 +125,11 @@ case class AnalysisContext(
     maxNestedViewDepth: Int = -1,
     relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
     referredTempViewNames: Seq[Seq[String]] = Seq.empty,
-    referredTempFunctionNames: Seq[String] = Seq.empty,
+    // 1. If we are resolving a view, this field is restored from the view metadata
+    //    by calling `AnalysisContext.withAnalysisContext(viewDesc)`.
+    // 2. If we are not resolving a view, this field is updated every time the analyzer
+    //    looks up a temporary function, and is exported to the view metadata.
+    referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty,
     outerPlan: Option[LogicalPlan] = None)
 
 object AnalysisContext {
@@ -152,11 +156,17 @@ object AnalysisContext {
       maxNestedViewDepth,
       originContext.relationCache,
       viewDesc.viewReferredTempViewNames,
-      viewDesc.viewReferredTempFunctionNames)
+      mutable.Set(viewDesc.viewReferredTempFunctionNames: _*))
     set(context)
     try f finally { set(originContext) }
   }
 
+  def withNewAnalysisContext[A](f: => A): A = {
+    val originContext = value.get()
+    reset()
+    try f finally { set(originContext) }
+  }
+
   def withOuterPlan[A](outerPlan: LogicalPlan)(f: => A): A = {
     val originContext = value.get()
     val context = originContext.copy(outerPlan = Some(outerPlan))
@@ -204,11 +214,8 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
-    AnalysisContext.reset()
-    try {
+    AnalysisContext.withNewAnalysisContext {
       executeSameContext(plan)
-    } finally {
-      AnalysisContext.reset()
     }
   }
 
@@ -3741,7 +3748,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       _.containsPattern(COMMAND)) {
       case c: AnalysisOnlyCommand if c.resolved =>
         checkAnalysis(c)
-        c.markAsAnalyzed()
+        c.markAsAnalyzed(AnalysisContext.get)
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8bba6bd..a4694a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1643,6 +1643,11 @@ class SessionCatalog(
       if (!isResolvingView ||
           !isTemporaryFunction(name) ||
           referredTempFunctionNames.contains(name.funcName)) {
+        // If we are not resolving a view and the function is a temp one, add it to `AnalysisContext`
+        // so that during view creation we can save all referred temp functions to the view metadata.
+        if (!isResolvingView && isTemporaryFunction(name)) {
+          AnalysisContext.get.referredTempFunctionNames.add(name.funcName)
+        }
         // This function has been already loaded into the function registry.
         return registry.lookupFunction(name, children)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 92b6b93..8e23c2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalyst.analysis.AnalysisContext
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
@@ -50,5 +51,7 @@ trait AnalysisOnlyCommand extends Command {
   def childrenToAnalyze: Seq[LogicalPlan]
   override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze
   override def innerChildren: Seq[QueryPlan[_]] = if (isAnalyzed) childrenToAnalyze else Nil
-  def markAsAnalyzed(): LogicalPlan
+  // After the analysis is finished, we give the command a chance to update its state
+  // based on the `AnalysisContext`.
+  def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 195bb8c..06ca759 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
@@ -987,7 +987,7 @@ case class CacheTable(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
 
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
 }
 
 /**
@@ -1008,7 +1008,7 @@ case class CacheTableAsSelect(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
 
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
 }
 
 /**
@@ -1026,5 +1026,5 @@ case class UncacheTable(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
 
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 2eb5d76..597547b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -24,10 +24,10 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
+import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, PersistedView, ViewType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
@@ -67,7 +67,9 @@ case class CreateViewCommand(
     allowExisting: Boolean,
     replace: Boolean,
     viewType: ViewType,
-    isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
+    isAnalyzed: Boolean = false,
+    referredTempFunctions: Seq[String] = Seq.empty)
+  extends RunnableCommand with AnalysisOnlyCommand {
 
   import ViewHelper._
 
@@ -80,7 +82,12 @@ case class CreateViewCommand(
   // `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
   override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
 
-  def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan = {
+    copy(
+      isAnalyzed = true,
+      // Collect the referred temporary functions from AnalysisContext
+      referredTempFunctions = analysisContext.referredTempFunctionNames.toSeq)
+  }
 
   if (viewType == PersistedView) {
     require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -119,7 +126,7 @@ case class CreateViewCommand(
 
     // When creating a permanent view, not allowed to reference temporary objects.
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
-    verifyTemporaryObjectsNotExists(catalog, isTemporary, name, analyzedPlan)
+    verifyTemporaryObjectsNotExists(isTemporary, name, analyzedPlan, referredTempFunctions)
     verifyAutoGeneratedAliasesNotExists(analyzedPlan, isTemporary, name)
 
     if (viewType == LocalTempView) {
@@ -131,7 +138,8 @@ case class CreateViewCommand(
         catalog.getRawTempView,
         originalText,
         analyzedPlan,
-        aliasedPlan)
+        aliasedPlan,
+        referredTempFunctions)
       catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
       val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
@@ -144,7 +152,8 @@ case class CreateViewCommand(
         catalog.getRawGlobalTempView,
         originalText,
         analyzedPlan,
-        aliasedPlan)
+        aliasedPlan,
+        referredTempFunctions)
       catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -239,7 +248,9 @@ case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
     query: LogicalPlan,
-    isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
+    isAnalyzed: Boolean = false,
+    referredTempFunctions: Seq[String] = Seq.empty)
+  extends RunnableCommand with AnalysisOnlyCommand {
 
   import ViewHelper._
 
@@ -251,11 +262,16 @@ case class AlterViewAsCommand(
 
   override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil
 
-  def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan = {
+    copy(
+      isAnalyzed = true,
+      // Collect the referred temporary functions from AnalysisContext
+      referredTempFunctions = analysisContext.referredTempFunctionNames.toSeq)
+  }
 
   override def run(session: SparkSession): Seq[Row] = {
     val isTemporary = session.sessionState.catalog.isTempView(name)
-    verifyTemporaryObjectsNotExists(session.sessionState.catalog, isTemporary, name, query)
+    verifyTemporaryObjectsNotExists(isTemporary, name, query, referredTempFunctions)
     verifyAutoGeneratedAliasesNotExists(query, isTemporary, name)
     if (isTemporary) {
       alterTemporaryView(session, query)
@@ -279,7 +295,8 @@ case class AlterViewAsCommand(
       getRawTempView,
       Some(originalText),
       analyzedPlan,
-      aliasedPlan = analyzedPlan)
+      aliasedPlan = analyzedPlan,
+      referredTempFunctions)
     session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition)
   }
 
@@ -549,18 +566,18 @@ object ViewHelper extends SQLConfHelper with Logging {
 * Permanent views are not allowed to reference temp objects, including temp functions and views
    */
   def verifyTemporaryObjectsNotExists(
-      catalog: SessionCatalog,
       isTemporary: Boolean,
       name: TableIdentifier,
-      child: LogicalPlan): Unit = {
+      child: LogicalPlan,
+      referredTempFunctions: Seq[String]): Unit = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     if (!isTemporary) {
-      val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, child)
+      val tempViews = collectTemporaryViews(child)
       tempViews.foreach { nameParts =>
         throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempViewError(
           name, nameParts.quoted)
       }
-      tempFunctions.foreach { funcName =>
+      referredTempFunctions.foreach { funcName =>
         throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempFuncError(
           name, funcName)
       }
@@ -568,10 +585,9 @@ object ViewHelper extends SQLConfHelper with Logging {
   }
 
   /**
-   * Collect all temporary views and functions and return the identifiers separately.
+   * Collect all temporary views and return their identifiers.
    */
-  private def collectTemporaryObjects(
-      catalog: SessionCatalog, child: LogicalPlan): (Seq[Seq[String]], Seq[String]) = {
+  private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
     def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
       child.flatMap {
         case view: View if view.isTempView =>
@@ -583,20 +599,7 @@ object ViewHelper extends SQLConfHelper with Logging {
         })
       }.distinct
     }
-
-    def collectTempFunctions(child: LogicalPlan): Seq[String] = {
-      child.flatMap {
-        case plan =>
-          plan.expressions.flatMap(_.flatMap {
-            case e: SubqueryExpression => collectTempFunctions(e.plan)
-            case e: UserDefinedExpression
-                if catalog.isTemporaryFunction(FunctionIdentifier(e.name)) =>
-              Seq(e.name)
-            case _ => Seq.empty
-          })
-      }.distinct
-    }
-    (collectTempViews(child), collectTempFunctions(child))
+    collectTempViews(child)
   }
 
   /**
@@ -623,7 +626,8 @@ object ViewHelper extends SQLConfHelper with Logging {
       getRawTempView: String => Option[TemporaryViewRelation],
       originalText: Option[String],
       analyzedPlan: LogicalPlan,
-      aliasedPlan: LogicalPlan): TemporaryViewRelation = {
+      aliasedPlan: LogicalPlan,
+      referredTempFunctions: Seq[String]): TemporaryViewRelation = {
     val uncache = getRawTempView(name.table).map { r =>
       needsToUncache(r, aliasedPlan)
     }.getOrElse(false)
@@ -639,7 +643,8 @@ object ViewHelper extends SQLConfHelper with Logging {
           session,
           analyzedPlan,
           aliasedPlan.schema,
-          originalText.get))
+          originalText.get,
+          referredTempFunctions))
     } else {
       TemporaryViewRelation(
         prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan),
@@ -671,10 +676,11 @@ object ViewHelper extends SQLConfHelper with Logging {
       session: SparkSession,
       analyzedPlan: LogicalPlan,
       viewSchema: StructType,
-      originalText: String): CatalogTable = {
+      originalText: String,
+      tempFunctions: Seq[String]): CatalogTable = {
 
     val catalog = session.sessionState.catalog
-    val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan)
+    val tempViews = collectTemporaryViews(analyzedPlan)
     // TBLPROPERTIES is not allowed for temporary view, so we don't use it for
     // generating temporary view properties
     val newProperties = generateViewProperties(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 22b98da..f4470a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -107,7 +107,8 @@ case class CreateTempViewUsing(
         catalog.getRawGlobalTempView,
         originalText = None,
         analyzedPlan,
-        aliasedPlan = analyzedPlan)
+        aliasedPlan = analyzedPlan,
+        referredTempFunctions = Seq.empty)
       catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
     } else {
       val viewDefinition = createTemporaryViewRelation(
@@ -117,7 +118,8 @@ case class CreateTempViewUsing(
         catalog.getRawTempView,
         originalText = None,
         analyzedPlan,
-        aliasedPlan = analyzedPlan)
+        aliasedPlan = analyzedPlan,
+        referredTempFunctions = Seq.empty)
       catalog.createTempView(tableIdent.table, viewDefinition, replace)
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index b1c3fd5..f8a155e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -396,6 +396,16 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
         Nil)
     }
   }
+
+  test("SPARK-37202: temp view refers a inject function") {
+    val extensions = create { extensions =>
+      extensions.injectFunction(MyExtensions.myFunction)
+    }
+    withSession(extensions) { session =>
+      session.sql("CREATE TEMP VIEW v AS SELECT myFunction(a) FROM VALUES(1), (2) t(a)")
+      session.sql("SELECT * FROM v")
+    }
+  }
 }
 
 case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 8383d44..6ed9798 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.Repartition
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.internal.SQLConf._
@@ -378,7 +380,25 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
   }
 }
 
-class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
+abstract class TempViewTestSuite extends SQLViewTestSuite {
+  test("SPARK-37202: temp view should capture the function registered by catalog API") {
+    val funcName = "tempFunc"
+    withUserDefinedFunction(funcName -> true) {
+      val catalogFunction = CatalogFunction(
+        FunctionIdentifier(funcName, None), "org.apache.spark.myFunc", Seq.empty)
+      val functionBuilder = (e: Seq[Expression]) => e.head
+      spark.sessionState.catalog.registerFunction(
+        catalogFunction, overrideIfExists = false, functionBuilder = Some(functionBuilder))
+      val query = s"SELECT $funcName(max(a), min(a)) FROM VALUES (1), (2), (3) t(a)"
+      val viewName = createView("tempView", query)
+      withView(viewName) {
+        checkViewOutput(viewName, sql(query).collect())
+      }
+    }
+  }
+}
+
+class LocalTempViewTestSuite extends TempViewTestSuite with SharedSparkSession {
   override protected def viewTypeString: String = "TEMPORARY VIEW"
   override protected def formattedViewName(viewName: String): String = viewName
   override protected def tableIdentifier(viewName: String): TableIdentifier = {
@@ -386,7 +406,7 @@ class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
   }
 }
 
-class GlobalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
+class GlobalTempViewTestSuite extends TempViewTestSuite with SharedSparkSession {
   private def db: String = spark.sharedState.globalTempViewManager.database
   override protected def viewTypeString: String = "GLOBAL TEMPORARY VIEW"
   override protected def formattedViewName(viewName: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 2c2f833..85ba14f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -26,7 +26,7 @@ import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
@@ -2307,7 +2307,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val cmdNotAnalyzed = DummyAnalysisOnlyCommand(isAnalyzed = false, childrenToAnalyze = Seq(null))
     assert(cmdNotAnalyzed.innerChildren.isEmpty)
     assert(cmdNotAnalyzed.children.length == 1)
-    val cmdAnalyzed = cmdNotAnalyzed.markAsAnalyzed()
+    val cmdAnalyzed = cmdNotAnalyzed.markAsAnalyzed(AnalysisContext.get)
     assert(cmdAnalyzed.innerChildren.length == 1)
     assert(cmdAnalyzed.children.isEmpty)
   }
@@ -2325,7 +2325,7 @@ object AsDataSourceV2Relation {
 case class DummyAnalysisOnlyCommand(
     isAnalyzed: Boolean,
     childrenToAnalyze: Seq[LogicalPlan]) extends AnalysisOnlyCommand {
-  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = {
     copy(childrenToAnalyze = newChildren)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org