Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/15 18:20:58 UTC

[GitHub] [arrow] alamb commented on a change in pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

alamb commented on a change in pull request #7969:
URL: https://github.com/apache/arrow/pull/7969#discussion_r471019931



##########
File path: rust/datafusion/src/execution/physical_plan/csv.rs
##########
@@ -241,22 +241,22 @@ impl CsvPartition {
 
 impl Partition for CsvPartition {
     /// Execute this partition and return an iterator over RecordBatch
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        Ok(Arc::new(Mutex::new(CsvIterator::try_new(
+    fn execute(&self) -> Result<Arc<dyn RecordBatchReader + Send + Sync>> {
+        Ok(Arc::new(CsvIterator::try_new(
             &self.path,
             self.schema.clone(),
             self.has_header,
             self.delimiter,
             &self.projection,
             self.batch_size,
-        )?)))
+        )?))
     }
 }
 
 /// Iterator over batches
 struct CsvIterator {
     /// Arrow CSV reader
-    reader: csv::Reader<File>,
+    reader: Arc<Mutex<csv::Reader<File>>>,

Review comment:
       I think this is a better place for the `Mutex`: the spot where concurrency is explicit and the code has to handle multiple files.
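
A minimal sketch of the pattern this comment describes, not the code from this PR: the lock wraps only the stateful reader inside the iterator, so callers hold a plain `Arc<dyn ...>` and never lock anything themselves. The names `BatchReader`, `CountingIterator`, and `drain_all` are hypothetical stand-ins, and a `u32` stands in for a `RecordBatch`.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `RecordBatchReader`, simplified to yield `u32` values.
trait BatchReader {
    /// Takes `&self`, so the reader can be shared behind an `Arc` without an outer lock.
    fn next_batch(&self) -> Option<u32>;
}

/// Hypothetical stand-in for `CsvIterator`: only the stateful reader sits behind the lock.
struct CountingIterator {
    reader: Arc<Mutex<std::vec::IntoIter<u32>>>,
}

impl CountingIterator {
    fn new(values: Vec<u32>) -> Self {
        Self {
            reader: Arc::new(Mutex::new(values.into_iter())),
        }
    }
}

impl BatchReader for CountingIterator {
    fn next_batch(&self) -> Option<u32> {
        // The lock is taken here, at the single place that mutates state,
        // instead of at every call site.
        self.reader.lock().unwrap().next()
    }
}

/// Drains a reader through the same kind of handle the new `execute` signature returns.
fn drain_all() -> Vec<u32> {
    let reader: Arc<dyn BatchReader + Send + Sync> =
        Arc::new(CountingIterator::new(vec![1, 2, 3]));
    let mut out = Vec::new();
    while let Some(value) = reader.next_batch() {
        out.push(value);
    }
    out
}
```

With this shape, calling `drain_all()` collects every value in order, and no caller ever sees a `Mutex` in the type it holds.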

##########
File path: rust/arrow/src/ipc/reader.rs
##########
@@ -651,19 +654,20 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
         self.schema.clone()
     }
 
-    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {

Review comment:
       Is this change (to not need `mut`) required in this PR? It seems like it could be separated out from the DataFusion change, though perhaps I am missing something.
   
   Given that `self.reader` and `self.current_block` are actually changed by `next_batch`, I think the method is harder to reason about if its signature looks like it doesn't mutate any state.
   
   Also, once we make this non-`mut`, making it `mut` again in the future would likely be a compatibility-breaking change.
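
To illustrate the readability concern, here is a hedged sketch contrasting the two signatures. All names (`ReaderMut`, `ReaderShared`, `ExplicitReader`, `HiddenReader`, and the helper functions) are hypothetical, and a block index stands in for a `RecordBatch`. Both readers advance `current_block` on every call, but only the `&mut self` version says so in its signature.

```rust
use std::sync::Mutex;

/// Hypothetical trait with the original signature: mutation is visible to callers.
trait ReaderMut {
    fn next_batch(&mut self) -> Option<usize>;
}

/// Hypothetical trait with the proposed signature: mutation is hidden behind `&self`.
trait ReaderShared {
    fn next_batch(&self) -> Option<usize>;
}

struct ExplicitReader {
    current_block: usize,
    total_blocks: usize,
}

impl ReaderMut for ExplicitReader {
    fn next_batch(&mut self) -> Option<usize> {
        if self.current_block < self.total_blocks {
            let block = self.current_block;
            self.current_block += 1; // state change is announced by `&mut self`
            Some(block)
        } else {
            None
        }
    }
}

struct HiddenReader {
    current_block: Mutex<usize>,
    total_blocks: usize,
}

impl ReaderShared for HiddenReader {
    fn next_batch(&self) -> Option<usize> {
        // Interior mutability: the same state change, but nothing in the signature shows it.
        let mut current = self.current_block.lock().unwrap();
        if *current < self.total_blocks {
            let block = *current;
            *current += 1;
            Some(block)
        } else {
            None
        }
    }
}

/// Calls `next_batch` twice; the binding must be declared `mut`.
fn read_two_explicit() -> (Option<usize>, Option<usize>) {
    let mut reader = ExplicitReader { current_block: 0, total_blocks: 1 };
    (reader.next_batch(), reader.next_batch())
}

/// Calls `next_batch` twice on an immutable binding that still changes state.
fn read_two_hidden() -> (Option<usize>, Option<usize>) {
    let reader = HiddenReader { current_block: Mutex::new(0), total_blocks: 1 };
    (reader.next_batch(), reader.next_batch())
}
```

The compatibility point follows from the same sketch: code written against the `&self` trait may call `next_batch` through a shared reference or an `Arc`, and such callers would stop compiling if the method later went back to `&mut self`.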

##########
File path: rust/datafusion/src/execution/physical_plan/common.rs
##########
@@ -52,24 +53,21 @@ impl RecordBatchReader for RecordBatchIterator {
         self.schema.clone()
     }
 
-    fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
-        if self.index < self.batches.len() {

Review comment:
       Yeah, I think the old code here (with `&mut self`) is clearer about what is going on.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org