You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/28 08:43:40 UTC

[spark] branch branch-3.4 updated: [SPARK-42928][SQL] Make resolvePersistentFunction synchronized

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0620b56764f [SPARK-42928][SQL] Make resolvePersistentFunction synchronized
0620b56764f is described below

commit 0620b56764fab4e4d57af10907a471a2ee0970ce
Author: allisonwang-db <al...@databricks.com>
AuthorDate: Tue Mar 28 16:43:02 2023 +0800

    [SPARK-42928][SQL] Make resolvePersistentFunction synchronized
    
    ### What changes were proposed in this pull request?
    
    This PR makes the function `resolvePersistentFunctionInternal` synchronized.
    
    ### Why are the changes needed?
    
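    To make function resolution thread-safe. Without synchronization, multiple threads
    can concurrently resolve the same function that has not yet been loaded into the
    function registry, and calling `registerFunction` twice with `overrideIfExists = false`
    can lead to a FunctionAlreadyExistsException.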
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UTs.
    
    Closes #40557 from allisonwang-db/SPARK-42928-sync-func.
    
    Authored-by: allisonwang-db <al...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit f62ffde045771b3275acf3dfb24573804e7daf93)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/catalog/SessionCatalog.scala      | 56 ++++++++++++----------
 1 file changed, 31 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4f7196e2dc9..cd4b4cfaf6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1720,31 +1720,37 @@ class SessionCatalog(
       arguments: Seq[Expression],
       registry: FunctionRegistryBase[T],
       createFunctionBuilder: CatalogFunction => FunctionRegistryBase[T]#FunctionBuilder): T = {
-    val qualifiedIdent = qualifyIdentifier(name)
-    val db = qualifiedIdent.database.get
-    val funcName = qualifiedIdent.funcName
-    if (registry.functionExists(qualifiedIdent)) {
-      // This function has been already loaded into the function registry.
-      registry.lookupFunction(qualifiedIdent, arguments)
-    } else {
-      // The function has not been loaded to the function registry, which means
-      // that the function is a persistent function (if it actually has been registered
-      // in the metastore). We need to first put the function in the function registry.
-      val catalogFunction = externalCatalog.getFunction(db, funcName)
-      loadFunctionResources(catalogFunction.resources)
-      // Please note that qualifiedName is provided by the user. However,
-      // catalogFunction.identifier.unquotedString is returned by the underlying
-      // catalog. So, it is possible that qualifiedName is not exactly the same as
-      // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
-      // At here, we preserve the input from the user.
-      val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
-      registerFunction(
-        funcMetadata,
-        overrideIfExists = false,
-        registry = registry,
-        functionBuilder = createFunctionBuilder(funcMetadata))
-      // Now, we need to create the Expression.
-      registry.lookupFunction(qualifiedIdent, arguments)
+    // `synchronized` is used to prevent multiple threads from concurrently resolving the
+    // same function that has not yet been loaded into the function registry. This is needed
+    // because calling `registerFunction` twice with `overrideIfExists = false` can lead to
+    // a FunctionAlreadyExistsException.
+    synchronized {
+      val qualifiedIdent = qualifyIdentifier(name)
+      val db = qualifiedIdent.database.get
+      val funcName = qualifiedIdent.funcName
+      if (registry.functionExists(qualifiedIdent)) {
+        // This function has been already loaded into the function registry.
+        registry.lookupFunction(qualifiedIdent, arguments)
+      } else {
+        // The function has not been loaded to the function registry, which means
+        // that the function is a persistent function (if it actually has been registered
+        // in the metastore). We need to first put the function in the function registry.
+        val catalogFunction = externalCatalog.getFunction(db, funcName)
+        loadFunctionResources(catalogFunction.resources)
+        // Please note that qualifiedIdent is derived from the name provided by the user.
+        // However, catalogFunction.identifier.unquotedString is returned by the underlying
+        // catalog. So, it is possible that qualifiedIdent is not exactly the same as
+        // catalogFunction.identifier.unquotedString (the difference is in case-sensitivity).
+        // Here, we preserve the input from the user.
+        val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
+        registerFunction(
+          funcMetadata,
+          overrideIfExists = false,
+          registry = registry,
+          functionBuilder = createFunctionBuilder(funcMetadata))
+        // Now, we need to create the Expression.
+        registry.lookupFunction(qualifiedIdent, arguments)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org