You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/22 17:06:22 UTC

[arrow] branch master updated: ARROW-9815 [Rust] [DataFusion] Fixed deadlock caused by accessing the scalar functions' registry.

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f98de24  ARROW-9815 [Rust] [DataFusion] Fixed deadlock caused by accessing the scalar functions' registry.
f98de24 is described below

commit f98de241cb04deab3b9f8281638f79b9b57a736a
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sat Aug 22 11:05:40 2020 -0600

    ARROW-9815 [Rust] [DataFusion] Fixed deadlock caused by accessing the scalar functions' registry.
    
    @andygrove and @alamb , I have no formal training in thread and mutex management, so I am not certain about this proposal or the following explanation:
    
    My understanding is that because the result of
    
    ```
    ctx_state
                    .lock()
                    .expect("failed to lock mutex")
                    .scalar_functions
                    .lock()
                    .expect("failed to lock mutex")
                    .get(name)
    ```
    
    is a temporary value, using it as the expression of a `match` keeps the lock held (blocking any other access to `scalar_functions`) until we leave the `match`, which deadlocks when we recursively call the function. Here I just cloned `.scalar_functions` so that we allow the lock to be released.
    
    I may also be dead wrong on every word that I wrote above.
    
    This does work, but if you could validate my reasoning above, I would appreciate it very much!
    
    Note that we are also doing the same for `.datasources` in this file, which I suspect will also deadlock when we have a plan with two sources. I did not touch that as I do not know the idiom/pattern to address this (locking within recursions).
    
    An alternative solution for this is to not make `PhysicalPlanner::create_physical_plan` recursive, and instead call a recursive function (with all the current logic of `create_physical_plan`) with references to `datasources` and `scalar_functions`, so that they can be used recursively (and we do not have to lock on every recursion).
    
    Closes #8018 from jorgecarleitao/fix_deadlock
    
    Authored-by: Jorge C. Leitao <jo...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/src/execution/context.rs           |  68 +++++--------
 rust/datafusion/src/execution/physical_plan/mod.rs |   2 +-
 .../src/execution/physical_plan/planner.rs         | 111 ++++++---------------
 rust/datafusion/src/optimizer/type_coercion.rs     |  25 ++---
 rust/datafusion/src/sql/planner.rs                 |  10 +-
 rust/datafusion/tests/sql.rs                       |  13 +++
 6 files changed, 85 insertions(+), 144 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index dac3215..6a0b150 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -106,8 +106,8 @@ impl ExecutionContext {
     pub fn with_config(config: ExecutionConfig) -> Self {
         let mut ctx = Self {
             state: Arc::new(Mutex::new(ExecutionContextState {
-                datasources: Arc::new(Mutex::new(HashMap::new())),
-                scalar_functions: Arc::new(Mutex::new(HashMap::new())),
+                datasources: Box::new(HashMap::new()),
+                scalar_functions: Box::new(HashMap::new()),
                 config,
             })),
         };
@@ -191,22 +191,18 @@ impl ExecutionContext {
 
         // create a query planner
         let state = self.state.lock().expect("failed to lock mutex");
-        let query_planner = SqlToRel::new(state.clone());
+        let query_planner = SqlToRel::new(&*state);
         Ok(query_planner.statement_to_plan(&statements[0])?)
     }
 
     /// Register a scalar UDF
     pub fn register_udf(&mut self, f: ScalarFunction) {
-        let state = self.state.lock().expect("failed to lock mutex");
-        state
-            .scalar_functions
-            .lock()
-            .expect("failed to lock mutex")
-            .insert(f.name.clone(), Box::new(f));
+        let mut state = self.state.lock().expect("failed to lock mutex");
+        state.scalar_functions.insert(f.name.clone(), Box::new(f));
     }
 
     /// Get a reference to the registered scalar functions
-    pub fn scalar_functions(&self) -> Arc<Mutex<HashMap<String, Box<ScalarFunction>>>> {
+    pub fn scalar_functions(&self) -> Box<HashMap<String, Box<ScalarFunction>>> {
         self.state
             .lock()
             .expect("failed to lock mutex")
@@ -281,12 +277,8 @@ impl ExecutionContext {
         name: &str,
         provider: Box<dyn TableProvider + Send + Sync>,
     ) {
-        let state = self.state.lock().expect("failed to lock mutex");
-        state
-            .datasources
-            .lock()
-            .expect("failed to lock mutex")
-            .insert(name.to_string(), provider);
+        let mut ctx_state = self.state.lock().expect("failed to lock mutex");
+        ctx_state.datasources.insert(name.to_string(), provider);
     }
 
     /// Retrieves a DataFrame representing a table previously registered by calling the
@@ -298,8 +290,6 @@ impl ExecutionContext {
             .lock()
             .expect("failed to lock mutex")
             .datasources
-            .lock()
-            .expect("failed to lock mutex")
             .get(table_name)
         {
             Some(provider) => {
@@ -329,8 +319,6 @@ impl ExecutionContext {
             .lock()
             .expect("failed to lock mutex")
             .datasources
-            .lock()
-            .expect("failed to lock mutex")
             .keys()
             .cloned()
             .collect()
@@ -346,7 +334,7 @@ impl ExecutionContext {
                     .lock()
                     .expect("failed to lock mutex")
                     .scalar_functions
-                    .clone(),
+                    .as_ref(),
             )),
         ];
         let mut plan = plan.clone();
@@ -366,7 +354,8 @@ impl ExecutionContext {
             Some(planner) => planner,
             None => Arc::new(DefaultPhysicalPlanner::default()),
         };
-        planner.create_physical_plan(logical_plan, self.state.clone())
+        let ctx_state = self.state.lock().expect("Failed to aquire lock");
+        planner.create_physical_plan(logical_plan, &ctx_state)
     }
 
     /// Execute a physical plan and collect the results in memory
@@ -495,38 +484,29 @@ impl ExecutionConfig {
 }
 
 /// Execution context for registering data sources and executing queries
-#[derive(Clone)]
 pub struct ExecutionContextState {
     /// Data sources that are registered with the context
-    pub datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>,
+    pub datasources: Box<HashMap<String, Box<dyn TableProvider + Send + Sync>>>,
     /// Scalar functions that are registered with the context
-    pub scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+    pub scalar_functions: Box<HashMap<String, Box<ScalarFunction>>>,
     /// Context configuration
     pub config: ExecutionConfig,
 }
 
 impl SchemaProvider for ExecutionContextState {
     fn get_table_meta(&self, name: &str) -> Option<SchemaRef> {
-        self.datasources
-            .lock()
-            .expect("failed to lock mutex")
-            .get(name)
-            .map(|ds| ds.schema().clone())
+        self.datasources.get(name).map(|ds| ds.schema().clone())
     }
 
     fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>> {
-        self.scalar_functions
-            .lock()
-            .expect("failed to lock mutex")
-            .get(name)
-            .map(|f| {
-                Arc::new(FunctionMeta::new(
-                    name.to_owned(),
-                    f.args.clone(),
-                    f.return_type.clone(),
-                    FunctionType::Scalar,
-                ))
-            })
+        self.scalar_functions.get(name).map(|f| {
+            Arc::new(FunctionMeta::new(
+                name.to_owned(),
+                f.args.clone(),
+                f.return_type.clone(),
+                FunctionType::Scalar,
+            ))
+        })
     }
 }
 
@@ -641,8 +621,6 @@ mod tests {
             .lock()
             .expect("failed to lock mutex")
             .datasources
-            .lock()
-            .expect("failed to lock mutex")
             .get("test")
             .unwrap()
             .schema();
@@ -1127,7 +1105,7 @@ mod tests {
         fn create_physical_plan(
             &self,
             _logical_plan: &LogicalPlan,
-            _ctx_state: Arc<Mutex<ExecutionContextState>>,
+            _ctx_state: &ExecutionContextState,
         ) -> Result<Arc<dyn ExecutionPlan>> {
             Err(ExecutionError::NotImplemented(
                 "query not supported".to_string(),
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 191d256..817b6ab 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -40,7 +40,7 @@ pub trait PhysicalPlanner {
     fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        ctx_state: Arc<Mutex<ExecutionContextState>>,
+        ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 }
 
diff --git a/rust/datafusion/src/execution/physical_plan/planner.rs b/rust/datafusion/src/execution/physical_plan/planner.rs
index ddb0789..2e83cd0 100644
--- a/rust/datafusion/src/execution/physical_plan/planner.rs
+++ b/rust/datafusion/src/execution/physical_plan/planner.rs
@@ -17,7 +17,7 @@
 
 //! Physical query planner
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use crate::error::{ExecutionError, Result};
 use crate::execution::context::ExecutionContextState;
@@ -59,27 +59,16 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
     fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        ctx_state: Arc<Mutex<ExecutionContextState>>,
+        ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let batch_size = ctx_state
-            .lock()
-            .expect("failed to lock mutex")
-            .config
-            .batch_size;
+        let batch_size = ctx_state.config.batch_size;
 
         match logical_plan {
             LogicalPlan::TableScan {
                 table_name,
                 projection,
                 ..
-            } => match ctx_state
-                .lock()
-                .expect("failed to lock mutex")
-                .datasources
-                .lock()
-                .expect("failed to lock mutex")
-                .get(table_name)
-            {
+            } => match ctx_state.datasources.get(table_name) {
                 Some(provider) => {
                     let partitions = provider.scan(projection, batch_size)?;
                     if partitions.is_empty() {
@@ -139,17 +128,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                 batch_size,
             )?)),
             LogicalPlan::Projection { input, expr, .. } => {
-                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input = self.create_physical_plan(input, ctx_state)?;
                 let input_schema = input.as_ref().schema().clone();
                 let runtime_expr = expr
                     .iter()
                     .map(|e| {
                         tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                &input_schema,
-                                ctx_state.clone(),
-                            ),
+                            self.create_physical_expr(e, &input_schema, &ctx_state),
                             e.name(&input_schema),
                         ))
                     })
@@ -163,18 +148,14 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                 ..
             } => {
                 // Initially need to perform the aggregate and then merge the partitions
-                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input = self.create_physical_plan(input, ctx_state)?;
                 let input_schema = input.as_ref().schema().clone();
 
                 let groups = group_expr
                     .iter()
                     .map(|e| {
                         tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                &input_schema,
-                                ctx_state.clone(),
-                            ),
+                            self.create_physical_expr(e, &input_schema, ctx_state),
                             e.name(&input_schema),
                         ))
                     })
@@ -183,11 +164,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                     .iter()
                     .map(|e| {
                         tuple_err((
-                            self.create_aggregate_expr(
-                                e,
-                                &input_schema,
-                                ctx_state.clone(),
-                            ),
+                            self.create_aggregate_expr(e, &input_schema, ctx_state),
                             e.name(&input_schema),
                         ))
                     })
@@ -209,11 +186,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                 let merge = Arc::new(MergeExec::new(
                     schema.clone(),
                     partitions,
-                    ctx_state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
+                    ctx_state.config.concurrency,
                 ));
 
                 // construct the expressions for the final aggregation
@@ -241,17 +214,14 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
             LogicalPlan::Filter {
                 input, predicate, ..
             } => {
-                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input = self.create_physical_plan(input, ctx_state)?;
                 let input_schema = input.as_ref().schema().clone();
-                let runtime_expr = self.create_physical_expr(
-                    predicate,
-                    &input_schema,
-                    ctx_state.clone(),
-                )?;
+                let runtime_expr =
+                    self.create_physical_expr(predicate, &input_schema, ctx_state)?;
                 Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?))
             }
             LogicalPlan::Sort { expr, input, .. } => {
-                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input = self.create_physical_plan(input, ctx_state)?;
                 let input_schema = input.as_ref().schema().clone();
 
                 let sort_expr = expr
@@ -268,7 +238,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                                 descending: !*asc,
                                 nulls_first: *nulls_first,
                             },
-                            ctx_state.clone(),
+                            ctx_state,
                         ),
                         _ => Err(ExecutionError::ExecutionError(
                             "Sort only accepts sort expressions".to_string(),
@@ -279,26 +249,18 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                 Ok(Arc::new(SortExec::try_new(
                     sort_expr,
                     input,
-                    ctx_state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
+                    ctx_state.config.concurrency,
                 )?))
             }
             LogicalPlan::Limit { input, n, .. } => {
-                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input = self.create_physical_plan(input, ctx_state)?;
                 let input_schema = input.as_ref().schema().clone();
 
                 Ok(Arc::new(GlobalLimitExec::new(
                     input_schema.clone(),
                     input.partitions()?,
                     *n,
-                    ctx_state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
+                    ctx_state.config.concurrency,
                 )))
             }
             LogicalPlan::Explain {
@@ -307,7 +269,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                 stringified_plans,
                 schema,
             } => {
-                let input = self.create_physical_plan(plan, ctx_state.clone())?;
+                let input = self.create_physical_plan(plan, ctx_state)?;
 
                 let mut stringified_plans = stringified_plans
                     .iter()
@@ -338,11 +300,11 @@ impl DefaultPhysicalPlanner {
         &self,
         e: &Expr,
         input_schema: &Schema,
-        ctx_state: Arc<Mutex<ExecutionContextState>>,
+        ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn PhysicalExpr>> {
         match e {
             Expr::Alias(expr, ..) => {
-                Ok(self.create_physical_expr(expr, input_schema, ctx_state.clone())?)
+                Ok(self.create_physical_expr(expr, input_schema, ctx_state)?)
             }
             Expr::Column(name) => {
                 // check that name exists
@@ -351,12 +313,12 @@ impl DefaultPhysicalPlanner {
             }
             Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
             Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
-                self.create_physical_expr(left, input_schema, ctx_state.clone())?,
+                self.create_physical_expr(left, input_schema, ctx_state)?,
                 op.clone(),
-                self.create_physical_expr(right, input_schema, ctx_state.clone())?,
+                self.create_physical_expr(right, input_schema, ctx_state)?,
             ))),
             Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
-                self.create_physical_expr(expr, input_schema, ctx_state.clone())?,
+                self.create_physical_expr(expr, input_schema, ctx_state)?,
                 input_schema,
                 data_type.clone(),
             )?)),
@@ -364,14 +326,7 @@ impl DefaultPhysicalPlanner {
                 name,
                 args,
                 return_type,
-            } => match ctx_state
-                .lock()
-                .expect("failed to lock mutex")
-                .scalar_functions
-                .lock()
-                .expect("failed to lock mutex")
-                .get(name)
-            {
+            } => match ctx_state.scalar_functions.get(name) {
                 Some(f) => {
                     let mut physical_args = vec![];
                     for e in args {
@@ -405,7 +360,7 @@ impl DefaultPhysicalPlanner {
         &self,
         e: &Expr,
         input_schema: &Schema,
-        ctx_state: Arc<Mutex<ExecutionContextState>>,
+        ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn AggregateExpr>> {
         match e {
             Expr::AggregateFunction { name, args, .. } => {
@@ -413,27 +368,27 @@ impl DefaultPhysicalPlanner {
                     "sum" => Ok(Arc::new(Sum::new(self.create_physical_expr(
                         &args[0],
                         input_schema,
-                        ctx_state.clone(),
+                        ctx_state,
                     )?))),
                     "avg" => Ok(Arc::new(Avg::new(self.create_physical_expr(
                         &args[0],
                         input_schema,
-                        ctx_state.clone(),
+                        ctx_state,
                     )?))),
                     "max" => Ok(Arc::new(Max::new(self.create_physical_expr(
                         &args[0],
                         input_schema,
-                        ctx_state.clone(),
+                        ctx_state,
                     )?))),
                     "min" => Ok(Arc::new(Min::new(self.create_physical_expr(
                         &args[0],
                         input_schema,
-                        ctx_state.clone(),
+                        ctx_state,
                     )?))),
                     "count" => Ok(Arc::new(Count::new(self.create_physical_expr(
                         &args[0],
                         input_schema,
-                        ctx_state.clone(),
+                        ctx_state,
                     )?))),
                     other => Err(ExecutionError::NotImplemented(format!(
                         "Unsupported aggregate function '{}'",
@@ -454,10 +409,10 @@ impl DefaultPhysicalPlanner {
         e: &Expr,
         input_schema: &Schema,
         options: SortOptions,
-        ctx_state: Arc<Mutex<ExecutionContextState>>,
+        ctx_state: &ExecutionContextState,
     ) -> Result<PhysicalSortExpr> {
         Ok(PhysicalSortExpr {
-            expr: self.create_physical_expr(e, input_schema, ctx_state.clone())?,
+            expr: self.create_physical_expr(e, input_schema, ctx_state)?,
             options: options,
         })
     }
diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs
index df4818c..22f3ef0 100644
--- a/rust/datafusion/src/optimizer/type_coercion.rs
+++ b/rust/datafusion/src/optimizer/type_coercion.rs
@@ -21,7 +21,7 @@
 //! float)`. This keeps the runtime query execution code much simpler.
 
 use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 
@@ -37,16 +37,16 @@ use utils::optimize_explain;
 ///
 /// This optimizer does not alter the structure of the plan, it only changes expressions on it.
 pub struct TypeCoercionRule {
-    scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+    scalar_functions: Arc<HashMap<String, Box<ScalarFunction>>>,
 }
 
 impl TypeCoercionRule {
     /// Create a new type coercion optimizer rule using meta-data about registered
     /// scalar functions
-    pub fn new(
-        scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
-    ) -> Self {
-        Self { scalar_functions }
+    pub fn new(scalar_functions: &HashMap<String, Box<ScalarFunction>>) -> Self {
+        Self {
+            scalar_functions: Arc::new(scalar_functions.clone()),
+        }
     }
 
     /// Rewrite an expression to include explicit CAST operations when required
@@ -73,12 +73,7 @@ impl TypeCoercionRule {
             }
             Expr::ScalarFunction { name, .. } => {
                 // cast the inputs of scalar functions to the appropriate type where possible
-                match self
-                    .scalar_functions
-                    .lock()
-                    .expect("failed to lock mutex")
-                    .get(name)
-                {
+                match self.scalar_functions.get(name) {
                     Some(func_meta) => {
                         for i in 0..expressions.len() {
                             let field = &func_meta.args[i];
@@ -172,7 +167,7 @@ mod tests {
             .build()?;
 
         let scalar_functions = HashMap::new();
-        let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions)));
+        let mut rule = TypeCoercionRule::new(&scalar_functions);
         let plan = rule.optimize(&plan)?;
 
         // check that the filter had a cast added
@@ -199,7 +194,7 @@ mod tests {
             .build()?;
 
         let scalar_functions = HashMap::new();
-        let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions)));
+        let mut rule = TypeCoercionRule::new(&scalar_functions);
         let plan = rule.optimize(&plan)?;
 
         assert!(format!("{:?}", plan).starts_with("Filter: CAST(#c7 AS Float64) Lt #c12"));
@@ -276,7 +271,7 @@ mod tests {
         };
 
         let ctx = ExecutionContext::new();
-        let rule = TypeCoercionRule::new(ctx.scalar_functions());
+        let rule = TypeCoercionRule::new(ctx.scalar_functions().as_ref());
 
         let expr2 = rule.rewrite_expr(&expr, &schema).unwrap();
 
diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs
index c66d238..1735710 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -47,13 +47,13 @@ pub trait SchemaProvider {
 }
 
 /// SQL query planner
-pub struct SqlToRel<S: SchemaProvider> {
-    schema_provider: S,
+pub struct SqlToRel<'a, S: SchemaProvider> {
+    schema_provider: &'a S,
 }
 
-impl<S: SchemaProvider> SqlToRel<S> {
+impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
     /// Create a new query planner
-    pub fn new(schema_provider: S) -> Self {
+    pub fn new(schema_provider: &'a S) -> Self {
         SqlToRel { schema_provider }
     }
 
@@ -854,7 +854,7 @@ mod tests {
     }
 
     fn logical_plan(sql: &str) -> Result<LogicalPlan> {
-        let planner = SqlToRel::new(MockSchemaProvider {});
+        let planner = SqlToRel::new(&MockSchemaProvider {});
         let ast = DFParser::parse_sql(&sql).unwrap();
         planner.statement_to_plan(&ast[0])
     }
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 4ae90cd..942a781 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -201,6 +201,19 @@ fn csv_query_avg_sqrt() -> Result<()> {
     Ok(())
 }
 
+// this query used to deadlock due to the call udf(udf())
+#[test]
+fn csv_query_sqrt_sqrt() -> Result<()> {
+    let mut ctx = create_ctx()?;
+    register_aggregate_csv(&mut ctx)?;
+    let sql = "SELECT sqrt(sqrt(c12)) FROM aggregate_test_100 LIMIT 1";
+    let actual = execute(&mut ctx, sql);
+    // sqrt(sqrt(c12=0.9294097332465232)) = 0.9818650561397431
+    let expected = "0.9818650561397431".to_string();
+    assert_eq!(actual.join("\n"), expected);
+    Ok(())
+}
+
 fn create_ctx() -> Result<ExecutionContext> {
     let mut ctx = ExecutionContext::new();