You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/06/28 14:25:20 UTC
[arrow-datafusion] branch master updated: Remove unnecessary mutex
(#639)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 407de2a Remove unnecessary mutex (#639)
407de2a is described below
commit 407de2a60550d1fcb36fe6da2e77dde6ddb3621c
Author: Ximo Guanter <xi...@gmail.com>
AuthorDate: Mon Jun 28 16:25:15 2021 +0200
Remove unnecessary mutex (#639)
---
ballista/rust/core/src/execution_plans/query_stage.rs | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/query_stage.rs
index c117110..1e91540 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/query_stage.rs
@@ -184,7 +184,7 @@ impl ExecutionPlan for QueryStageExec {
// we won't necessary produce output for every possible partition, so we
// create writers on demand
- let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+ let mut writers: Vec<Option<ShuffleWriter>> = vec![];
for _ in 0..num_output_partitions {
writers.push(None);
}
@@ -229,9 +229,8 @@ impl ExecutionPlan for QueryStageExec {
RecordBatch::try_new(input_batch.schema(), columns)?;
// write batch out
- match &writers[num_output_partition] {
+ match &mut writers[num_output_partition] {
Some(w) => {
- let mut w = w.lock().unwrap();
w.write(&output_batch)?;
}
None => {
@@ -247,8 +246,7 @@ impl ExecutionPlan for QueryStageExec {
ShuffleWriter::new(path, stream.schema().as_ref())?;
writer.write(&output_batch)?;
- writers[num_output_partition] =
- Some(Arc::new(Mutex::new(writer)));
+ writers[num_output_partition] = Some(writer);
}
}
}
@@ -262,10 +260,9 @@ impl ExecutionPlan for QueryStageExec {
let mut num_batches_builder = UInt64Builder::new(num_writers);
let mut num_bytes_builder = UInt64Builder::new(num_writers);
- for (i, w) in writers.iter().enumerate() {
+ for (i, w) in writers.iter_mut().enumerate() {
match w {
Some(w) => {
- let mut w = w.lock().unwrap();
w.finish()?;
path_builder.append_value(w.path())?;
partition_builder.append_value(i as u32)?;