You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/22 17:06:22 UTC
[arrow] branch master updated: ARROW-9815 [Rust] [DataFusion] Fixed
deadlock caused by accessing the scalar functions' registry.
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f98de24 ARROW-9815 [Rust] [DataFusion] Fixed deadlock caused by accessing the scalar functions' registry.
f98de24 is described below
commit f98de241cb04deab3b9f8281638f79b9b57a736a
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sat Aug 22 11:05:40 2020 -0600
ARROW-9815 [Rust] [DataFusion] Fixed deadlock caused by accessing the scalar functions' registry.
@andygrove and @alamb , I have no formal training in thread and mutex management, so I am not certain about this proposal or the following explanation:
My understanding is that because the result of
```
ctx_state
.lock()
.expect("failed to lock mutex")
.scalar_functions
.lock()
.expect("failed to lock mutex")
.get(name)
```
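is a temporary, and a temporary used as a `match` scrutinee lives until the end of the `match`. The mutex guards therefore stay alive, and the locks stay held, until we leave the `match`. This deadlocks when we recursively call the function, because the recursive call tries to lock the same mutex again from the same thread. Here I just cloned `.scalar_functions` so that the lock can be released.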
I may also be dead wrong on every word that I wrote above.
This does work, but if you could validate my reasoning above, I would appreciate very much!
Note that we are also doing the same for `.datasources` in this file, which I suspect will also deadlock when we have a plan with two sources. I did not touch that as I do not know the idiom/pattern to address this (locking within recursions).
An alternative solution for this is to not make `PhysicalPlanner::create_physical_plan` recursive, and instead call a recursive function (with all the current logic of `create_physical_plan`) with references to `datasources` and `scalar_functions`, so that they can be used recursively (and we do not have to lock on every recursion).
Closes #8018 from jorgecarleitao/fix_deadlock
Authored-by: Jorge C. Leitao <jo...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/src/execution/context.rs | 68 +++++--------
rust/datafusion/src/execution/physical_plan/mod.rs | 2 +-
.../src/execution/physical_plan/planner.rs | 111 ++++++---------------
rust/datafusion/src/optimizer/type_coercion.rs | 25 ++---
rust/datafusion/src/sql/planner.rs | 10 +-
rust/datafusion/tests/sql.rs | 13 +++
6 files changed, 85 insertions(+), 144 deletions(-)
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index dac3215..6a0b150 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -106,8 +106,8 @@ impl ExecutionContext {
pub fn with_config(config: ExecutionConfig) -> Self {
let mut ctx = Self {
state: Arc::new(Mutex::new(ExecutionContextState {
- datasources: Arc::new(Mutex::new(HashMap::new())),
- scalar_functions: Arc::new(Mutex::new(HashMap::new())),
+ datasources: Box::new(HashMap::new()),
+ scalar_functions: Box::new(HashMap::new()),
config,
})),
};
@@ -191,22 +191,18 @@ impl ExecutionContext {
// create a query planner
let state = self.state.lock().expect("failed to lock mutex");
- let query_planner = SqlToRel::new(state.clone());
+ let query_planner = SqlToRel::new(&*state);
Ok(query_planner.statement_to_plan(&statements[0])?)
}
/// Register a scalar UDF
pub fn register_udf(&mut self, f: ScalarFunction) {
- let state = self.state.lock().expect("failed to lock mutex");
- state
- .scalar_functions
- .lock()
- .expect("failed to lock mutex")
- .insert(f.name.clone(), Box::new(f));
+ let mut state = self.state.lock().expect("failed to lock mutex");
+ state.scalar_functions.insert(f.name.clone(), Box::new(f));
}
/// Get a reference to the registered scalar functions
- pub fn scalar_functions(&self) -> Arc<Mutex<HashMap<String, Box<ScalarFunction>>>> {
+ pub fn scalar_functions(&self) -> Box<HashMap<String, Box<ScalarFunction>>> {
self.state
.lock()
.expect("failed to lock mutex")
@@ -281,12 +277,8 @@ impl ExecutionContext {
name: &str,
provider: Box<dyn TableProvider + Send + Sync>,
) {
- let state = self.state.lock().expect("failed to lock mutex");
- state
- .datasources
- .lock()
- .expect("failed to lock mutex")
- .insert(name.to_string(), provider);
+ let mut ctx_state = self.state.lock().expect("failed to lock mutex");
+ ctx_state.datasources.insert(name.to_string(), provider);
}
/// Retrieves a DataFrame representing a table previously registered by calling the
@@ -298,8 +290,6 @@ impl ExecutionContext {
.lock()
.expect("failed to lock mutex")
.datasources
- .lock()
- .expect("failed to lock mutex")
.get(table_name)
{
Some(provider) => {
@@ -329,8 +319,6 @@ impl ExecutionContext {
.lock()
.expect("failed to lock mutex")
.datasources
- .lock()
- .expect("failed to lock mutex")
.keys()
.cloned()
.collect()
@@ -346,7 +334,7 @@ impl ExecutionContext {
.lock()
.expect("failed to lock mutex")
.scalar_functions
- .clone(),
+ .as_ref(),
)),
];
let mut plan = plan.clone();
@@ -366,7 +354,8 @@ impl ExecutionContext {
Some(planner) => planner,
None => Arc::new(DefaultPhysicalPlanner::default()),
};
- planner.create_physical_plan(logical_plan, self.state.clone())
+ let ctx_state = self.state.lock().expect("Failed to aquire lock");
+ planner.create_physical_plan(logical_plan, &ctx_state)
}
/// Execute a physical plan and collect the results in memory
@@ -495,38 +484,29 @@ impl ExecutionConfig {
}
/// Execution context for registering data sources and executing queries
-#[derive(Clone)]
pub struct ExecutionContextState {
/// Data sources that are registered with the context
- pub datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>,
+ pub datasources: Box<HashMap<String, Box<dyn TableProvider + Send + Sync>>>,
/// Scalar functions that are registered with the context
- pub scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+ pub scalar_functions: Box<HashMap<String, Box<ScalarFunction>>>,
/// Context configuration
pub config: ExecutionConfig,
}
impl SchemaProvider for ExecutionContextState {
fn get_table_meta(&self, name: &str) -> Option<SchemaRef> {
- self.datasources
- .lock()
- .expect("failed to lock mutex")
- .get(name)
- .map(|ds| ds.schema().clone())
+ self.datasources.get(name).map(|ds| ds.schema().clone())
}
fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>> {
- self.scalar_functions
- .lock()
- .expect("failed to lock mutex")
- .get(name)
- .map(|f| {
- Arc::new(FunctionMeta::new(
- name.to_owned(),
- f.args.clone(),
- f.return_type.clone(),
- FunctionType::Scalar,
- ))
- })
+ self.scalar_functions.get(name).map(|f| {
+ Arc::new(FunctionMeta::new(
+ name.to_owned(),
+ f.args.clone(),
+ f.return_type.clone(),
+ FunctionType::Scalar,
+ ))
+ })
}
}
@@ -641,8 +621,6 @@ mod tests {
.lock()
.expect("failed to lock mutex")
.datasources
- .lock()
- .expect("failed to lock mutex")
.get("test")
.unwrap()
.schema();
@@ -1127,7 +1105,7 @@ mod tests {
fn create_physical_plan(
&self,
_logical_plan: &LogicalPlan,
- _ctx_state: Arc<Mutex<ExecutionContextState>>,
+ _ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(ExecutionError::NotImplemented(
"query not supported".to_string(),
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 191d256..817b6ab 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -40,7 +40,7 @@ pub trait PhysicalPlanner {
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
- ctx_state: Arc<Mutex<ExecutionContextState>>,
+ ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;
}
diff --git a/rust/datafusion/src/execution/physical_plan/planner.rs b/rust/datafusion/src/execution/physical_plan/planner.rs
index ddb0789..2e83cd0 100644
--- a/rust/datafusion/src/execution/physical_plan/planner.rs
+++ b/rust/datafusion/src/execution/physical_plan/planner.rs
@@ -17,7 +17,7 @@
//! Physical query planner
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use crate::error::{ExecutionError, Result};
use crate::execution::context::ExecutionContextState;
@@ -59,27 +59,16 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
- ctx_state: Arc<Mutex<ExecutionContextState>>,
+ ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
- let batch_size = ctx_state
- .lock()
- .expect("failed to lock mutex")
- .config
- .batch_size;
+ let batch_size = ctx_state.config.batch_size;
match logical_plan {
LogicalPlan::TableScan {
table_name,
projection,
..
- } => match ctx_state
- .lock()
- .expect("failed to lock mutex")
- .datasources
- .lock()
- .expect("failed to lock mutex")
- .get(table_name)
- {
+ } => match ctx_state.datasources.get(table_name) {
Some(provider) => {
let partitions = provider.scan(projection, batch_size)?;
if partitions.is_empty() {
@@ -139,17 +128,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
batch_size,
)?)),
LogicalPlan::Projection { input, expr, .. } => {
- let input = self.create_physical_plan(input, ctx_state.clone())?;
+ let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema().clone();
let runtime_expr = expr
.iter()
.map(|e| {
tuple_err((
- self.create_physical_expr(
- e,
- &input_schema,
- ctx_state.clone(),
- ),
+ self.create_physical_expr(e, &input_schema, &ctx_state),
e.name(&input_schema),
))
})
@@ -163,18 +148,14 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
..
} => {
// Initially need to perform the aggregate and then merge the partitions
- let input = self.create_physical_plan(input, ctx_state.clone())?;
+ let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema().clone();
let groups = group_expr
.iter()
.map(|e| {
tuple_err((
- self.create_physical_expr(
- e,
- &input_schema,
- ctx_state.clone(),
- ),
+ self.create_physical_expr(e, &input_schema, ctx_state),
e.name(&input_schema),
))
})
@@ -183,11 +164,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
.iter()
.map(|e| {
tuple_err((
- self.create_aggregate_expr(
- e,
- &input_schema,
- ctx_state.clone(),
- ),
+ self.create_aggregate_expr(e, &input_schema, ctx_state),
e.name(&input_schema),
))
})
@@ -209,11 +186,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
let merge = Arc::new(MergeExec::new(
schema.clone(),
partitions,
- ctx_state
- .lock()
- .expect("failed to lock mutex")
- .config
- .concurrency,
+ ctx_state.config.concurrency,
));
// construct the expressions for the final aggregation
@@ -241,17 +214,14 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
LogicalPlan::Filter {
input, predicate, ..
} => {
- let input = self.create_physical_plan(input, ctx_state.clone())?;
+ let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema().clone();
- let runtime_expr = self.create_physical_expr(
- predicate,
- &input_schema,
- ctx_state.clone(),
- )?;
+ let runtime_expr =
+ self.create_physical_expr(predicate, &input_schema, ctx_state)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?))
}
LogicalPlan::Sort { expr, input, .. } => {
- let input = self.create_physical_plan(input, ctx_state.clone())?;
+ let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema().clone();
let sort_expr = expr
@@ -268,7 +238,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
descending: !*asc,
nulls_first: *nulls_first,
},
- ctx_state.clone(),
+ ctx_state,
),
_ => Err(ExecutionError::ExecutionError(
"Sort only accepts sort expressions".to_string(),
@@ -279,26 +249,18 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
Ok(Arc::new(SortExec::try_new(
sort_expr,
input,
- ctx_state
- .lock()
- .expect("failed to lock mutex")
- .config
- .concurrency,
+ ctx_state.config.concurrency,
)?))
}
LogicalPlan::Limit { input, n, .. } => {
- let input = self.create_physical_plan(input, ctx_state.clone())?;
+ let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema().clone();
Ok(Arc::new(GlobalLimitExec::new(
input_schema.clone(),
input.partitions()?,
*n,
- ctx_state
- .lock()
- .expect("failed to lock mutex")
- .config
- .concurrency,
+ ctx_state.config.concurrency,
)))
}
LogicalPlan::Explain {
@@ -307,7 +269,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
stringified_plans,
schema,
} => {
- let input = self.create_physical_plan(plan, ctx_state.clone())?;
+ let input = self.create_physical_plan(plan, ctx_state)?;
let mut stringified_plans = stringified_plans
.iter()
@@ -338,11 +300,11 @@ impl DefaultPhysicalPlanner {
&self,
e: &Expr,
input_schema: &Schema,
- ctx_state: Arc<Mutex<ExecutionContextState>>,
+ ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>> {
match e {
Expr::Alias(expr, ..) => {
- Ok(self.create_physical_expr(expr, input_schema, ctx_state.clone())?)
+ Ok(self.create_physical_expr(expr, input_schema, ctx_state)?)
}
Expr::Column(name) => {
// check that name exists
@@ -351,12 +313,12 @@ impl DefaultPhysicalPlanner {
}
Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
- self.create_physical_expr(left, input_schema, ctx_state.clone())?,
+ self.create_physical_expr(left, input_schema, ctx_state)?,
op.clone(),
- self.create_physical_expr(right, input_schema, ctx_state.clone())?,
+ self.create_physical_expr(right, input_schema, ctx_state)?,
))),
Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
- self.create_physical_expr(expr, input_schema, ctx_state.clone())?,
+ self.create_physical_expr(expr, input_schema, ctx_state)?,
input_schema,
data_type.clone(),
)?)),
@@ -364,14 +326,7 @@ impl DefaultPhysicalPlanner {
name,
args,
return_type,
- } => match ctx_state
- .lock()
- .expect("failed to lock mutex")
- .scalar_functions
- .lock()
- .expect("failed to lock mutex")
- .get(name)
- {
+ } => match ctx_state.scalar_functions.get(name) {
Some(f) => {
let mut physical_args = vec![];
for e in args {
@@ -405,7 +360,7 @@ impl DefaultPhysicalPlanner {
&self,
e: &Expr,
input_schema: &Schema,
- ctx_state: Arc<Mutex<ExecutionContextState>>,
+ ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
match e {
Expr::AggregateFunction { name, args, .. } => {
@@ -413,27 +368,27 @@ impl DefaultPhysicalPlanner {
"sum" => Ok(Arc::new(Sum::new(self.create_physical_expr(
&args[0],
input_schema,
- ctx_state.clone(),
+ ctx_state,
)?))),
"avg" => Ok(Arc::new(Avg::new(self.create_physical_expr(
&args[0],
input_schema,
- ctx_state.clone(),
+ ctx_state,
)?))),
"max" => Ok(Arc::new(Max::new(self.create_physical_expr(
&args[0],
input_schema,
- ctx_state.clone(),
+ ctx_state,
)?))),
"min" => Ok(Arc::new(Min::new(self.create_physical_expr(
&args[0],
input_schema,
- ctx_state.clone(),
+ ctx_state,
)?))),
"count" => Ok(Arc::new(Count::new(self.create_physical_expr(
&args[0],
input_schema,
- ctx_state.clone(),
+ ctx_state,
)?))),
other => Err(ExecutionError::NotImplemented(format!(
"Unsupported aggregate function '{}'",
@@ -454,10 +409,10 @@ impl DefaultPhysicalPlanner {
e: &Expr,
input_schema: &Schema,
options: SortOptions,
- ctx_state: Arc<Mutex<ExecutionContextState>>,
+ ctx_state: &ExecutionContextState,
) -> Result<PhysicalSortExpr> {
Ok(PhysicalSortExpr {
- expr: self.create_physical_expr(e, input_schema, ctx_state.clone())?,
+ expr: self.create_physical_expr(e, input_schema, ctx_state)?,
options: options,
})
}
diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs
index df4818c..22f3ef0 100644
--- a/rust/datafusion/src/optimizer/type_coercion.rs
+++ b/rust/datafusion/src/optimizer/type_coercion.rs
@@ -21,7 +21,7 @@
//! float)`. This keeps the runtime query execution code much simpler.
use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use arrow::datatypes::Schema;
@@ -37,16 +37,16 @@ use utils::optimize_explain;
///
/// This optimizer does not alter the structure of the plan, it only changes expressions on it.
pub struct TypeCoercionRule {
- scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+ scalar_functions: Arc<HashMap<String, Box<ScalarFunction>>>,
}
impl TypeCoercionRule {
/// Create a new type coercion optimizer rule using meta-data about registered
/// scalar functions
- pub fn new(
- scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
- ) -> Self {
- Self { scalar_functions }
+ pub fn new(scalar_functions: &HashMap<String, Box<ScalarFunction>>) -> Self {
+ Self {
+ scalar_functions: Arc::new(scalar_functions.clone()),
+ }
}
/// Rewrite an expression to include explicit CAST operations when required
@@ -73,12 +73,7 @@ impl TypeCoercionRule {
}
Expr::ScalarFunction { name, .. } => {
// cast the inputs of scalar functions to the appropriate type where possible
- match self
- .scalar_functions
- .lock()
- .expect("failed to lock mutex")
- .get(name)
- {
+ match self.scalar_functions.get(name) {
Some(func_meta) => {
for i in 0..expressions.len() {
let field = &func_meta.args[i];
@@ -172,7 +167,7 @@ mod tests {
.build()?;
let scalar_functions = HashMap::new();
- let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions)));
+ let mut rule = TypeCoercionRule::new(&scalar_functions);
let plan = rule.optimize(&plan)?;
// check that the filter had a cast added
@@ -199,7 +194,7 @@ mod tests {
.build()?;
let scalar_functions = HashMap::new();
- let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions)));
+ let mut rule = TypeCoercionRule::new(&scalar_functions);
let plan = rule.optimize(&plan)?;
assert!(format!("{:?}", plan).starts_with("Filter: CAST(#c7 AS Float64) Lt #c12"));
@@ -276,7 +271,7 @@ mod tests {
};
let ctx = ExecutionContext::new();
- let rule = TypeCoercionRule::new(ctx.scalar_functions());
+ let rule = TypeCoercionRule::new(ctx.scalar_functions().as_ref());
let expr2 = rule.rewrite_expr(&expr, &schema).unwrap();
diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs
index c66d238..1735710 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -47,13 +47,13 @@ pub trait SchemaProvider {
}
/// SQL query planner
-pub struct SqlToRel<S: SchemaProvider> {
- schema_provider: S,
+pub struct SqlToRel<'a, S: SchemaProvider> {
+ schema_provider: &'a S,
}
-impl<S: SchemaProvider> SqlToRel<S> {
+impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
/// Create a new query planner
- pub fn new(schema_provider: S) -> Self {
+ pub fn new(schema_provider: &'a S) -> Self {
SqlToRel { schema_provider }
}
@@ -854,7 +854,7 @@ mod tests {
}
fn logical_plan(sql: &str) -> Result<LogicalPlan> {
- let planner = SqlToRel::new(MockSchemaProvider {});
+ let planner = SqlToRel::new(&MockSchemaProvider {});
let ast = DFParser::parse_sql(&sql).unwrap();
planner.statement_to_plan(&ast[0])
}
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 4ae90cd..942a781 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -201,6 +201,19 @@ fn csv_query_avg_sqrt() -> Result<()> {
Ok(())
}
+// this query used to deadlock due to the call udf(udf())
+#[test]
+fn csv_query_sqrt_sqrt() -> Result<()> {
+ let mut ctx = create_ctx()?;
+ register_aggregate_csv(&mut ctx)?;
+ let sql = "SELECT sqrt(sqrt(c12)) FROM aggregate_test_100 LIMIT 1";
+ let actual = execute(&mut ctx, sql);
+ // sqrt(sqrt(c12=0.9294097332465232)) = 0.9818650561397431
+ let expected = "0.9818650561397431".to_string();
+ assert_eq!(actual.join("\n"), expected);
+ Ok(())
+}
+
fn create_ctx() -> Result<ExecutionContext> {
let mut ctx = ExecutionContext::new();