You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/06/28 14:25:20 UTC

[arrow-datafusion] branch master updated: Remove unnecessary mutex (#639)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 407de2a  Remove unnecessary mutex (#639)
407de2a is described below

commit 407de2a60550d1fcb36fe6da2e77dde6ddb3621c
Author: Ximo Guanter <xi...@gmail.com>
AuthorDate: Mon Jun 28 16:25:15 2021 +0200

    Remove unnecessary mutex (#639)
---
 ballista/rust/core/src/execution_plans/query_stage.rs | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/query_stage.rs
index c117110..1e91540 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/query_stage.rs
@@ -184,7 +184,7 @@ impl ExecutionPlan for QueryStageExec {
 
                 // we won't necessary produce output for every possible partition, so we
                 // create writers on demand
-                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+                let mut writers: Vec<Option<ShuffleWriter>> = vec![];
                 for _ in 0..num_output_partitions {
                     writers.push(None);
                 }
@@ -229,9 +229,8 @@ impl ExecutionPlan for QueryStageExec {
                             RecordBatch::try_new(input_batch.schema(), columns)?;
 
                         // write batch out
-                        match &writers[num_output_partition] {
+                        match &mut writers[num_output_partition] {
                             Some(w) => {
-                                let mut w = w.lock().unwrap();
                                 w.write(&output_batch)?;
                             }
                             None => {
@@ -247,8 +246,7 @@ impl ExecutionPlan for QueryStageExec {
                                     ShuffleWriter::new(path, stream.schema().as_ref())?;
 
                                 writer.write(&output_batch)?;
-                                writers[num_output_partition] =
-                                    Some(Arc::new(Mutex::new(writer)));
+                                writers[num_output_partition] = Some(writer);
                             }
                         }
                     }
@@ -262,10 +260,9 @@ impl ExecutionPlan for QueryStageExec {
                 let mut num_batches_builder = UInt64Builder::new(num_writers);
                 let mut num_bytes_builder = UInt64Builder::new(num_writers);
 
-                for (i, w) in writers.iter().enumerate() {
+                for (i, w) in writers.iter_mut().enumerate() {
                     match w {
                         Some(w) => {
-                            let mut w = w.lock().unwrap();
                             w.finish()?;
                             path_builder.append_value(w.path())?;
                             partition_builder.append_value(i as u32)?;