Posted to commits@spark.apache.org by we...@apache.org on 2022/01/12 05:18:33 UTC

[spark] branch master updated: [SPARK-37731][SQL] Refactor and cleanup function lookup in Analyzer

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 371ab5a  [SPARK-37731][SQL] Refactor and cleanup function lookup in Analyzer
371ab5a is described below

commit 371ab5a07c18cc456cc7ee5b8fa051d46e11b363
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Jan 12 13:17:32 2022 +0800

    [SPARK-37731][SQL] Refactor and cleanup function lookup in Analyzer
    
    ### What changes were proposed in this pull request?
    
    Today, the function lookup code path is pretty hard to understand as it is spread across many places:
    1. lookup v1 function
    2. lookup v2 function
    3. lookup higher-order function
    4. lookup table function
    5. lookup functions at different levels: built-in, temp and persistent.
    
    This PR is a major refactor of the function lookup code path and cleans it up quite a bit. In general, it follows the idea of table lookup:
    1. Analyzer looks up built-in or temp functions first.
    2. Analyzer qualifies the function name with the current catalog and namespace, or with the view's catalog/namespace if we are resolving a view.
    3. Analyzer calls the v1 session catalog if the catalog is `spark_catalog`, otherwise calls the v2 catalog.
    
    With this refactor, the analyzer acts as the router, and the v1 session catalog only needs a few small methods with very specific goals.
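    Below is a minimal, self-contained sketch of this routing order. The types and method names (e.g. `FunctionLookupSketch`, `lookup`) are illustrative placeholders, not the actual Analyzer/SessionCatalog APIs introduced by this patch:

    ```scala
    // Simplified model of the new lookup order; these are not the real Spark classes.
    object FunctionLookupSketch {
      sealed trait ResolvedFunction
      case class BuiltinOrTemp(name: String) extends ResolvedFunction
      case class V1Persistent(db: String, name: String) extends ResolvedFunction
      case class V2Persistent(catalog: String, ns: Seq[String], name: String) extends ResolvedFunction

      private val builtinOrTemp = Set("abs", "upper", "my_temp_udf") // stand-in function registry
      private val sessionCatalog = "spark_catalog"

      def lookup(
          nameParts: Seq[String],
          currentCatalog: String,
          currentNamespace: Seq[String]): ResolvedFunction = {
        // Step 1: a single-part name is checked against built-in and temp functions first.
        if (nameParts.length == 1 && builtinOrTemp.contains(nameParts.head.toLowerCase)) {
          BuiltinOrTemp(nameParts.head)
        } else {
          // Step 2: qualify the name with the current catalog and namespace
          // (or the view's catalog/namespace when resolving a view).
          val full =
            if (nameParts.length == 1) currentCatalog +: currentNamespace :+ nameParts.head
            else if (nameParts.length == 2) currentCatalog +: nameParts
            else nameParts
          // Step 3: route to the v1 session catalog if the catalog is `spark_catalog`,
          // otherwise to the v2 FunctionCatalog of the named catalog.
          if (full.head == sessionCatalog) V1Persistent(full(1), full.last)
          else V2Persistent(full.head, full.slice(1, full.length - 1), full.last)
        }
      }

      def main(args: Array[String]): Unit = {
        println(lookup(Seq("abs"), sessionCatalog, Seq("default")))       // BuiltinOrTemp(abs)
        println(lookup(Seq("my_udf"), sessionCatalog, Seq("default")))    // V1Persistent(default,my_udf)
        println(lookup(Seq("cat", "ns", "my_udf"), sessionCatalog, Nil))  // V2Persistent(cat,List(ns),my_udf)
      }
    }
    ```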
    
    The function DDL commands now follow the pattern of the similar table/view commands and fail automatically if the command requires a persistent function but the resolved function is built-in/temp. After this change, it should be simpler to add v2 function DDL commands.
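    As a rough, hypothetical illustration of that failure path (the names below are placeholders, not the actual resolution rule in the Analyzer): when a command marks the function reference as requiring a persistent function, resolving the name to a built-in/temp function raises an analysis error instead of proceeding:

    ```scala
    // Hypothetical, simplified version of the "require persistent function" check.
    object RequirePersistentSketch {
      case class AnalysisError(message: String) extends Exception(message)

      sealed trait Func
      case object BuiltinOrTempFunc extends Func
      case class PersistentFunc(qualifiedName: Seq[String]) extends Func

      def resolveForCommand(
          nameParts: Seq[String],
          commandName: String, // e.g. "DROP FUNCTION"
          requirePersistent: Boolean,
          lookupBuiltinOrTemp: Seq[String] => Option[Func],
          lookupPersistent: Seq[String] => Option[Func]): Func = {
        lookupBuiltinOrTemp(nameParts) match {
          case Some(_) if requirePersistent =>
            // The command needs a persistent function but the name resolved to a
            // built-in/temp function (e.g. DROP FUNCTION abs), so fail automatically.
            throw AnalysisError(s"$commandName requires a persistent function, but " +
              s"'${nameParts.mkString(".")}' is a built-in or temporary function.")
          case Some(f) => f
          case None =>
            lookupPersistent(nameParts).getOrElse(
              throw AnalysisError(s"Function not found: ${nameParts.mkString(".")}"))
        }
      }
    }
    ```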
    
    Note that table function lookup is still in its own rule, as it has a dedicated function registry and doesn't share a namespace with scalar functions.
    
    ### Why are the changes needed?
    
    Code cleanup to improve readability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Since Spark 3.3, DESCRIBE FUNCTION fails if the function does not exist. In Spark 3.2 or earlier, DESCRIBE FUNCTION can still run and print "Function: func_name not found".
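    A small, hedged usage illustration of this change (assuming a local SparkSession and that no function named `not_a_func` exists; the exact exception type caught here is an assumption):

    ```scala
    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object DescribeFunctionBehavior {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("describe-func").getOrCreate()
        try {
          // Spark 3.2 and earlier: this runs and returns rows such as "Function: not_a_func not found".
          // Since Spark 3.3 (this patch): analysis fails with a "function not found" error instead.
          spark.sql("DESCRIBE FUNCTION not_a_func").show()
        } catch {
          // Assumption: the failure surfaces as an AnalysisException subclass.
          case e: AnalysisException => println(s"DESCRIBE FUNCTION now fails: ${e.getMessage}")
        } finally {
          spark.stop()
        }
      }
    }
    ```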
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #35004 from cloud-fan/func.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 docs/sql-migration-guide.md                        |   2 +
 .../sql/connector/catalog/CatalogExtension.java    |   2 +-
 .../catalog/DelegatingCatalogExtension.java        |  29 +-
 .../sql/connector/catalog/FunctionCatalog.java     |  13 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 385 ++++++++++++---------
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   4 +
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  31 ++
 .../analysis/ResolveCommandsWithIfExists.scala     |   4 +-
 .../catalyst/analysis/higherOrderFunctions.scala   |  42 ---
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  |  32 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      | 310 ++++++++++-------
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  30 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |   3 +-
 .../sql/connector/catalog/CatalogV2Implicits.scala |   6 +-
 .../sql/connector/catalog/CatalogV2Util.scala      |  13 +-
 .../sql/connector/catalog/LookupCatalog.scala      |  49 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  44 ++-
 .../spark/sql/errors/QueryParsingErrors.scala      |   6 +
 .../spark/sql/internal/connector/V1Function.scala  |  30 ++
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |   2 +-
 .../catalyst/analysis/LookupFunctionsSuite.scala   |  98 ++++--
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  30 --
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  38 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  45 ++-
 .../spark/sql/execution/SparkSqlParser.scala       |  34 +-
 .../spark/sql/execution/command/functions.scala    |  75 +---
 .../datasources/v2/V2SessionCatalog.scala          |  46 ++-
 .../sql-tests/results/ansi/interval.sql.out        |   8 +-
 .../sql-tests/results/inline-table.sql.out         |   2 +-
 .../resources/sql-tests/results/interval.sql.out   |   8 +-
 .../sql-tests/results/udf/udf-inline-table.sql.out |   2 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |   9 +-
 .../sql/connector/DataSourceV2FunctionSuite.scala  |  65 +++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  61 +---
 .../sql/execution/command/DDLParserSuite.scala     |  29 +-
 .../spark/sql/execution/command/DDLSuite.scala     |  37 +-
 .../datasources/v2/V2SessionCatalogSuite.scala     |   6 +-
 .../hive/execution/HiveCompatibilitySuite.scala    |   4 +
 .../spark/sql/hive/execution/HiveUDFSuite.scala    |   7 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |   8 +-
 40 files changed, 951 insertions(+), 698 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index d9659ca..5edf839 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -54,6 +54,8 @@ license: |
 
   - Since Spark 3.3, nulls are written as empty strings in CSV data source by default. In Spark 3.2 or earlier, nulls were written as empty strings as quoted empty strings, `""`. To restore the previous behavior, set `nullValue` to `""`.
 
+  - Since Spark 3.3, DESCRIBE FUNCTION fails if the function does not exist. In Spark 3.2 or earlier, DESCRIBE FUNCTION can still run and print "Function: func_name not found".
+
 ## Upgrading from Spark SQL 3.1 to 3.2
 
   - Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespaces.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java
index 155dca5..bd870c9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java
@@ -30,7 +30,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  * @since 3.0.0
  */
 @Evolving
-public interface CatalogExtension extends TableCatalog, SupportsNamespaces {
+public interface CatalogExtension extends TableCatalog, FunctionCatalog, SupportsNamespaces {
 
   /**
    * This will be called only once by Spark to pass in the Spark built-in session catalog, after
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index 359bc00..48a859a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -20,10 +20,8 @@ package org.apache.spark.sql.connector.catalog;
 import java.util.Map;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.*;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -161,11 +159,30 @@ public abstract class DelegatingCatalogExtension implements CatalogExtension {
     return asNamespaceCatalog().dropNamespace(namespace);
   }
 
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    return asFunctionCatalog().loadFunction(ident);
+  }
+
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    return asFunctionCatalog().listFunctions(namespace);
+  }
+
+  @Override
+  public boolean functionExists(Identifier ident) {
+    return asFunctionCatalog().functionExists(ident);
+  }
+
   private TableCatalog asTableCatalog() {
-    return (TableCatalog)delegate;
+    return (TableCatalog) delegate;
   }
 
   private SupportsNamespaces asNamespaceCatalog() {
-    return (SupportsNamespaces)delegate;
+    return (SupportsNamespaces) delegate;
+  }
+
+  private FunctionCatalog asFunctionCatalog() {
+    return (FunctionCatalog) delegate;
   }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionCatalog.java
index ce725d1..de45590 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionCatalog.java
@@ -50,4 +50,17 @@ public interface FunctionCatalog extends CatalogPlugin {
    */
   UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException;
 
+  /**
+   * Returns true if the function exists, false otherwise.
+   *
+   * @since 3.3.0
+   */
+  default boolean functionExists(Identifier ident) {
+    try {
+      loadFunction(ident);
+      return true;
+    } catch (NoSuchFunctionException e) {
+      return false;
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e80854b..182e599 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -45,13 +45,14 @@ import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition}
-import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction}
+import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction, UnboundFunction}
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssignmentPolicy}
+import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType.DAY
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
@@ -75,7 +76,7 @@ object SimpleAnalyzer extends Analyzer(
   override def resolver: Resolver = caseSensitiveResolution
 }
 
-object FakeV2SessionCatalog extends TableCatalog {
+object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog {
   private def fail() = throw new UnsupportedOperationException
   override def listTables(namespace: Array[String]): Array[Identifier] = fail()
   override def loadTable(ident: Identifier): Table = {
@@ -91,6 +92,8 @@ object FakeV2SessionCatalog extends TableCatalog {
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = fail()
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = fail()
   override def name(): String = CatalogManager.SESSION_CATALOG_NAME
+  override def listFunctions(namespace: Array[String]): Array[Identifier] = fail()
+  override def loadFunction(ident: Identifier): UnboundFunction = fail()
 }
 
 /**
@@ -308,7 +311,6 @@ class Analyzer(override val catalogManager: CatalogManager)
       TimeWindowing ::
       SessionWindowing ::
       ResolveInlineTables ::
-      ResolveHigherOrderFunctions(catalogManager) ::
       ResolveLambdaVariables ::
       ResolveTimeZone ::
       ResolveRandomSeed ::
@@ -2005,42 +2007,33 @@ class Analyzer(override val catalogManager: CatalogManager)
    */
   object LookupFunctions extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = {
-      val externalFunctionNameSet = new mutable.HashSet[FunctionIdentifier]()
+      val externalFunctionNameSet = new mutable.HashSet[Seq[String]]()
+
       plan.resolveExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
-        case f @ UnresolvedFunction(AsFunctionIdentifier(ident), _, _, _, _) =>
-          if (externalFunctionNameSet.contains(normalizeFuncName(ident)) ||
-            v1SessionCatalog.isRegisteredFunction(ident)) {
-            f
-          } else if (v1SessionCatalog.isPersistentFunction(ident)) {
-            externalFunctionNameSet.add(normalizeFuncName(ident))
+        case f @ UnresolvedFunction(nameParts, _, _, _, _) =>
+          if (ResolveFunctions.lookupBuiltinOrTempFunction(nameParts).isDefined) {
             f
           } else {
-            withPosition(f) {
-              throw new NoSuchFunctionException(
-                ident.database.getOrElse(v1SessionCatalog.getCurrentDatabase),
-                ident.funcName)
+            val CatalogAndIdentifier(catalog, ident) = expandIdentifier(nameParts)
+            val fullName = normalizeFuncName(catalog.name +: ident.namespace :+ ident.name)
+            if (externalFunctionNameSet.contains(fullName)) {
+              f
+            } else if (catalog.asFunctionCatalog.functionExists(ident)) {
+              externalFunctionNameSet.add(fullName)
+              f
+            } else {
+              throw QueryCompilationErrors.noSuchFunctionError(nameParts, f, Some(fullName))
             }
           }
       }
     }
 
-    def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = {
-      val funcName = if (conf.caseSensitiveAnalysis) {
-        name.funcName
+    def normalizeFuncName(name: Seq[String]): Seq[String] = {
+      if (conf.caseSensitiveAnalysis) {
+        name
       } else {
-        name.funcName.toLowerCase(Locale.ROOT)
-      }
-
-      val databaseName = name.database match {
-        case Some(a) => formatDatabaseName(a)
-        case None => v1SessionCatalog.getCurrentDatabase
+        name.map(_.toLowerCase(Locale.ROOT))
       }
-
-      FunctionIdentifier(funcName, Some(databaseName))
-    }
-
-    protected def formatDatabaseName(name: String): String = {
-      if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
     }
   }
 
@@ -2050,164 +2043,228 @@ class Analyzer(override val catalogManager: CatalogManager)
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
+
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(UNRESOLVED_FUNC, UNRESOLVED_FUNCTION, GENERATOR), ruleId) {
       // Resolve functions with concrete relations from v2 catalog.
-      case UnresolvedFunc(multipartIdent) =>
-        val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent)
-        ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName))
+      case u @ UnresolvedFunc(nameParts, cmd, requirePersistentFunc, mismatchHint, _) =>
+        lookupBuiltinOrTempFunction(nameParts).map { info =>
+          if (requirePersistentFunc) {
+            throw QueryCompilationErrors.expectPersistentFuncError(
+              nameParts.head, cmd, mismatchHint, u)
+          } else {
+            ResolvedNonPersistentFunc(nameParts.head, V1Function(info))
+          }
+        }.getOrElse {
+          val CatalogAndIdentifier(catalog, ident) = expandIdentifier(nameParts)
+          val fullName = catalog.name +: ident.namespace :+ ident.name
+          CatalogV2Util.loadFunction(catalog, ident).map { func =>
+            ResolvedPersistentFunc(catalog.asFunctionCatalog, ident, func)
+          }.getOrElse(u.copy(possibleQualifiedName = Some(fullName)))
+        }
 
       case q: LogicalPlan =>
         q.transformExpressionsWithPruning(
           _.containsAnyPattern(UNRESOLVED_FUNCTION, GENERATOR), ruleId) {
+          case u @ UnresolvedFunction(nameParts, arguments, _, _, _)
+              if hasLambdaAndResolvedArguments(arguments) => withPosition(u) {
+            resolveBuiltinOrTempFunction(nameParts, arguments, Some(u)).map {
+              case func: HigherOrderFunction => func
+              case other => other.failAnalysis(
+                "A lambda function should only be used in a higher order function. However, " +
+                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+                  s"higher order function.")
+              // We don't support persistent high-order functions yet.
+            }.getOrElse(throw QueryCompilationErrors.noSuchFunctionError(nameParts, u))
+          }
+
           case u if !u.childrenResolved => u // Skip until children are resolved.
-          case u @ UnresolvedGenerator(name, children) =>
-            withPosition(u) {
-              v1SessionCatalog.lookupFunction(name, children) match {
-                case generator: Generator => generator
-                case other => throw QueryCompilationErrors.generatorNotExpectedError(
-                  name, other.getClass.getCanonicalName)
-              }
+
+          case u @ UnresolvedGenerator(name, arguments) => withPosition(u) {
+            resolveBuiltinOrTempFunction(name.asMultipart, arguments, None).getOrElse {
+              // For generator function, the parser only accepts v1 function name and creates
+              // `FunctionIdentifier`.
+              v1SessionCatalog.resolvePersistentFunction(name, arguments)
             }
+          }
 
-          case u @ UnresolvedFunction(AsFunctionIdentifier(ident), arguments, isDistinct, filter,
-            ignoreNulls) => withPosition(u) {
-              v1SessionCatalog.lookupFunction(ident, arguments) match {
-                // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
-                // the context of a Window clause. They do not need to be wrapped in an
-                // AggregateExpression.
-                case wf: AggregateWindowFunction =>
-                  if (isDistinct) {
-                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                      wf.prettyName, "DISTINCT")
-                  } else if (filter.isDefined) {
-                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                      wf.prettyName, "FILTER clause")
-                  } else if (ignoreNulls) {
-                    wf match {
-                      case nthValue: NthValue =>
-                        nthValue.copy(ignoreNulls = ignoreNulls)
-                      case _ =>
-                        throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                          wf.prettyName, "IGNORE NULLS")
-                    }
-                  } else {
-                    wf
-                  }
-                case owf: FrameLessOffsetWindowFunction =>
-                  if (isDistinct) {
-                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                      owf.prettyName, "DISTINCT")
-                  } else if (filter.isDefined) {
-                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                      owf.prettyName, "FILTER clause")
-                  } else if (ignoreNulls) {
-                    owf match {
-                      case lead: Lead =>
-                        lead.copy(ignoreNulls = ignoreNulls)
-                      case lag: Lag =>
-                        lag.copy(ignoreNulls = ignoreNulls)
-                    }
-                  } else {
-                    owf
-                  }
-                // We get an aggregate function, we need to wrap it in an AggregateExpression.
-                case agg: AggregateFunction =>
-                  if (filter.isDefined && !filter.get.deterministic) {
-                    throw QueryCompilationErrors.nonDeterministicFilterInAggregateError
-                  }
-                  if (ignoreNulls) {
-                    val aggFunc = agg match {
-                      case first: First => first.copy(ignoreNulls = ignoreNulls)
-                      case last: Last => last.copy(ignoreNulls = ignoreNulls)
-                      case _ =>
-                        throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                          agg.prettyName, "IGNORE NULLS")
-                    }
-                    AggregateExpression(aggFunc, Complete, isDistinct, filter)
-                  } else {
-                    AggregateExpression(agg, Complete, isDistinct, filter)
-                  }
-                // This function is not an aggregate function, just return the resolved one.
-                case other if isDistinct =>
-                  throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                    other.prettyName, "DISTINCT")
-                case other if filter.isDefined =>
-                  throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                    other.prettyName, "FILTER clause")
-                case other if ignoreNulls =>
-                  throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                    other.prettyName, "IGNORE NULLS")
-                case e: String2TrimExpression if arguments.size == 2 =>
-                  if (trimWarningEnabled.get) {
-                    log.warn("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated." +
-                      " Use SQL syntax `TRIM((BOTH | LEADING | TRAILING)? trimStr FROM str)`" +
-                      " instead.")
-                    trimWarningEnabled.set(false)
-                  }
-                  e
-                case other =>
-                  other
+          case u @ UnresolvedFunction(nameParts, arguments, _, _, _) => withPosition(u) {
+            resolveBuiltinOrTempFunction(nameParts, arguments, Some(u)).getOrElse {
+              val CatalogAndIdentifier(catalog, ident) = expandIdentifier(nameParts)
+              if (CatalogV2Util.isSessionCatalog(catalog)) {
+                resolveV1Function(ident.asFunctionIdentifier, arguments, u)
+              } else {
+                resolveV2Function(catalog.asFunctionCatalog, ident, arguments, u)
               }
+            }
           }
+        }
+    }
 
-          case u @ UnresolvedFunction(nameParts, arguments, isDistinct, filter, ignoreNulls) =>
-            withPosition(u) {
-              expandIdentifier(nameParts) match {
-                case NonSessionCatalogAndIdentifier(catalog, ident) =>
-                  if (!catalog.isFunctionCatalog) {
-                    throw QueryCompilationErrors.lookupFunctionInNonFunctionCatalogError(
-                      ident, catalog)
-                  }
+    /**
+     * Check if the arguments of a function are either resolved or a lambda function.
+     */
+    private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+      val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+      lambdas.nonEmpty && others.forall(_.resolved)
+    }
 
-                  val unbound = catalog.asFunctionCatalog.loadFunction(ident)
-                  val inputType = StructType(arguments.zipWithIndex.map {
-                    case (exp, pos) => StructField(s"_$pos", exp.dataType, exp.nullable)
-                  })
-                  val bound = try {
-                    unbound.bind(inputType)
-                  } catch {
-                    case unsupported: UnsupportedOperationException =>
-                      throw QueryCompilationErrors.functionCannotProcessInputError(
-                        unbound, arguments, unsupported)
-                  }
+    def lookupBuiltinOrTempFunction(name: Seq[String]): Option[ExpressionInfo] = {
+      if (name.length == 1) {
+        v1SessionCatalog.lookupBuiltinOrTempFunction(name.head)
+      } else {
+        None
+      }
+    }
 
-                  if (bound.inputTypes().length != arguments.length) {
-                    throw QueryCompilationErrors.v2FunctionInvalidInputTypeLengthError(
-                      bound, arguments)
-                  }
+    private def resolveBuiltinOrTempFunction(
+        name: Seq[String],
+        arguments: Seq[Expression],
+        u: Option[UnresolvedFunction]): Option[Expression] = {
+      if (name.length == 1) {
+        v1SessionCatalog.resolveBuiltinOrTempFunction(name.head, arguments).map { func =>
+          if (u.isDefined) validateFunction(func, arguments.length, u.get) else func
+        }
+      } else {
+        None
+      }
+    }
 
-                  bound match {
-                    case scalarFunc: ScalarFunction[_] =>
-                      processV2ScalarFunction(scalarFunc, arguments, isDistinct,
-                        filter, ignoreNulls)
-                    case aggFunc: V2AggregateFunction[_, _] =>
-                      processV2AggregateFunction(aggFunc, arguments, isDistinct, filter,
-                        ignoreNulls)
-                    case _ =>
-                      failAnalysis(s"Function '${bound.name()}' does not implement ScalarFunction" +
-                        s" or AggregateFunction")
-                  }
+    private def resolveV1Function(
+        ident: FunctionIdentifier,
+        arguments: Seq[Expression],
+        u: UnresolvedFunction): Expression = {
+      val func = v1SessionCatalog.resolvePersistentFunction(ident, arguments)
+      validateFunction(func, arguments.length, u)
+    }
 
-                case _ => u
-              }
+    private def validateFunction(
+        func: Expression,
+        numArgs: Int,
+        u: UnresolvedFunction): Expression = {
+      func match {
+        // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
+        // the context of a Window clause. They do not need to be wrapped in an
+        // AggregateExpression.
+        case wf: AggregateWindowFunction =>
+          if (u.isDistinct) {
+            throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+              wf.prettyName, "DISTINCT")
+          } else if (u.filter.isDefined) {
+            throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+              wf.prettyName, "FILTER clause")
+          } else if (u.ignoreNulls) {
+            wf match {
+              case nthValue: NthValue =>
+                nthValue.copy(ignoreNulls = u.ignoreNulls)
+              case _ =>
+                throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                  wf.prettyName, "IGNORE NULLS")
             }
-        }
+          } else {
+            wf
+          }
+        case owf: FrameLessOffsetWindowFunction =>
+          if (u.isDistinct) {
+            throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+              owf.prettyName, "DISTINCT")
+          } else if (u.filter.isDefined) {
+            throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+              owf.prettyName, "FILTER clause")
+          } else if (u.ignoreNulls) {
+            owf match {
+              case lead: Lead =>
+                lead.copy(ignoreNulls = u.ignoreNulls)
+              case lag: Lag =>
+                lag.copy(ignoreNulls = u.ignoreNulls)
+            }
+          } else {
+            owf
+          }
+        // We get an aggregate function, we need to wrap it in an AggregateExpression.
+        case agg: AggregateFunction =>
+          if (u.filter.isDefined && !u.filter.get.deterministic) {
+            throw QueryCompilationErrors.nonDeterministicFilterInAggregateError
+          }
+          if (u.ignoreNulls) {
+            val aggFunc = agg match {
+              case first: First => first.copy(ignoreNulls = u.ignoreNulls)
+              case last: Last => last.copy(ignoreNulls = u.ignoreNulls)
+              case _ =>
+                throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                  agg.prettyName, "IGNORE NULLS")
+            }
+            AggregateExpression(aggFunc, Complete, u.isDistinct, u.filter)
+          } else {
+            AggregateExpression(agg, Complete, u.isDistinct, u.filter)
+          }
+        // This function is not an aggregate function, just return the resolved one.
+        case other if u.isDistinct =>
+          throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+            other.prettyName, "DISTINCT")
+        case other if u.filter.isDefined =>
+          throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+            other.prettyName, "FILTER clause")
+        case other if u.ignoreNulls =>
+          throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+            other.prettyName, "IGNORE NULLS")
+        case e: String2TrimExpression if numArgs == 2 =>
+          if (trimWarningEnabled.get) {
+            log.warn("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated." +
+              " Use SQL syntax `TRIM((BOTH | LEADING | TRAILING)? trimStr FROM str)`" +
+              " instead.")
+            trimWarningEnabled.set(false)
+          }
+          e
+        case other =>
+          other
+      }
+    }
+
+    private def resolveV2Function(
+        catalog: FunctionCatalog,
+        ident: Identifier,
+        arguments: Seq[Expression],
+        u: UnresolvedFunction): Expression = {
+      val unbound = catalog.loadFunction(ident)
+      val inputType = StructType(arguments.zipWithIndex.map {
+        case (exp, pos) => StructField(s"_$pos", exp.dataType, exp.nullable)
+      })
+      val bound = try {
+        unbound.bind(inputType)
+      } catch {
+        case unsupported: UnsupportedOperationException =>
+          throw QueryCompilationErrors.functionCannotProcessInputError(
+            unbound, arguments, unsupported)
+      }
+
+      if (bound.inputTypes().length != arguments.length) {
+        throw QueryCompilationErrors.v2FunctionInvalidInputTypeLengthError(
+          bound, arguments)
+      }
+
+      bound match {
+        case scalarFunc: ScalarFunction[_] =>
+          processV2ScalarFunction(scalarFunc, arguments, u)
+        case aggFunc: V2AggregateFunction[_, _] =>
+          processV2AggregateFunction(aggFunc, arguments, u)
+        case _ =>
+          failAnalysis(s"Function '${bound.name()}' does not implement ScalarFunction" +
+            s" or AggregateFunction")
+      }
     }
 
     private def processV2ScalarFunction(
         scalarFunc: ScalarFunction[_],
         arguments: Seq[Expression],
-        isDistinct: Boolean,
-        filter: Option[Expression],
-        ignoreNulls: Boolean): Expression = {
-      if (isDistinct) {
+        u: UnresolvedFunction): Expression = {
+      if (u.isDistinct) {
         throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
           scalarFunc.name(), "DISTINCT")
-      } else if (filter.isDefined) {
+      } else if (u.filter.isDefined) {
         throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
           scalarFunc.name(), "FILTER clause")
-      } else if (ignoreNulls) {
+      } else if (u.ignoreNulls) {
         throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
           scalarFunc.name(), "IGNORE NULLS")
       } else {
@@ -2242,15 +2299,13 @@ class Analyzer(override val catalogManager: CatalogManager)
     private def processV2AggregateFunction(
         aggFunc: V2AggregateFunction[_, _],
         arguments: Seq[Expression],
-        isDistinct: Boolean,
-        filter: Option[Expression],
-        ignoreNulls: Boolean): Expression = {
-      if (ignoreNulls) {
+        u: UnresolvedFunction): Expression = {
+      if (u.ignoreNulls) {
         throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
           aggFunc.name(), "IGNORE NULLS")
       }
       val aggregator = V2Aggregator(aggFunc, arguments)
-      AggregateExpression(aggregator, Complete, isDistinct, filter)
+      AggregateExpression(aggregator, Complete, u.isDistinct, u.filter)
     }
 
     /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index af60a82..d06996a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -125,6 +125,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case u: UnresolvedRelation =>
         u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
 
+      case u: UnresolvedFunc =>
+        throw QueryCompilationErrors.noSuchFunctionError(
+          u.multipartIdentifier, u, u.possibleQualifiedName)
+
       case u: UnresolvedHint =>
         u.failAnalysis(s"Hint not found: ${u.name}")
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 13e755c..c995ff8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -743,6 +743,37 @@ object FunctionRegistry {
 
   val functionSet: Set[FunctionIdentifier] = builtin.listFunction().toSet
 
+  private def makeExprInfoForVirtualOperator(name: String, usage: String): ExpressionInfo = {
+    new ExpressionInfo(
+      null,
+      null,
+      name,
+      usage,
+      "",
+      "",
+      "",
+      "",
+      "",
+      "",
+      "built-in")
+  }
+
+  val builtinOperators: Map[String, ExpressionInfo] = Map(
+    "<>" -> makeExprInfoForVirtualOperator("<>",
+      "expr1 <> expr2 - Returns true if `expr1` is not equal to `expr2`."),
+    "!=" -> makeExprInfoForVirtualOperator("!=",
+      "expr1 != expr2 - Returns true if `expr1` is not equal to `expr2`."),
+    "between" -> makeExprInfoForVirtualOperator("between",
+      "expr1 [NOT] BETWEEN expr2 AND expr3 - " +
+        "evaluate if `expr1` is [not] in between `expr2` and `expr3`."),
+    "case" -> makeExprInfoForVirtualOperator("case",
+      "CASE expr1 WHEN expr2 THEN expr3 [WHEN expr4 THEN expr5]* [ELSE expr6] END " +
+        "- When `expr1` = `expr2`, returns `expr3`; when `expr1` = `expr4`, return `expr5`; " +
+        "else return `expr6`."),
+    "||" -> makeExprInfoForVirtualOperator("||",
+      "expr1 || expr2 - Returns the concatenation of `expr1` and `expr2`.")
+  )
+
   /**
    * Create a SQL function builder and corresponding `ExpressionInfo`.
    * @param name The function name.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
index 0f46024..a737025 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 
@@ -35,5 +35,7 @@ object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
       NoopCommand("DROP VIEW", u.multipartIdentifier)
     case UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists =>
       NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
+    case DropFunction(u: UnresolvedFunc, ifExists) if ifExists =>
+      NoopCommand("DROP FUNCTION", u.multipartIdentifier)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
index 9a66170..49c6828 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
@@ -21,51 +21,9 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.DataType
 
 /**
- * Resolve a higher order functions from the catalog. This is different from regular function
- * resolution because lambda functions can only be resolved after the function has been resolved;
- * so we need to resolve higher order function when all children are either resolved or a lambda
- * function.
- */
-case class ResolveHigherOrderFunctions(catalogManager: CatalogManager)
-  extends Rule[LogicalPlan] with LookupCatalog {
-
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsWithPruning(
-    _.containsPattern(LAMBDA_FUNCTION), ruleId) {
-    case u @ UnresolvedFunction(AsFunctionIdentifier(ident), children, false, filter, ignoreNulls)
-        if hasLambdaAndResolvedArguments(children) =>
-      withPosition(u) {
-        catalogManager.v1SessionCatalog.lookupFunction(ident, children) match {
-          case func: HigherOrderFunction =>
-            filter.foreach(_.failAnalysis("FILTER predicate specified, " +
-              s"but ${func.prettyName} is not an aggregate function"))
-            if (ignoreNulls) {
-              throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                func.prettyName, "IGNORE NULLS")
-            }
-            func
-          case other => other.failAnalysis(
-            "A lambda function should only be used in a higher order function. However, " +
-              s"its class is ${other.getClass.getCanonicalName}, which is not a " +
-              s"higher order function.")
-        }
-      }
-  }
-
-  /**
-   * Check if the arguments of a function are either resolved or a lambda function.
-   */
-  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
-    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
-    lambdas.nonEmpty && others.forall(_.resolved)
-  }
-}
-
-/**
  * Resolve the lambda variables exposed by a higher order functions.
  *
  * This rule works in two steps:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 9ddaebf..4cffead 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, LeafExpression, Une
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_FUNC}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.types.{DataType, StructField}
 
 /**
@@ -52,7 +53,7 @@ case class UnresolvedTable(
 }
 
 /**
- * Holds the name of a view that has yet to be looked up in a catalog. It will be resolved to
+ * Holds the name of a view that has yet to be looked up. It will be resolved to
  * [[ResolvedView]] during analysis.
  */
 case class UnresolvedView(
@@ -115,10 +116,15 @@ case class UnresolvedFieldPosition(position: ColumnPosition) extends FieldPositi
 }
 
 /**
- * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
- * [[ResolvedFunc]] during analysis.
+ * Holds the name of a function that has yet to be looked up. It will be resolved to
+ * [[ResolvedPersistentFunc]] or [[ResolvedNonPersistentFunc]] during analysis.
  */
-case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode {
+case class UnresolvedFunc(
+    multipartIdentifier: Seq[String],
+    commandName: String,
+    requirePersistent: Boolean,
+    funcTypeMismatchHint: Option[String],
+    possibleQualifiedName: Option[Seq[String]] = None) extends LeafNode {
   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil
   final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_FUNC)
@@ -190,16 +196,24 @@ case class ResolvedView(identifier: Identifier, isTemp: Boolean) extends LeafNod
 }
 
 /**
- * A plan containing resolved function.
+ * A plan containing resolved persistent function.
  */
-// TODO: create a generic representation for v1, v2 function, after we add function
-//       support to v2 catalog. For now we only need the identifier to fallback to v1 command.
-case class ResolvedFunc(identifier: Identifier)
+case class ResolvedPersistentFunc(
+    catalog: FunctionCatalog,
+    identifier: Identifier,
+    func: UnboundFunction)
   extends LeafNode {
   override def output: Seq[Attribute] = Nil
 }
 
 /**
+ * A plan containing resolved non-persistent (temp or built-in) function.
+ */
+case class ResolvedNonPersistentFunc(name: String, func: UnboundFunction) extends LeafNode {
+  override def output: Seq[Attribute] = Nil
+}
+
+/**
  * A plan containing resolved database object name with catalog determined.
  */
 case class ResolvedDBObjectName(catalog: CatalogPlugin, nameParts: Seq[String]) extends LeafNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 60f68fb..c712d2c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1433,10 +1433,16 @@ class SessionCatalog(
   // ----------------------------------------------------------------
 
   /**
-   * Constructs a [[FunctionBuilder]] based on the provided class that represents a function.
+   * Constructs a [[FunctionBuilder]] based on the provided function metadata.
    */
-  private def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
-    val clazz = Utils.classForName(functionClassName)
+  private def makeFunctionBuilder(func: CatalogFunction): FunctionBuilder = {
+    val className = func.className
+    if (!Utils.classIsLoadable(className)) {
+      throw QueryCompilationErrors.cannotLoadClassWhenRegisteringFunctionError(
+        className, func.identifier)
+    }
+    val clazz = Utils.classForName(className)
+    val name = func.identifier.unquotedString
     (input: Seq[Expression]) => functionExpressionBuilder.makeExpression(name, clazz, input)
   }
 
@@ -1449,20 +1455,34 @@ class SessionCatalog(
   }
 
   /**
-   * Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
+   * Registers a temporary or permanent scalar function into a session-specific [[FunctionRegistry]]
    */
   def registerFunction(
       funcDefinition: CatalogFunction,
       overrideIfExists: Boolean,
       functionBuilder: Option[FunctionBuilder] = None): Unit = {
+    val builder = functionBuilder.getOrElse(makeFunctionBuilder(funcDefinition))
+    registerFunction(funcDefinition, overrideIfExists, functionRegistry, builder)
+  }
+
+  private def registerFunction[T](
+      funcDefinition: CatalogFunction,
+      overrideIfExists: Boolean,
+      registry: FunctionRegistryBase[T],
+      functionBuilder: FunctionRegistryBase[T]#FunctionBuilder): Unit = {
     val func = funcDefinition.identifier
-    if (functionRegistry.functionExists(func) && !overrideIfExists) {
+    if (registry.functionExists(func) && !overrideIfExists) {
       throw QueryCompilationErrors.functionAlreadyExistsError(func)
     }
-    val info = new ExpressionInfo(
-      funcDefinition.className,
-      func.database.orNull,
-      func.funcName,
+    val info = makeExprInfoForHiveFunction(funcDefinition)
+    registry.registerFunction(func, info, functionBuilder)
+  }
+
+  private def makeExprInfoForHiveFunction(func: CatalogFunction): ExpressionInfo = {
+    new ExpressionInfo(
+      func.className,
+      func.identifier.database.orNull,
+      func.identifier.funcName,
       null,
       "",
       "",
@@ -1471,15 +1491,6 @@ class SessionCatalog(
       "",
       "",
       "hive")
-    val builder =
-      functionBuilder.getOrElse {
-        val className = funcDefinition.className
-        if (!Utils.classIsLoadable(className)) {
-          throw QueryCompilationErrors.cannotLoadClassWhenRegisteringFunctionError(className, func)
-        }
-        makeFunctionBuilder(func.unquotedString, className)
-      }
-    functionRegistry.registerFunction(func, info, builder)
   }
 
   /**
@@ -1507,14 +1518,7 @@ class SessionCatalog(
   def isTemporaryFunction(name: FunctionIdentifier): Boolean = {
     // A temporary function is a function that has been registered in functionRegistry
     // without a database name, and is neither a built-in function nor a Hive function
-    name.database.isEmpty &&
-      (functionRegistry.functionExists(name) || tableFunctionRegistry.functionExists(name)) &&
-      !FunctionRegistry.builtin.functionExists(name) &&
-      !TableFunctionRegistry.builtin.functionExists(name)
-  }
-
-  def isTempFunction(name: String): Boolean = {
-    isTemporaryFunction(FunctionIdentifier(name))
+    name.database.isEmpty && isRegisteredFunction(name) && !isBuiltinFunction(name)
   }
 
   /**
@@ -1533,6 +1537,14 @@ class SessionCatalog(
     databaseExists(db) && externalCatalog.functionExists(db, name.funcName)
   }
 
+  /**
+   * Returns whether it is a built-in function.
+   */
+  def isBuiltinFunction(name: FunctionIdentifier): Boolean = {
+    FunctionRegistry.builtin.functionExists(name) ||
+      TableFunctionRegistry.builtin.functionExists(name)
+  }
+
   protected[sql] def failFunctionLookup(
       name: FunctionIdentifier, cause: Option[Throwable] = None): Nothing = {
     throw new NoSuchFunctionException(
@@ -1540,32 +1552,97 @@ class SessionCatalog(
   }
 
   /**
-   * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
+   * Look up the `ExpressionInfo` of the given function by name if it's a built-in or temp function.
+   * This supports both scalar and table functions.
    */
-  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
-    // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
+  def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+    FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+      def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+        functionRegistry.lookupFunction(ident).orElse(
+          tableFunctionRegistry.lookupFunction(ident))
+      }
+      synchronized(lookupTempFuncWithViewContext(name, isBuiltinFunction, lookup))
+    }
+  }
+
+  /**
+   * Look up a built-in or temp scalar function by name and resolves it to an Expression if such
+   * a function exists.
+   */
+  def resolveBuiltinOrTempFunction(name: String, arguments: Seq[Expression]): Option[Expression] = {
+    resolveBuiltinOrTempFunctionInternal(
+      name, arguments, FunctionRegistry.builtin.functionExists, functionRegistry)
+  }
+
+  /**
+   * Look up a built-in or temp table function by name and resolves it to a LogicalPlan if such
+   * a function exists.
+   */
+  def resolveBuiltinOrTempTableFunction(
+      name: String, arguments: Seq[Expression]): Option[LogicalPlan] = {
+    resolveBuiltinOrTempFunctionInternal(
+      name, arguments, TableFunctionRegistry.builtin.functionExists, tableFunctionRegistry)
+  }
+
+  private def resolveBuiltinOrTempFunctionInternal[T](
+      name: String,
+      arguments: Seq[Expression],
+      isBuiltin: FunctionIdentifier => Boolean,
+      registry: FunctionRegistryBase[T]): Option[T] = synchronized {
+    val funcIdent = FunctionIdentifier(name)
+    if (!registry.functionExists(funcIdent)) {
+      None
+    } else {
+      lookupTempFuncWithViewContext(
+        name, isBuiltin, ident => Option(registry.lookupFunction(ident, arguments)))
+    }
+  }
+
+  private def lookupTempFuncWithViewContext[T](
+      name: String,
+      isBuiltin: FunctionIdentifier => Boolean,
+      lookupFunc: FunctionIdentifier => Option[T]): Option[T] = {
+    val funcIdent = FunctionIdentifier(name)
+    if (isBuiltin(funcIdent)) {
+      lookupFunc(funcIdent)
+    } else {
+      val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+      val referredTempFunctionNames = AnalysisContext.get.referredTempFunctionNames
+      if (isResolvingView) {
+        // When resolving a view, only return a temp function if it's referred by this view.
+        if (referredTempFunctionNames.contains(name)) {
+          lookupFunc(funcIdent)
+        } else {
+          None
+        }
+      } else {
+        val result = lookupFunc(funcIdent)
+        if (result.isDefined) {
+          // We are not resolving a view and the function is a temp one, add it to
+          // `AnalysisContext`, so during the view creation, we can save all referred temp
+          // functions to view metadata.
+          AnalysisContext.get.referredTempFunctionNames.add(name)
+        }
+        result
+      }
+    }
+  }
+
+  /**
+   * Look up the `ExpressionInfo` of the given function by name if it's a persistent function.
+   * This supports both scalar and table functions.
+   */
+  def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
     val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
     val qualifiedName = name.copy(database = database)
-    functionRegistry.lookupFunction(name)
-      .orElse(functionRegistry.lookupFunction(qualifiedName))
-      .orElse(tableFunctionRegistry.lookupFunction(name))
+    functionRegistry.lookupFunction(qualifiedName)
+      .orElse(tableFunctionRegistry.lookupFunction(qualifiedName))
       .getOrElse {
         val db = qualifiedName.database.get
         requireDbExists(db)
         if (externalCatalog.functionExists(db, name.funcName)) {
           val metadata = externalCatalog.getFunction(db, name.funcName)
-          new ExpressionInfo(
-            metadata.className,
-            qualifiedName.database.orNull,
-            qualifiedName.identifier,
-            null,
-            "",
-            "",
-            "",
-            "",
-            "",
-            "",
-            "hive")
+          makeExprInfoForHiveFunction(metadata.copy(identifier = qualifiedName))
         } else {
           failFunctionLookup(name)
         }
@@ -1573,97 +1650,90 @@ class SessionCatalog(
   }
 
   /**
-   * Look up a specific function, assuming it exists.
-   *
-   * For a temporary function or a permanent function that has been loaded,
-   * this method will simply lookup the function through the
-   * FunctionRegistry and create an expression based on the builder.
-   *
-   * For a permanent function that has not been loaded, we will first fetch its metadata
-   * from the underlying external catalog. Then, we will load all resources associated
-   * with this function (i.e. jars and files). Finally, we create a function builder
-   * based on the function class and put the builder into the FunctionRegistry.
-   * The name of this function in the FunctionRegistry will be `databaseName.functionName`.
+   * Look up a persistent scalar function by name and resolves it to an Expression.
    */
-  private def lookupFunction[T](
-      name: FunctionIdentifier,
-      children: Seq[Expression],
-      registry: FunctionRegistryBase[T]): T = synchronized {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-
-    // Note: the implementation of this function is a little bit convoluted.
-    // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
-    // (built-in, temp, and external).
-    if (name.database.isEmpty && registry.functionExists(name)) {
-      val referredTempFunctionNames = AnalysisContext.get.referredTempFunctionNames
-      val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
-      // Lookup the function as a temporary or a built-in function (i.e. without database) and
-      // 1. if we are not resolving view, we don't care about the function type and just return it.
-      // 2. if we are resolving view, only return a temp function if it's referred by this view.
-      if (!isResolvingView ||
-          !isTemporaryFunction(name) ||
-          referredTempFunctionNames.contains(name.funcName)) {
-        // We are not resolving a view and the function is a temp one, add it to `AnalysisContext`,
-        // so during the view creation, we can save all referred temp functions to view metadata
-        if (!isResolvingView && isTemporaryFunction(name)) {
-          AnalysisContext.get.referredTempFunctionNames.add(name.funcName)
-        }
-        // This function has been already loaded into the function registry.
-        return registry.lookupFunction(name, children)
-      }
-    }
+  def resolvePersistentFunction(
+      name: FunctionIdentifier, arguments: Seq[Expression]): Expression = {
+    resolvePersistentFunctionInternal(name, arguments, functionRegistry, makeFunctionBuilder)
+  }
 
-    // Get the database from AnalysisContext if it's defined, otherwise, use current database
-    val currentDatabase = AnalysisContext.get.catalogAndNamespace match {
-      case Seq() => getCurrentDatabase
-      case Seq(_, db) => db
-      case Seq(catalog, namespace @ _*) =>
-        throw new IllegalStateException(s"[BUG] unexpected v2 catalog: $catalog, and " +
-          s"namespace: ${namespace.quoted} in v1 function lookup")
-    }
+  /**
+   * Look up a persistent table function by name and resolves it to a LogicalPlan.
+   */
+  def resolvePersistentTableFunction(
+      name: FunctionIdentifier,
+      arguments: Seq[Expression]): LogicalPlan = {
+    // We don't support persistent table functions yet.
+    val builder = (func: CatalogFunction) => failFunctionLookup(name)
+    resolvePersistentFunctionInternal(name, arguments, tableFunctionRegistry, builder)
+  }
 
-    // If the name itself is not qualified, add the current database to it.
-    val database = formatDatabaseName(name.database.getOrElse(currentDatabase))
+  private def resolvePersistentFunctionInternal[T](
+      name: FunctionIdentifier,
+      arguments: Seq[Expression],
+      registry: FunctionRegistryBase[T],
+      createFunctionBuilder: CatalogFunction => FunctionRegistryBase[T]#FunctionBuilder): T = {
+    val database = formatDatabaseName(name.database.getOrElse(currentDb))
     val qualifiedName = name.copy(database = Some(database))
-
     if (registry.functionExists(qualifiedName)) {
       // This function has been already loaded into the function registry.
-      // Unlike the above block, we find this function by using the qualified name.
-      return registry.lookupFunction(qualifiedName, children)
-    }
-
-    // The function has not been loaded to the function registry, which means
-    // that the function is a permanent function (if it actually has been registered
-    // in the metastore). We need to first put the function in the FunctionRegistry.
-    // TODO: why not just check whether the function exists first?
-    val catalogFunction = try {
-      externalCatalog.getFunction(database, name.funcName)
-    } catch {
-      case _: AnalysisException => failFunctionLookup(name)
+      registry.lookupFunction(qualifiedName, arguments)
+    } else {
+      // The function has not been loaded to the function registry, which means
+      // that the function is a persistent function (if it actually has been registered
+      // in the metastore). We need to first put the function in the function registry.
+      val catalogFunction = try {
+        externalCatalog.getFunction(database, qualifiedName.funcName)
+      } catch {
+        case _: AnalysisException => failFunctionLookup(qualifiedName)
+      }
+      loadFunctionResources(catalogFunction.resources)
+      // Note that qualifiedName is provided by the user, while
+      // catalogFunction.identifier.unquotedString is returned by the underlying
+      // catalog. So it is possible that qualifiedName is not exactly the same as
+      // catalogFunction.identifier.unquotedString (the difference is case sensitivity).
+      // Here, we preserve the input from the user.
+      val funcMetadata = catalogFunction.copy(identifier = qualifiedName)
+      registerFunction(
+        funcMetadata,
+        overrideIfExists = false,
+        registry = registry,
+        functionBuilder = createFunctionBuilder(funcMetadata))
+      // Now, we need to create the Expression.
+      registry.lookupFunction(qualifiedName, arguments)
     }
-    loadFunctionResources(catalogFunction.resources)
-    // Please note that qualifiedName is provided by the user. However,
-    // catalogFunction.identifier.unquotedString is returned by the underlying
-    // catalog. So, it is possible that qualifiedName is not exactly the same as
-    // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
-    // At here, we preserve the input from the user.
-    registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
-    // Now, we need to create the Expression.
-    registry.lookupFunction(qualifiedName, children)
   }
 
   /**
-   * Return an [[Expression]] that represents the specified function, assuming it exists.
+   * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
    */
+  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
+    if (name.database.isEmpty) {
+      lookupBuiltinOrTempFunction(name.funcName).getOrElse(lookupPersistentFunction(name))
+    } else {
+      lookupPersistentFunction(name)
+    }
+  }
+
+  // Test only. The actual function lookup logic looks up temp/built-in functions first, then
+  // persistent functions from either the v1 or v2 catalog. This method only looks up the v1
+  // catalog and is no longer valid.
   def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
-    lookupFunction[Expression](name, children, functionRegistry)
+    if (name.database.isEmpty) {
+      resolveBuiltinOrTempFunction(name.funcName, children)
+        .getOrElse(resolvePersistentFunction(name, children))
+    } else {
+      resolvePersistentFunction(name, children)
+    }
   }
 
-  /**
-   * Return a [[LogicalPlan]] that represents the specified function, assuming it exists.
-   */
   def lookupTableFunction(name: FunctionIdentifier, children: Seq[Expression]): LogicalPlan = {
-    lookupFunction[LogicalPlan](name, children, tableFunctionRegistry)
+    if (name.database.isEmpty) {
+      resolveBuiltinOrTempTableFunction(name.funcName, children)
+        .getOrElse(resolvePersistentTableFunction(name, children))
+    } else {
+      resolvePersistentTableFunction(name, children)
+    }
   }
 
   /**
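
To make the new lookup order concrete, here is a minimal, illustrative sketch (not part of the patch itself), assuming a SparkSession `spark`, a hypothetical UDF class `some.pkg.MyUdf`, and an existing persistent function `default.my_udf`:

    spark.sql("SELECT upper('a')")                  // built-in function, resolved from the registry
    spark.sql("CREATE TEMPORARY FUNCTION my_udf AS 'some.pkg.MyUdf'")
    spark.sql("SELECT my_udf(1)")                   // unqualified name: the temp function shadows default.my_udf
    spark.sql("SELECT default.my_udf(1)")           // qualified name: goes straight to the persistent lookup
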
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 587ea2e..6a509db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4390,7 +4390,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       } else {
         Seq(describeFuncName.getText)
       }
-    DescribeFunction(UnresolvedFunc(functionName), EXTENDED != null)
+    DescribeFunction(
+      UnresolvedFunc(
+        functionName,
+        "DESCRIBE FUNCTION",
+        requirePersistent = false,
+        funcTypeMismatchHint = None),
+      EXTENDED != null)
   }
 
   /**
@@ -4421,25 +4427,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     ShowFunctions(nsPlan, userScope, systemScope, pattern)
   }
 
-  /**
-   * Create a DROP FUNCTION statement.
-   *
-   * For example:
-   * {{{
-   *   DROP [TEMPORARY] FUNCTION [IF EXISTS] function;
-   * }}}
-   */
-  override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
-    val functionName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    DropFunction(
-      UnresolvedFunc(functionName),
-      ctx.EXISTS != null,
-      ctx.TEMPORARY != null)
-  }
-
   override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) {
     val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
-    RefreshFunction(UnresolvedFunc(functionIdentifier))
+    RefreshFunction(UnresolvedFunc(
+      functionIdentifier,
+      "REFRESH FUNCTION",
+      requirePersistent = true,
+      funcTypeMismatchHint = None))
   }
 
   override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) {
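
For reference, a minimal sketch of the richer UnresolvedFunc node the parser now builds (the function name and command below are made up for illustration):

    import org.apache.spark.sql.catalyst.analysis.UnresolvedFunc

    val toRefresh = UnresolvedFunc(
      Seq("db1", "my_udf"),            // multi-part function name
      "REFRESH FUNCTION",              // command name, used in error messages
      requirePersistent = true,        // fail if the name resolves to a built-in/temp function
      funcTypeMismatchHint = None)     // optional extra hint appended to that error
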
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 8ea7487..edf3abf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -744,8 +744,7 @@ case class CreateFunction(
  */
 case class DropFunction(
     child: LogicalPlan,
-    ifExists: Boolean,
-    isTemp: Boolean) extends UnaryCommand {
+    ifExists: Boolean) extends UnaryCommand {
   override protected def withNewChildInternal(newChild: LogicalPlan): DropFunction =
     copy(child = newChild)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 185a1a2..dbc4bd3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -72,14 +72,14 @@ private[sql] object CatalogV2Implicits {
       case tableCatalog: TableCatalog =>
         tableCatalog
       case _ =>
-        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "not a TableCatalog")
+        throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "tables")
     }
 
     def asNamespaceCatalog: SupportsNamespaces = plugin match {
       case namespaceCatalog: SupportsNamespaces =>
         namespaceCatalog
       case _ =>
-        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "does not support namespaces")
+        throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "namespaces")
     }
 
     def isFunctionCatalog: Boolean = plugin match {
@@ -91,7 +91,7 @@ private[sql] object CatalogV2Implicits {
       case functionCatalog: FunctionCatalog =>
         functionCatalog
       case _ =>
-        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "not a FunctionCatalog")
+        throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "functions")
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e29019c..597b3c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -22,9 +22,10 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
+import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -297,6 +298,16 @@ private[sql] object CatalogV2Util {
       case _: NoSuchNamespaceException => None
     }
 
+  def loadFunction(catalog: CatalogPlugin, ident: Identifier): Option[UnboundFunction] = {
+    try {
+      Option(catalog.asFunctionCatalog.loadFunction(ident))
+    } catch {
+      case _: NoSuchFunctionException => None
+      case _: NoSuchDatabaseException => None
+      case _: NoSuchNamespaceException => None
+    }
+  }
+
   def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = {
     loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Some(ident)))
   }
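
A minimal usage sketch of the new loadFunction helper, assuming `catalog` is a CatalogPlugin whose implementation supports FunctionCatalog:

    import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier}

    val ident = Identifier.of(Array("ns1"), "my_func")
    CatalogV2Util.loadFunction(catalog, ident) match {
      case Some(unbound) => println(s"${unbound.name()}: ${unbound.description()}")
      case None          => println("no such function, namespace or database")
    }
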
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index 0362caf..21f4258 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.connector.catalog
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
@@ -153,50 +152,4 @@ private[sql] trait LookupCatalog extends Logging {
       }
     }
   }
-
-  object AsFunctionIdentifier {
-    def unapply(parts: Seq[String]): Option[FunctionIdentifier] = {
-      def namesToFunctionIdentifier(names: Seq[String]): Option[FunctionIdentifier] = names match {
-        case Seq(name) => Some(FunctionIdentifier(name))
-        case Seq(database, name) => Some(FunctionIdentifier(name, Some(database)))
-        case _ => None
-      }
-      parts match {
-        case Seq(name)
-          if catalogManager.v1SessionCatalog.isRegisteredFunction(FunctionIdentifier(name)) =>
-          Some(FunctionIdentifier(name))
-        case CatalogAndMultipartIdentifier(None, names)
-          if CatalogV2Util.isSessionCatalog(currentCatalog) =>
-          namesToFunctionIdentifier(names)
-        case CatalogAndMultipartIdentifier(Some(catalog), names)
-          if CatalogV2Util.isSessionCatalog(catalog) =>
-          namesToFunctionIdentifier(names)
-        case _ => None
-      }
-    }
-  }
-
-  def parseSessionCatalogFunctionIdentifier(nameParts: Seq[String]): FunctionIdentifier = {
-    if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) {
-      return FunctionIdentifier(nameParts.head)
-    }
-
-    nameParts match {
-      case SessionCatalogAndIdentifier(_, ident) =>
-        if (nameParts.length == 1) {
-          // If there is only one name part, it means the current catalog is the session catalog.
-          // Here we don't fill the default database, to keep the error message unchanged for
-          // v1 commands.
-          FunctionIdentifier(nameParts.head, None)
-        } else {
-          ident.namespace match {
-            case Array(db) => FunctionIdentifier(ident.name, Some(db))
-            case other =>
-              throw QueryCompilationErrors.requiresSinglePartNamespaceError(other)
-          }
-        }
-
-      case _ => throw QueryCompilationErrors.functionUnsupportedInV2CatalogError()
-    }
-  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 846f285..fcbcb54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -258,6 +258,14 @@ object QueryCompilationErrors {
       t.origin.line, t.origin.startPosition)
   }
 
+  def expectPersistentFuncError(
+      name: String, cmd: String, mismatchHint: Option[String], t: TreeNode[_]): Throwable = {
+    val hintStr = mismatchHint.map(" " + _).getOrElse("")
+    new AnalysisException(
+      s"$name is a built-in/temporary function. '$cmd' expects a persistent function.$hintStr",
+      t.origin.line, t.origin.startPosition)
+  }
+
   def permanentViewNotSupportedByStreamingReadingAPIError(quoted: String): Throwable = {
     new AnalysisException(s"$quoted is a permanent view, which is not supported by " +
       "streaming reading API such as `DataStreamReader.table` yet.")
@@ -507,10 +515,6 @@ object QueryCompilationErrors {
         s"'${db.head}' != '${v1TableName.database.get}'")
   }
 
-  def sqlOnlySupportedWithV1CatalogError(sql: String, catalog: String): Throwable = {
-    new AnalysisException(s"Catalog $catalog does not support $sql.")
-  }
-
   def sqlOnlySupportedWithV1TablesError(sql: String): Throwable = {
     new AnalysisException(s"$sql is only supported with v1 tables.")
   }
@@ -1374,8 +1378,8 @@ object QueryCompilationErrors {
     new AnalysisException(s"Cannot partition by nested column: $reference")
   }
 
-  def cannotUseCatalogError(plugin: CatalogPlugin, msg: String): Throwable = {
-    new AnalysisException(s"Cannot use catalog ${plugin.name}: $msg")
+  def missingCatalogAbilityError(plugin: CatalogPlugin, ability: String): Throwable = {
+    new AnalysisException(s"Catalog ${plugin.name} does not support $ability")
   }
 
   def identifierHavingMoreThanTwoNamePartsError(
@@ -1387,10 +1391,6 @@ object QueryCompilationErrors {
     new AnalysisException("multi-part identifier cannot be empty.")
   }
 
-  def functionUnsupportedInV2CatalogError(): Throwable = {
-    new AnalysisException("function is only supported in v1 catalog")
-  }
-
   def cannotOperateOnHiveDataSourceFilesError(operation: String): Throwable = {
     new AnalysisException("Hive data source can only be used with tables, you can not " +
       s"$operation files of Hive data source directly.")
@@ -1758,6 +1758,21 @@ object QueryCompilationErrors {
     new AnalysisException(s"Table or view not found: $table")
   }
 
+  def noSuchFunctionError(
+      rawName: Seq[String],
+      t: TreeNode[_],
+      fullName: Option[Seq[String]] = None): Throwable = {
+    if (rawName.length == 1 && fullName.isDefined) {
+      new AnalysisException(s"Undefined function: ${rawName.head}. " +
+        "This function is neither a built-in/temporary function, nor a persistent " +
+        s"function that is qualified as ${fullName.get.quoted}.",
+        t.origin.line, t.origin.startPosition)
+    } else {
+      new AnalysisException(s"Undefined function: ${rawName.quoted}",
+        t.origin.line, t.origin.startPosition)
+    }
+  }
+
   def unsetNonExistentPropertyError(property: String, table: TableIdentifier): Throwable = {
     new AnalysisException(s"Attempted to unset non-existent property '$property' in table '$table'")
   }
@@ -1827,13 +1842,8 @@ object QueryCompilationErrors {
     new AnalysisException("Cannot overwrite a path that is also being read from.")
   }
 
-  def specifyingDBInDropTempFuncError(databaseName: String): Throwable = {
-    new AnalysisException(
-      s"Specifying a database in DROP TEMPORARY FUNCTION is not allowed: '$databaseName'")
-  }
-
-  def cannotDropNativeFuncError(functionName: String): Throwable = {
-    new AnalysisException(s"Cannot drop native function '$functionName'")
+  def cannotDropBuiltinFuncError(functionName: String): Throwable = {
+    new AnalysisException(s"Cannot drop built-in function '$functionName'")
   }
 
   def cannotRefreshBuiltInFuncError(functionName: String): Throwable = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 88f3851..938bbfd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -22,6 +22,7 @@ import org.antlr.v4.runtime.ParserRuleContext
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
 /**
  * Object for grouping all error messages of the query parsing.
@@ -434,4 +435,9 @@ object QueryParsingErrors {
   def invalidTimeTravelSpec(reason: String, ctx: ParserRuleContext): Throwable = {
     new ParseException(s"Invalid time travel spec: $reason.", ctx)
   }
+
+  def invalidNameForDropTempFunc(name: Seq[String], ctx: ParserRuleContext): Throwable = {
+    new ParseException(
+      s"DROP TEMPORARY FUNCTION requires a single part name but got: ${name.quoted}", ctx)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/V1Function.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/V1Function.scala
new file mode 100644
index 0000000..fdd7161
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/V1Function.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, UnboundFunction}
+import org.apache.spark.sql.types.StructType
+
+case class V1Function(info: ExpressionInfo) extends UnboundFunction {
+  override def bind(inputType: StructType): BoundFunction = {
+    throw new UnsupportedOperationException("Cannot bind a V1 function.")
+  }
+  override def name(): String = info.getName
+  override def description(): String = info.getUsage
+}
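
A minimal sketch of the wrapper in action (the class and function names are made up):

    import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
    import org.apache.spark.sql.internal.connector.V1Function
    import org.apache.spark.sql.types.StructType

    val fn = V1Function(new ExpressionInfo("com.example.MyUdf", "my_udf"))
    fn.name()                     // "my_udf"
    fn.description()              // the usage text carried by the ExpressionInfo
    // fn.bind(new StructType())  // throws UnsupportedOperationException("Cannot bind a V1 function.")
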
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 3011351..8f690e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -218,7 +218,7 @@ class AnalysisErrorSuite extends AnalysisTest {
     "higher order function with filter predicate",
     CatalystSqlParser.parsePlan("SELECT aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x) " +
       "FILTER (WHERE c > 1)"),
-    "FILTER predicate specified, but aggregate is not an aggregate function" :: Nil)
+    "Function aggregate does not support FILTER clause" :: Nil)
 
   errorTest(
     "non-deterministic filter predicate in aggregate functions",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
index 85e0b10..de8c57f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
@@ -19,23 +19,67 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.net.URI
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.{Alias, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier}
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.connector.V1Function
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class LookupFunctionsSuite extends PlanTest {
 
-  test("SPARK-23486: the functionExists for the Persistent function check") {
+  test("SPARK-19737: detect undefined functions without triggering relation resolution") {
+    import org.apache.spark.sql.catalyst.dsl.plans._
+
+    Seq(true, false) foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        val externalCatalog = new CustomInMemoryCatalog
+        externalCatalog.createDatabase(
+          CatalogDatabase("default", "", new URI("loc1"), Map.empty),
+          ignoreIfExists = false)
+        externalCatalog.createDatabase(
+          CatalogDatabase("db1", "", new URI("loc2"), Map.empty),
+          ignoreIfExists = false)
+        val catalog = new SessionCatalog(externalCatalog, new SimpleFunctionRegistry)
+        val catalogManager = new CatalogManager(new CustomV2SessionCatalog(catalog), catalog)
+        catalog.setCurrentDatabase("db1")
+        try {
+          val analyzer = new Analyzer(catalogManager)
+
+          // The analyzer should report the undefined function
+          // rather than the undefined table first.
+          val cause = intercept[AnalysisException] {
+            analyzer.execute(
+              UnresolvedRelation(TableIdentifier("undefined_table")).select(
+                UnresolvedFunction("undefined_fn", Nil, isDistinct = false)
+              )
+            )
+          }
+
+          assert(cause.getMessage.contains("Undefined function: undefined_fn"))
+          // SPARK-21318: the error message should contain the current database name
+          assert(cause.getMessage.contains("db1"))
+        } finally {
+          catalog.reset()
+        }
+      }
+    }
+  }
+
+  test("SPARK-23486: the getFunction for the Persistent function check") {
     val externalCatalog = new CustomInMemoryCatalog
-    val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin)
+    val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin.clone())
+    val catalogManager = new CatalogManager(new CustomV2SessionCatalog(catalog), catalog)
     val analyzer = {
       catalog.createDatabase(
         CatalogDatabase("default", "", new URI("loc"), Map.empty),
         ignoreIfExists = false)
-      new Analyzer(catalog)
+      new Analyzer(catalogManager)
     }
 
     def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
@@ -49,19 +93,18 @@ class LookupFunctionsSuite extends PlanTest {
     analyzer.LookupFunctions.apply(plan)
 
     assert(externalCatalog.getFunctionExistsCalledTimes == 1)
-    assert(analyzer.LookupFunctions.normalizeFuncName
-      (unresolvedPersistentFunc.nameParts.asFunctionIdentifier).database == Some("default"))
   }
 
-  test("SPARK-23486: the functionExists for the Registered function check") {
+  test("SPARK-23486: the lookupFunction for the Registered function check") {
     val externalCatalog = new InMemoryCatalog
     val customerFunctionReg = new CustomerFunctionRegistry
     val catalog = new SessionCatalog(externalCatalog, customerFunctionReg)
+    val catalogManager = new CatalogManager(new CustomV2SessionCatalog(catalog), catalog)
     val analyzer = {
       catalog.createDatabase(
         CatalogDatabase("default", "", new URI("loc"), Map.empty),
         ignoreIfExists = false)
-      new Analyzer(catalog)
+      new Analyzer(catalogManager)
     }
 
     def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
@@ -71,32 +114,45 @@ class LookupFunctionsSuite extends PlanTest {
       table("TaBlE"))
     analyzer.LookupFunctions.apply(plan)
 
-    assert(customerFunctionReg.getIsRegisteredFunctionCalledTimes == 4)
-    assert(analyzer.LookupFunctions.normalizeFuncName
-      (unresolvedRegisteredFunc.nameParts.asFunctionIdentifier).database == Some("default"))
+    assert(customerFunctionReg.getLookupFunctionCalledTimes == 2)
   }
 }
 
 class CustomerFunctionRegistry extends SimpleFunctionRegistry {
+  private var lookupFunctionCalledTimes: Int = 0;
 
-  private var isRegisteredFunctionCalledTimes: Int = 0;
-
-  override def functionExists(funcN: FunctionIdentifier): Boolean = synchronized {
-    isRegisteredFunctionCalledTimes = isRegisteredFunctionCalledTimes + 1
-    true
+  override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = {
+    lookupFunctionCalledTimes += 1
+    if (name.funcName == "undefined_fn") return None
+    Some(new ExpressionInfo("fake", "name"))
   }
 
-  def getIsRegisteredFunctionCalledTimes: Int = isRegisteredFunctionCalledTimes
+  def getLookupFunctionCalledTimes: Int = lookupFunctionCalledTimes
 }
 
 class CustomInMemoryCatalog extends InMemoryCatalog {
-
   private var functionExistsCalledTimes: Int = 0
 
   override def functionExists(db: String, funcName: String): Boolean = synchronized {
-    functionExistsCalledTimes = functionExistsCalledTimes + 1
-    true
+    functionExistsCalledTimes += 1
+    funcName != "undefined_fn"
   }
 
   def getFunctionExistsCalledTimes: Int = functionExistsCalledTimes
 }
+
+class CustomV2SessionCatalog(v1Catalog: SessionCatalog) extends FunctionCatalog {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
+  override def name(): String = CatalogManager.SESSION_CATALOG_NAME
+  override def listFunctions(namespace: Array[String]): Array[Identifier] = {
+    throw new UnsupportedOperationException()
+  }
+
+  override def loadFunction(ident: Identifier): UnboundFunction = {
+    V1Function(v1Catalog.lookupPersistentFunction(ident.asFunctionIdentifier))
+  }
+  override def functionExists(ident: Identifier): Boolean = {
+    v1Catalog.isPersistentFunction(ident.asFunctionIdentifier)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 89afa02..b134085 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -1623,36 +1623,6 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     }
   }
 
-  test("SPARK-19737: detect undefined functions without triggering relation resolution") {
-    import org.apache.spark.sql.catalyst.dsl.plans._
-
-    Seq(true, false) foreach { caseSensitive =>
-      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        val catalog = new SessionCatalog(newBasicCatalog(), new SimpleFunctionRegistry)
-        catalog.setCurrentDatabase("db1")
-        try {
-          val analyzer = new Analyzer(catalog)
-
-          // The analyzer should report the undefined function
-          // rather than the undefined table first.
-          val cause = intercept[AnalysisException] {
-            analyzer.execute(
-              UnresolvedRelation(TableIdentifier("undefined_table")).select(
-                UnresolvedFunction("undefined_fn", Nil, isDistinct = false)
-              )
-            )
-          }
-
-          assert(cause.getMessage.contains("Undefined function: 'undefined_fn'"))
-          // SPARK-21318: the error message should contains the current database name
-          assert(cause.getMessage.contains("db1"))
-        } finally {
-          catalog.reset()
-        }
-      }
-    }
-  }
-
   test("SPARK-24544: test print actual failure cause when look up function failed") {
     withBasicCatalog { catalog =>
       val cause = intercept[NoSuchFunctionException] {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index b619e1f..0430929 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2033,18 +2033,21 @@ class DDLParserSuite extends AnalysisTest {
   }
 
   test("DESCRIBE FUNCTION") {
+    def createFuncPlan(name: Seq[String]): UnresolvedFunc = {
+      UnresolvedFunc(name, "DESCRIBE FUNCTION", false, None)
+    }
     comparePlans(
       parsePlan("DESC FUNCTION a"),
-      DescribeFunction(UnresolvedFunc(Seq("a")), false))
+      DescribeFunction(createFuncPlan(Seq("a")), false))
     comparePlans(
       parsePlan("DESCRIBE FUNCTION a"),
-      DescribeFunction(UnresolvedFunc(Seq("a")), false))
+      DescribeFunction(createFuncPlan(Seq("a")), false))
     comparePlans(
       parsePlan("DESCRIBE FUNCTION a.b.c"),
-      DescribeFunction(UnresolvedFunc(Seq("a", "b", "c")), false))
+      DescribeFunction(createFuncPlan(Seq("a", "b", "c")), false))
     comparePlans(
       parsePlan("DESCRIBE FUNCTION EXTENDED a.b.c"),
-      DescribeFunction(UnresolvedFunc(Seq("a", "b", "c")), true))
+      DescribeFunction(createFuncPlan(Seq("a", "b", "c")), true))
   }
 
   test("SHOW FUNCTIONS") {
@@ -2092,31 +2095,16 @@ class DDLParserSuite extends AnalysisTest {
       ShowFunctions(UnresolvedNamespace(Seq("a", "b")), true, true, Some("c")))
   }
 
-  test("DROP FUNCTION") {
-    comparePlans(
-      parsePlan("DROP FUNCTION a"),
-      DropFunction(UnresolvedFunc(Seq("a")), false, false))
-    comparePlans(
-      parsePlan("DROP FUNCTION a.b.c"),
-      DropFunction(UnresolvedFunc(Seq("a", "b", "c")), false, false))
-    comparePlans(
-      parsePlan("DROP TEMPORARY FUNCTION a.b.c"),
-      DropFunction(UnresolvedFunc(Seq("a", "b", "c")), false, true))
-    comparePlans(
-      parsePlan("DROP FUNCTION IF EXISTS a.b.c"),
-      DropFunction(UnresolvedFunc(Seq("a", "b", "c")), true, false))
-    comparePlans(
-      parsePlan("DROP TEMPORARY FUNCTION IF EXISTS a.b.c"),
-      DropFunction(UnresolvedFunc(Seq("a", "b", "c")), true, true))
-  }
-
   test("REFRESH FUNCTION") {
+    def createFuncPlan(name: Seq[String]): UnresolvedFunc = {
+      UnresolvedFunc(name, "REFRESH FUNCTION", true, None)
+    }
     parseCompare("REFRESH FUNCTION c",
-      RefreshFunction(UnresolvedFunc(Seq("c"))))
+      RefreshFunction(createFuncPlan(Seq("c"))))
     parseCompare("REFRESH FUNCTION b.c",
-      RefreshFunction(UnresolvedFunc(Seq("b", "c"))))
+      RefreshFunction(createFuncPlan(Seq("b", "c"))))
     parseCompare("REFRESH FUNCTION a.b.c",
-      RefreshFunction(UnresolvedFunc(Seq("a", "b", "c"))))
+      RefreshFunction(createFuncPlan(Seq("a", "b", "c"))))
   }
 
   test("CREATE INDEX") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7e5e2ba..aaf2ead 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 
 /**
@@ -360,15 +361,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           replace = replace,
           viewType = PersistedView)
       } else {
-        throw QueryCompilationErrors.sqlOnlySupportedWithV1TablesError("CREATE VIEW")
+        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "views")
       }
 
     case ShowViews(ns: ResolvedNamespace, pattern, output) =>
       ns match {
         case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
         case _ =>
-          throw QueryCompilationErrors.sqlOnlySupportedWithV1CatalogError(
-            "SHOW VIEWS", ns.catalog.name)
+          throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
       }
 
     // If target is view, force use v1 command
@@ -379,21 +379,39 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         if conf.useV1Command =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
 
-    case DescribeFunction(ResolvedFunc(identifier), extended) =>
-      DescribeFunctionCommand(identifier.asFunctionIdentifier, extended)
+    case DescribeFunction(ResolvedNonPersistentFunc(_, V1Function(info)), extended) =>
+      DescribeFunctionCommand(info, extended)
+
+    case DescribeFunction(ResolvedPersistentFunc(catalog, _, func), extended) =>
+      if (isSessionCatalog(catalog)) {
+        DescribeFunctionCommand(func.asInstanceOf[V1Function].info, extended)
+      } else {
+        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
+      }
 
     case ShowFunctions(ns: ResolvedNamespace, userScope, systemScope, pattern, output) =>
       ns match {
         case DatabaseInSessionCatalog(db) =>
           ShowFunctionsCommand(db, pattern, userScope, systemScope, output)
         case _ =>
-          throw QueryCompilationErrors.sqlOnlySupportedWithV1CatalogError(
-            "SHOW FUNCTIONS", ns.catalog.name)
+          throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "functions")
+      }
+
+    case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
+      if (isSessionCatalog(catalog)) {
+        val funcIdentifier = identifier.asFunctionIdentifier
+        DropFunctionCommand(funcIdentifier.database, funcIdentifier.funcName, ifExists, false)
+      } else {
+        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "DROP FUNCTION")
       }
 
-    case DropFunction(ResolvedFunc(identifier), ifExists, isTemp) =>
-      val funcIdentifier = identifier.asFunctionIdentifier
-      DropFunctionCommand(funcIdentifier.database, funcIdentifier.funcName, ifExists, isTemp)
+    case RefreshFunction(ResolvedPersistentFunc(catalog, identifier, _)) =>
+      if (isSessionCatalog(catalog)) {
+        val funcIdentifier = identifier.asFunctionIdentifier
+        RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
+      } else {
+        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
+      }
 
     case CreateFunction(ResolvedDBObjectName(catalog, nameParts),
         className, resources, ignoreIfExists, replace) =>
@@ -414,13 +432,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           ignoreIfExists,
           replace)
       } else {
-        throw QueryCompilationErrors.functionUnsupportedInV2CatalogError()
+        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "CREATE FUNCTION")
       }
-
-    case RefreshFunction(ResolvedFunc(identifier)) =>
-      // Fallback to v1 command
-      val funcIdentifier = identifier.asFunctionIdentifier
-      RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
   }
 
   private def constructV1TableCmd(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 80739e5..fed02dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -27,7 +27,7 @@ import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedDBObjectName}
+import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedDBObjectName, UnresolvedFunc}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser._
@@ -579,6 +579,38 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
   }
 
+  /**
+   * Create a DROP FUNCTION statement.
+   *
+   * For example:
+   * {{{
+   *   DROP [TEMPORARY] FUNCTION [IF EXISTS] function;
+   * }}}
+   */
+  override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val functionName = visitMultipartIdentifier(ctx.multipartIdentifier)
+    val isTemp = ctx.TEMPORARY != null
+    if (isTemp) {
+      if (functionName.length > 1) {
+        throw QueryParsingErrors.invalidNameForDropTempFunc(functionName, ctx)
+      }
+      DropFunctionCommand(
+        databaseName = None,
+        functionName = functionName.head,
+        ifExists = ctx.EXISTS != null,
+        isTemp = true)
+    } else {
+      val hintStr = "Please use fully qualified identifier to drop the persistent function."
+      DropFunction(
+        UnresolvedFunc(
+          functionName,
+          "DROP FUNCTION",
+          requirePersistent = true,
+          funcTypeMismatchHint = Some(hintStr)),
+        ctx.EXISTS != null)
+    }
+  }
+
   private def toStorageFormat(
       location: Option[String],
       maybeSerdeInfo: Option[SerdeInfo],
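
To illustrate the new routing (assuming a SparkSession `spark`):

    spark.sql("DROP TEMPORARY FUNCTION IF EXISTS my_temp_udf")   // handled directly by DropFunctionCommand
    // spark.sql("DROP TEMPORARY FUNCTION db1.my_temp_udf")      // ParseException: temp functions need a single part name
    spark.sql("DROP FUNCTION IF EXISTS default.my_udf")          // goes through UnresolvedFunc and must be a persistent function
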
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index b989b33..f2c3bfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.util.Locale
-
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -94,7 +92,7 @@ case class CreateFunctionCommand(
  * }}}
  */
 case class DescribeFunctionCommand(
-    functionName: FunctionIdentifier,
+    info: ExpressionInfo,
     isExtended: Boolean) extends LeafRunnableCommand {
 
   override val output: Seq[Attribute] = {
@@ -103,48 +101,19 @@ case class DescribeFunctionCommand(
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // Hard code "<>", "!=", "between", "case", and "||"
-    // for now as there is no corresponding functions.
-    functionName.funcName.toLowerCase(Locale.ROOT) match {
-      case "<>" =>
-        Row(s"Function: $functionName") ::
-          Row("Usage: expr1 <> expr2 - " +
-            "Returns true if `expr1` is not equal to `expr2`.") :: Nil
-      case "!=" =>
-        Row(s"Function: $functionName") ::
-          Row("Usage: expr1 != expr2 - " +
-            "Returns true if `expr1` is not equal to `expr2`.") :: Nil
-      case "between" =>
-        Row("Function: between") ::
-          Row("Usage: expr1 [NOT] BETWEEN expr2 AND expr3 - " +
-            "evaluate if `expr1` is [not] in between `expr2` and `expr3`.") :: Nil
-      case "case" =>
-        Row("Function: case") ::
-          Row("Usage: CASE expr1 WHEN expr2 THEN expr3 " +
-            "[WHEN expr4 THEN expr5]* [ELSE expr6] END - " +
-            "When `expr1` = `expr2`, returns `expr3`; " +
-            "when `expr1` = `expr4`, return `expr5`; else return `expr6`.") :: Nil
-      case "||" =>
-        Row("Function: ||") ::
-          Row("Usage: expr1 || expr2 - Returns the concatenation of `expr1` and `expr2`.") :: Nil
-      case _ =>
-        try {
-          val info = sparkSession.sessionState.catalog.lookupFunctionInfo(functionName)
-          val name = if (info.getDb != null) info.getDb + "." + info.getName else info.getName
-          val result =
-            Row(s"Function: $name") ::
-              Row(s"Class: ${info.getClassName}") ::
-              Row(s"Usage: ${info.getUsage}") :: Nil
+    val name = if (info.getDb != null) info.getDb + "." + info.getName else info.getName
+    val result = if (info.getClassName != null) {
+      Row(s"Function: $name") ::
+        Row(s"Class: ${info.getClassName}") ::
+        Row(s"Usage: ${info.getUsage}") :: Nil
+    } else {
+      Row(s"Function: $name") :: Row(s"Usage: ${info.getUsage}") :: Nil
+    }
 
-          if (isExtended) {
-            result :+
-              Row(s"Extended Usage:${info.getExtended}")
-          } else {
-            result
-          }
-        } catch {
-          case _: NoSuchFunctionException => Seq(Row(s"Function: $functionName not found."))
-        }
+    if (isExtended) {
+      result :+ Row(s"Extended Usage:${info.getExtended}")
+    } else {
+      result
     }
   }
 }
@@ -165,11 +134,8 @@ case class DropFunctionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     if (isTemp) {
-      if (databaseName.isDefined) {
-        throw QueryCompilationErrors.specifyingDBInDropTempFuncError(databaseName.get)
-      }
       if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
-        throw QueryCompilationErrors.cannotDropNativeFuncError(functionName)
+        throw QueryCompilationErrors.cannotDropBuiltinFuncError(functionName)
       }
       catalog.dropTempFunction(functionName, ifExists)
     } else {
@@ -216,7 +182,8 @@ case class ShowFunctionsCommand(
     // only show when showSystemFunctions=true
     if (showSystemFunctions) {
       (functionNames ++
-        StringUtils.filterPattern(FunctionsCommand.virtualOperators, pattern.getOrElse("*")))
+        StringUtils.filterPattern(
+          FunctionRegistry.builtinOperators.keys.toSeq, pattern.getOrElse("*")))
         .sorted.map(Row(_))
     } else {
       functionNames.sorted.map(Row(_))
@@ -263,9 +230,3 @@ case class RefreshFunctionCommand(
     Seq.empty[Row]
   }
 }
-
-object FunctionsCommand {
-  // operators that do not have corresponding functions.
-  // They should be handled `DescribeFunctionCommand`, `ShowFunctionsCommand`
-  val virtualOperators = Seq("!=", "<>", "between", "case", "||")
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index d5547c1..906107a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -23,14 +23,16 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -38,7 +40,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
  */
 class V2SessionCatalog(catalog: SessionCatalog)
-  extends TableCatalog with SupportsNamespaces with SQLConfHelper {
+  extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
   import V2SessionCatalog._
 
   override val defaultNamespace: Array[String] = Array("default")
@@ -61,14 +63,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   }
 
   override def loadTable(ident: Identifier): Table = {
-    val catalogTable = try {
-      catalog.getTableMetadata(ident.asTableIdentifier)
-    } catch {
-      case _: NoSuchTableException =>
-        throw QueryCompilationErrors.noSuchTableError(ident)
-    }
-
-    V1Table(catalogTable)
+    V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
   }
 
   override def loadTable(ident: Identifier, timestamp: Long): Table = {
@@ -213,6 +208,15 @@ class V2SessionCatalog(catalog: SessionCatalog)
           throw QueryCompilationErrors.requiresSinglePartNamespaceError(other)
       }
     }
+
+    def asFunctionIdentifier: FunctionIdentifier = {
+      ident.namespace match {
+        case Array(db) =>
+          FunctionIdentifier(ident.name, Some(db))
+        case other =>
+          throw QueryCompilationErrors.requiresSinglePartNamespaceError(other)
+      }
+    }
   }
 
   override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
@@ -302,6 +306,26 @@ class V2SessionCatalog(catalog: SessionCatalog)
     catalog.isTempView(ident.namespace() :+ ident.name())
   }
 
+  override def loadFunction(ident: Identifier): UnboundFunction = {
+    V1Function(catalog.lookupPersistentFunction(ident.asFunctionIdentifier))
+  }
+
+  override def listFunctions(namespace: Array[String]): Array[Identifier] = {
+    namespace match {
+      case Array(db) =>
+        catalog.listFunctions(db).filter(_._2 == "USER").map { case (funcIdent, _) =>
+          assert(funcIdent.database.isDefined)
+          Identifier.of(Array(funcIdent.database.get), funcIdent.identifier)
+        }.toArray
+      case _ =>
+        throw QueryCompilationErrors.noSuchNamespaceError(namespace)
+    }
+  }
+
+  override def functionExists(ident: Identifier): Boolean = {
+    catalog.isPersistentFunction(ident.asFunctionIdentifier)
+  }
+
   override def toString: String = s"V2SessionCatalog($name)"
 }
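
A minimal sketch of the new FunctionCatalog surface on V2SessionCatalog, assuming `sessionCatalog` is the v1 SessionCatalog and `default.my_udf` is a persistent function registered in it:

    import org.apache.spark.sql.connector.catalog.Identifier
    import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog

    val v2Catalog = new V2SessionCatalog(sessionCatalog)
    val ident = Identifier.of(Array("default"), "my_udf")
    if (v2Catalog.functionExists(ident)) {
      val unbound = v2Catalog.loadFunction(ident)   // a V1Function wrapping the function's ExpressionInfo
      println(unbound.description())
    }
    v2Catalog.listFunctions(Array("default"))       // only user-defined ("USER") functions are listed
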
 
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
index 230393f..12450fa 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
@@ -1324,7 +1324,7 @@ select interval (-30) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
@@ -1333,7 +1333,7 @@ select interval (a + 1) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
@@ -1356,7 +1356,7 @@ select interval (-30) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
@@ -1365,7 +1365,7 @@ select interval (a + 1) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index 12dcf33..401d684 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -133,7 +133,7 @@ select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29
+Undefined function: random_not_exist_func. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.random_not_exist_func.; line 1 pos 29
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/interval.sql.out
index c9beb76..f32836e 100644
--- a/sql/core/src/test/resources/sql-tests/results/interval.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/interval.sql.out
@@ -1317,7 +1317,7 @@ select interval (-30) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
@@ -1326,7 +1326,7 @@ select interval (a + 1) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
@@ -1349,7 +1349,7 @@ select interval (-30) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
@@ -1358,7 +1358,7 @@ select interval (a + 1) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'interval'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
+Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
index 2872f1b..e7891b5 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
@@ -133,7 +133,7 @@ select udf(a), udf(b) from values ("one", random_not_exist_func(1)), ("two", 2)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 42
+Undefined function: random_not_exist_func. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.random_not_exist_func.; line 1 pos 42
 
 
 -- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index fd6f679..d7f18ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -28,6 +28,7 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
 import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedColumnAliasingSuite}
@@ -37,7 +38,7 @@ import org.apache.spark.sql.execution.{CommandResultExec, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.command.{DataWritingCommandExec, FunctionsCommand}
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
@@ -77,7 +78,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     def getFunctions(pattern: String): Seq[Row] = {
       StringUtils.filterPattern(
         spark.sessionState.catalog.listFunctions("default").map(_._1.funcName)
-        ++ FunctionsCommand.virtualOperators, pattern)
+        ++ FunctionRegistry.builtinOperators.keys, pattern)
         .map(Row(_))
     }
 
@@ -126,7 +127,9 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
 
     checkKeywordsNotExist(sql("describe functioN Upper"), "Extended Usage")
 
-    checkKeywordsExist(sql("describe functioN abcadf"), "Function: abcadf not found.")
+    val e = intercept[AnalysisException](sql("describe functioN abcadf"))
+    assert(e.message.contains("Undefined function: abcadf. This function is neither a " +
+      "built-in/temporary function, nor a persistent function"))
   }
 
   test("SPARK-34678: describe functions for table-valued functions") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
index d5417be..3277cd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
@@ -51,10 +51,73 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
     withSQLConf("spark.sql.catalog.testcat" -> classOf[BasicInMemoryTableCatalog].getName) {
       assert(intercept[AnalysisException](
         sql("SELECT testcat.strlen('abc')").collect()
-      ).getMessage.contains("is not a FunctionCatalog"))
+      ).getMessage.contains("Catalog testcat does not support functions"))
     }
   }
 
+  test("DESCRIBE FUNCTION: only support session catalog") {
+    addFunction(Identifier.of(Array.empty, "abc"), new JavaStrLen(new JavaStrLenNoImpl))
+
+    val e = intercept[AnalysisException] {
+      sql("DESCRIBE FUNCTION testcat.abc")
+    }
+    assert(e.message.contains("Catalog testcat does not support functions"))
+
+    val e1 = intercept[AnalysisException] {
+      sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
+    }
+    assert(e1.message.contains("requires a single-part namespace"))
+  }
+
+  test("SHOW FUNCTIONS: only support session catalog") {
+    addFunction(Identifier.of(Array.empty, "abc"), new JavaStrLen(new JavaStrLenNoImpl))
+
+    val e = intercept[AnalysisException] {
+      sql(s"SHOW FUNCTIONS LIKE testcat.abc")
+    }
+    assert(e.message.contains("Catalog testcat does not support functions"))
+  }
+
+  test("DROP FUNCTION: only support session catalog") {
+    addFunction(Identifier.of(Array.empty, "abc"), new JavaStrLen(new JavaStrLenNoImpl))
+
+    val e = intercept[AnalysisException] {
+      sql("DROP FUNCTION testcat.abc")
+    }
+    assert(e.message.contains("Catalog testcat does not support DROP FUNCTION"))
+
+    val e1 = intercept[AnalysisException] {
+      sql("DROP FUNCTION default.ns1.ns2.fun")
+    }
+    assert(e1.message.contains("requires a single-part namespace"))
+  }
+
+  test("CREATE FUNCTION: only support session catalog") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'")
+    }
+    assert(e.message.contains("Catalog testcat does not support CREATE FUNCTION"))
+
+    val e1 = intercept[AnalysisException] {
+      sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'")
+    }
+    assert(e1.message.contains("requires a single-part namespace"))
+  }
+
+  test("REFRESH FUNCTION: only support session catalog") {
+    addFunction(Identifier.of(Array.empty, "abc"), new JavaStrLen(new JavaStrLenNoImpl))
+
+    val e = intercept[AnalysisException] {
+      sql("REFRESH FUNCTION testcat.abc")
+    }
+    assert(e.message.contains("Catalog testcat does not support REFRESH FUNCTION"))
+
+    val e1 = intercept[AnalysisException] {
+      sql("REFRESH FUNCTION default.ns1.ns2.fun")
+    }
+    assert(e1.message.contains("requires a single-part namespace"))
+  }
+
   test("built-in with non-function catalog should still work") {
     withSQLConf(SQLConf.DEFAULT_CATALOG.key -> "testcat",
       "spark.sql.catalog.testcat" -> classOf[BasicInMemoryTableCatalog].getName) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index bc5a186..1fcfd94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1089,7 +1089,7 @@ class DataSourceV2SQLSuite
       sql("SHOW VIEWS FROM testcat")
     }
 
-    assert(exception.getMessage.contains("Catalog testcat does not support SHOW VIEWS"))
+    assert(exception.getMessage.contains("Catalog testcat does not support views"))
   }
 
   test("create/replace/alter table - reserved properties") {
@@ -2010,64 +2010,7 @@ class DataSourceV2SQLSuite
     val e = intercept[AnalysisException] {
       sql(s"CREATE VIEW $v AS SELECT 1")
     }
-    assert(e.message.contains("CREATE VIEW is only supported with v1 tables"))
-  }
-
-  test("DESCRIBE FUNCTION: only support session catalog") {
-    val e = intercept[AnalysisException] {
-      sql("DESCRIBE FUNCTION testcat.ns1.ns2.fun")
-    }
-    assert(e.message.contains("function is only supported in v1 catalog"))
-
-    val e1 = intercept[AnalysisException] {
-      sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
-    }
-    assert(e1.message.contains("requires a single-part namespace"))
-  }
-
-  test("SHOW FUNCTIONS not valid v1 namespace") {
-    val function = "testcat.ns1.ns2.fun"
-
-    val e = intercept[AnalysisException] {
-      sql(s"SHOW FUNCTIONS LIKE $function")
-    }
-    assert(e.getMessage.contains("Catalog testcat does not support SHOW FUNCTIONS"))
-  }
-
-  test("DROP FUNCTION: only support session catalog") {
-    val e = intercept[AnalysisException] {
-      sql("DROP FUNCTION testcat.ns1.ns2.fun")
-    }
-    assert(e.message.contains("function is only supported in v1 catalog"))
-
-    val e1 = intercept[AnalysisException] {
-      sql("DROP FUNCTION default.ns1.ns2.fun")
-    }
-    assert(e1.message.contains("requires a single-part namespace"))
-  }
-
-  test("CREATE FUNCTION: only support session catalog") {
-    val e = intercept[AnalysisException] {
-      sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'")
-    }
-    assert(e.message.contains("function is only supported in v1 catalog"))
-
-    val e1 = intercept[AnalysisException] {
-      sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'")
-    }
-    assert(e1.message.contains("requires a single-part namespace"))
-  }
-
-  test("REFRESH FUNCTION: only support session catalog") {
-    val e = intercept[AnalysisException] {
-      sql("REFRESH FUNCTION testcat.ns1.ns2.fun")
-    }
-    assert(e.message.contains("function is only supported in v1 catalog"))
-
-    val e1 = intercept[AnalysisException] {
-      sql("REFRESH FUNCTION default.ns1.ns2.fun")
-    }
-    assert(e1.message.contains("requires a single-part namespace"))
+    assert(e.message.contains("Catalog testcat does not support views"))
   }
 
   test("global temp view should not be masked by v2 catalog") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index a5d60f6..4d24b26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedDBObjectName}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedDBObjectName, UnresolvedFunc}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
@@ -462,6 +462,33 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
       "Operation not allowed: CREATE FUNCTION with resource type 'other'")
   }
 
+  test("DROP FUNCTION") {
+    def createFuncPlan(name: Seq[String]): UnresolvedFunc = {
+      UnresolvedFunc(name, "DROP FUNCTION", true,
+        Some("Please use fully qualified identifier to drop the persistent function."))
+    }
+    comparePlans(
+      parser.parsePlan("DROP FUNCTION a"),
+      DropFunction(createFuncPlan(Seq("a")), false))
+    comparePlans(
+      parser.parsePlan("DROP FUNCTION a.b.c"),
+      DropFunction(createFuncPlan(Seq("a", "b", "c")), false))
+    comparePlans(
+      parser.parsePlan("DROP TEMPORARY FUNCTION a"),
+      DropFunctionCommand(None, "a", false, true))
+    comparePlans(
+      parser.parsePlan("DROP FUNCTION IF EXISTS a.b.c"),
+      DropFunction(createFuncPlan(Seq("a", "b", "c")), true))
+    comparePlans(
+      parser.parsePlan("DROP TEMPORARY FUNCTION IF EXISTS a"),
+      DropFunctionCommand(None, "a", true, true))
+
+    intercept("DROP TEMPORARY FUNCTION a.b",
+      "DROP TEMPORARY FUNCTION requires a single part name")
+    intercept("DROP TEMPORARY FUNCTION IF EXISTS a.b",
+      "DROP TEMPORARY FUNCTION requires a single part name")
+  }
+
   test("SPARK-32374: create temporary view with properties not allowed") {
     assertUnsupported(
       sql = """
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4380618..00d1ed2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{SparkException, SparkFiles}
 import org.apache.spark.internal.config
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, TableFunctionRegistry, TempTableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
@@ -1246,24 +1246,24 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     assert(getMetadata("col1").getString("comment") == "this is col1")
   }
 
-  test("drop build-in function") {
+  test("drop built-in function") {
     Seq("true", "false").foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
         // partition to add already exists
         var e = intercept[AnalysisException] {
           sql("DROP TEMPORARY FUNCTION year")
         }
-        assert(e.getMessage.contains("Cannot drop native function 'year'"))
+        assert(e.getMessage.contains("Cannot drop built-in function 'year'"))
 
         e = intercept[AnalysisException] {
           sql("DROP TEMPORARY FUNCTION YeAr")
         }
-        assert(e.getMessage.contains("Cannot drop native function 'YeAr'"))
+        assert(e.getMessage.contains("Cannot drop built-in function 'YeAr'"))
 
         e = intercept[AnalysisException] {
           sql("DROP TEMPORARY FUNCTION `YeAr`")
         }
-        assert(e.getMessage.contains("Cannot drop native function 'YeAr'"))
+        assert(e.getMessage.contains("Cannot drop built-in function 'YeAr'"))
       }
     }
   }
@@ -1468,7 +1468,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     withUserDefinedFunction("add_one" -> true) {
       val numFunctions = FunctionRegistry.functionSet.size.toLong +
         TableFunctionRegistry.functionSet.size.toLong +
-        FunctionsCommand.virtualOperators.size.toLong
+        FunctionRegistry.builtinOperators.size.toLong
       assert(sql("show functions").count() === numFunctions)
       assert(sql("show system functions").count() === numFunctions)
       assert(sql("show all functions").count() === numFunctions)
@@ -2288,25 +2288,26 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     val msg = intercept[AnalysisException] {
       sql("REFRESH FUNCTION md5")
     }.getMessage
-    assert(msg.contains("Cannot refresh built-in function"))
-    val msg2 = intercept[NoSuchFunctionException] {
+    assert(msg.contains(
+      "md5 is a built-in/temporary function. 'REFRESH FUNCTION' expects a persistent function"))
+    val msg2 = intercept[AnalysisException] {
       sql("REFRESH FUNCTION default.md5")
     }.getMessage
-    assert(msg2.contains(s"Undefined function: 'md5'. This function is neither a registered " +
-      s"temporary function nor a permanent function registered in the database 'default'."))
+    assert(msg2.contains(s"Undefined function: default.md5"))
 
     withUserDefinedFunction("func1" -> true) {
       sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
       val msg = intercept[AnalysisException] {
         sql("REFRESH FUNCTION func1")
       }.getMessage
-      assert(msg.contains("Cannot refresh temporary function"))
+      assert(msg.contains("" +
+        "func1 is a built-in/temporary function. 'REFRESH FUNCTION' expects a persistent function"))
     }
 
     withUserDefinedFunction("func1" -> false) {
       val func = FunctionIdentifier("func1", Some("default"))
       assert(!spark.sessionState.catalog.isRegisteredFunction(func))
-      intercept[NoSuchFunctionException] {
+      intercept[AnalysisException] {
         sql("REFRESH FUNCTION func1")
       }
       assert(!spark.sessionState.catalog.isRegisteredFunction(func))
@@ -2315,16 +2316,17 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       assert(!spark.sessionState.catalog.isRegisteredFunction(func))
       sql("REFRESH FUNCTION func1")
       assert(spark.sessionState.catalog.isRegisteredFunction(func))
-      val msg = intercept[NoSuchFunctionException] {
+      val msg = intercept[AnalysisException] {
         sql("REFRESH FUNCTION func2")
       }.getMessage
-      assert(msg.contains(s"Undefined function: 'func2'. This function is neither a registered " +
-        s"temporary function nor a permanent function registered in the database 'default'."))
+      assert(msg.contains(s"Undefined function: func2. This function is neither a " +
+        "built-in/temporary function, nor a persistent function that is qualified as " +
+        "spark_catalog.default.func2"))
       assert(spark.sessionState.catalog.isRegisteredFunction(func))
 
       spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1")
       assert(spark.sessionState.catalog.isRegisteredFunction(func))
-      intercept[NoSuchFunctionException] {
+      intercept[AnalysisException] {
         sql("REFRESH FUNCTION func1")
       }
       assert(!spark.sessionState.catalog.isRegisteredFunction(func))
@@ -2348,7 +2350,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       val msg = intercept[AnalysisException] {
         sql("REFRESH FUNCTION rand")
       }.getMessage
-      assert(msg.contains("Cannot refresh built-in function"))
+      assert(msg.contains(
+        "rand is a built-in/temporary function. 'REFRESH FUNCTION' expects a persistent function"))
       assert(!spark.sessionState.catalog.isRegisteredFunction(rand))
       sql("REFRESH FUNCTION default.rand")
       assert(spark.sessionState.catalog.isRegisteredFunction(rand))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 2e6cfc5..7465ab6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -224,8 +224,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       catalog.loadTable(testIdent)
     }
 
-    assert(exc.message.contains(testIdent.quoted))
-    assert(exc.message.contains("not found"))
+    assert(exc.message.contains("Table or view 'test_table' not found in database 'db'"))
   }
 
   test("invalidateTable") {
@@ -740,8 +739,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       catalog.renameTable(testIdent, testIdentNew)
     }
 
-    assert(exc.message.contains(testIdent.quoted))
-    assert(exc.message.contains("not found"))
+    assert(exc.message.contains("Table or view 'test_table' not found in database 'db'"))
   }
 
   test("renameTable: fail if new table name already exists") {
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index c263932..37efb2d 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -143,6 +143,10 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Hive returns null for minute('2015-03-18')
     "udf_minute",
 
+    // Hive DESC FUNCTION returns a string to indicate undefined function, while Spark triggers a
+    // query error.
+    "udf_index",
+    "udf_stddev_pop",
 
     // Cant run without local map/reduce.
     "index_auto_update",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 895709d..fc2501c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -34,8 +34,8 @@ import org.apache.hadoop.io.{LongWritable, Writable}
 
 import org.apache.spark.{SparkFiles, TestUtils}
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.command.FunctionsCommand
 import org.apache.spark.sql.functions.max
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -563,7 +563,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           sql("SELECT testUDFToListInt(s) FROM inputTable"),
           Seq(Row(Seq(1, 2, 3))))
         assert(sql("show functions").count() ==
-          numFunc + FunctionsCommand.virtualOperators.size + 1)
+          numFunc + FunctionRegistry.builtinOperators.size + 1)
         assert(spark.catalog.listFunctions().count() == numFunc + 1)
 
         withDatabase("db2") {
@@ -600,8 +600,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           val message = intercept[AnalysisException] {
             sql("SELECT dAtABaSe1.unknownFunc(1)")
           }.getMessage
-          assert(message.contains("Undefined function: 'unknownFunc'") &&
-            message.contains("nor a permanent function registered in the database 'dAtABaSe1'"))
+          assert(message.contains("Undefined function: dAtABaSe1.unknownFunc"))
         }
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ba8e6cd..1829f38 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.TestUncaughtExceptionHandler
 import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
-import org.apache.spark.sql.execution.command.{FunctionsCommand, LoadDataCommand}
+import org.apache.spark.sql.execution.command.LoadDataCommand
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -202,7 +202,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       assert(allFunctions.contains(f))
     }
 
-    FunctionsCommand.virtualOperators.foreach { f =>
+    FunctionRegistry.builtinOperators.keys.foreach { f =>
       assert(allFunctions.contains(f))
     }
 
@@ -263,8 +263,8 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     checkKeywordsNotExist(sql("describe functioN Upper"),
       "Extended Usage")
 
-    checkKeywordsExist(sql("describe functioN abcadf"),
-      "Function: abcadf not found.")
+    val e = intercept[AnalysisException](sql("describe functioN abcadf"))
+    assert(e.message.contains("Undefined function: abcadf"))
 
     checkKeywordsExist(sql("describe functioN  `~`"),
       "Function: ~",

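For reference, a minimal standalone sketch (not part of the patch itself) of the user-facing change that the SQLQuerySuite updates above exercise: DESCRIBE FUNCTION on an unknown function now fails analysis with an "Undefined function" error instead of returning a "Function: ... not found." row. The SparkSession setup, object name, and app name below are illustrative assumptions; the function name abcadf is taken from the test and is assumed not to exist.

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object UndefinedFunctionExample {
      def main(args: Array[String]): Unit = {
        // Local session purely for illustration; any catalog defaults are assumed.
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("undefined-function-example")
          .getOrCreate()
        try {
          // With this change (Spark 3.3+), analysis of DESCRIBE FUNCTION fails for a
          // missing function instead of printing "Function: abcadf not found."
          spark.sql("DESCRIBE FUNCTION abcadf").show()
        } catch {
          case e: AnalysisException =>
            // The updated tests only assert a prefix of the message; expected shape:
            // "Undefined function: abcadf. This function is neither a built-in/temporary
            //  function, nor a persistent function ..."
            println(e.getMessage)
        } finally {
          spark.stop()
        }
      }
    }
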
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org