You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/28 08:43:40 UTC
[spark] branch branch-3.4 updated: [SPARK-42928][SQL] Make resolvePersistentFunction synchronized
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 0620b56764f [SPARK-42928][SQL] Make resolvePersistentFunction synchronized
0620b56764f is described below
commit 0620b56764fab4e4d57af10907a471a2ee0970ce
Author: allisonwang-db <al...@databricks.com>
AuthorDate: Tue Mar 28 16:43:02 2023 +0800
[SPARK-42928][SQL] Make resolvePersistentFunction synchronized
### What changes were proposed in this pull request?
This PR makes the function `resolvePersistentFunctionInternal` synchronized.
### Why are the changes needed?
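To make persistent function resolution thread-safe. Without synchronization, multiple threads can concurrently resolve the same function that has not yet been loaded into the function registry; each then calls `registerFunction` with `overrideIfExists = false`, which can lead to a FunctionAlreadyExistsException.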
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #40557 from allisonwang-db/SPARK-42928-sync-func.
Authored-by: allisonwang-db <al...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit f62ffde045771b3275acf3dfb24573804e7daf93)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/catalog/SessionCatalog.scala | 56 ++++++++++++----------
1 file changed, 31 insertions(+), 25 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4f7196e2dc9..cd4b4cfaf6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1720,31 +1720,37 @@ class SessionCatalog(
arguments: Seq[Expression],
registry: FunctionRegistryBase[T],
createFunctionBuilder: CatalogFunction => FunctionRegistryBase[T]#FunctionBuilder): T = {
- val qualifiedIdent = qualifyIdentifier(name)
- val db = qualifiedIdent.database.get
- val funcName = qualifiedIdent.funcName
- if (registry.functionExists(qualifiedIdent)) {
- // This function has been already loaded into the function registry.
- registry.lookupFunction(qualifiedIdent, arguments)
- } else {
- // The function has not been loaded to the function registry, which means
- // that the function is a persistent function (if it actually has been registered
- // in the metastore). We need to first put the function in the function registry.
- val catalogFunction = externalCatalog.getFunction(db, funcName)
- loadFunctionResources(catalogFunction.resources)
- // Please note that qualifiedName is provided by the user. However,
- // catalogFunction.identifier.unquotedString is returned by the underlying
- // catalog. So, it is possible that qualifiedName is not exactly the same as
- // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
- // At here, we preserve the input from the user.
- val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
- registerFunction(
- funcMetadata,
- overrideIfExists = false,
- registry = registry,
- functionBuilder = createFunctionBuilder(funcMetadata))
- // Now, we need to create the Expression.
- registry.lookupFunction(qualifiedIdent, arguments)
+ // `synchronized` is used to prevent multiple threads from concurrently resolving the
+ // same function that has not yet been loaded into the function registry. This is needed
+ // because calling `registerFunction` twice with `overrideIfExists = false` can lead to
+ // a FunctionAlreadyExistsException.
+ synchronized {
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val funcName = qualifiedIdent.funcName
+ if (registry.functionExists(qualifiedIdent)) {
+ // This function has been already loaded into the function registry.
+ registry.lookupFunction(qualifiedIdent, arguments)
+ } else {
+ // The function has not been loaded to the function registry, which means
+ // that the function is a persistent function (if it actually has been registered
+ // in the metastore). We need to first put the function in the function registry.
+ val catalogFunction = externalCatalog.getFunction(db, funcName)
+ loadFunctionResources(catalogFunction.resources)
+ // Please note that qualifiedName is provided by the user. However,
+ // catalogFunction.identifier.unquotedString is returned by the underlying
+ // catalog. So, it is possible that qualifiedName is not exactly the same as
+ // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
+ // At here, we preserve the input from the user.
+ val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
+ registerFunction(
+ funcMetadata,
+ overrideIfExists = false,
+ registry = registry,
+ functionBuilder = createFunctionBuilder(funcMetadata))
+ // Now, we need to create the Expression.
+ registry.lookupFunction(qualifiedIdent, arguments)
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org