You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/15 13:37:16 UTC

[GitHub] [arrow] andygrove opened a new pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

andygrove opened a new pull request #7969:
URL: https://github.com/apache/arrow/pull/7969


   This PR modifies the DataFusion Partition trait, changing this method ...
   
   ```rust
   fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>;
   ```
   
   to
   
   ```rust
   fn execute(&self) -> Result<Arc<dyn RecordBatchReader + Send + Sync>>;
   ```
   
   This is a cleaner API in my opinion and removes the overhead of a mutex lock per batch per operator, which is often redundant since many operators do not contain mutable state.
   
   This does affect the core arrow and parquet crates as well, removing the `&mut self` requirement from `ArrayReader`, for example, in preference of using `Rc<RefCell<_>>` with the reader.
   
   I ran the TPC-H query 1 benchmark (scale factor 100) before and after these changes and saw no noticeable difference in performance (20.6 seconds).
   
   I think these changes are also a good step towards being able to adopt async as well.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove closed pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #7969:
URL: https://github.com/apache/arrow/pull/7969


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7969:
URL: https://github.com/apache/arrow/pull/7969#discussion_r471031743



##########
File path: rust/arrow/src/ipc/reader.rs
##########
@@ -651,19 +654,20 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
         self.schema.clone()
     }
 
-    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {

Review comment:
       The issue is that both DataFusion and Parquet implement the same RecordBatchReader trait that requires `&mut self` and therefore requires the use of mutexes. Maybe DataFusion should have its own trait instead. I'll try that approach next.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7969:
URL: https://github.com/apache/arrow/pull/7969#issuecomment-674446547


   @alamb @sunchao Thanks for the reviews. I'm closing this for now and will re-think the approach.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7969:
URL: https://github.com/apache/arrow/pull/7969#issuecomment-674398261


   https://issues.apache.org/jira/browse/ARROW-9753


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] sunchao commented on a change in pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #7969:
URL: https://github.com/apache/arrow/pull/7969#discussion_r471017707



##########
File path: rust/arrow/src/ipc/reader.rs
##########
@@ -473,7 +476,7 @@ pub struct FileReader<R: Read + Seek> {
     blocks: Vec<ipc::Block>,
 
     /// A counter to keep track of the current block that should be read
-    current_block: usize,
+    current_block: AtomicUsize,

Review comment:
       curious why this is an atomic var now - will there by multiple threads accessing the same `FileReader` instance? 

##########
File path: rust/arrow/src/ipc/reader.rs
##########
@@ -783,21 +787,22 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
         self.schema.clone()
     }
 
-    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
-        if self.finished {
+    fn next_batch(&self) -> Result<Option<RecordBatch>> {
+        if self.finished.load(Ordering::SeqCst) {
             return Ok(None);
         }
         // determine metadata length
         let mut meta_size: [u8; 4] = [0; 4];
 
-        match self.reader.read_exact(&mut meta_size) {
+        let mut reader = self.reader.borrow_mut();

Review comment:
       this will panic when someone else is holding the reference, so we need to be careful - not sure if `try_borrow_mut` makes more sense here though.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7969:
URL: https://github.com/apache/arrow/pull/7969#issuecomment-674397460


   @sunchao @nevi-me @paddyhoran PTAL since this touches the core arrow and the parquet crates.
   
   @jorgecarleitao @alamb fyi


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #7969: ARROW-9753: [Rust] [DataFusion] Remove use of Mutex from DataFusion Partition trait

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7969:
URL: https://github.com/apache/arrow/pull/7969#discussion_r471019931



##########
File path: rust/datafusion/src/execution/physical_plan/csv.rs
##########
@@ -241,22 +241,22 @@ impl CsvPartition {
 
 impl Partition for CsvPartition {
     /// Execute this partition and return an iterator over RecordBatch
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        Ok(Arc::new(Mutex::new(CsvIterator::try_new(
+    fn execute(&self) -> Result<Arc<dyn RecordBatchReader + Send + Sync>> {
+        Ok(Arc::new(CsvIterator::try_new(
             &self.path,
             self.schema.clone(),
             self.has_header,
             self.delimiter,
             &self.projection,
             self.batch_size,
-        )?)))
+        )?))
     }
 }
 
 /// Iterator over batches
 struct CsvIterator {
     /// Arrow CSV reader
-    reader: csv::Reader<File>,
+    reader: Arc<Mutex<csv::Reader<File>>>,

Review comment:
       I think this is a better place for the Mutex -- in the place where concurrency is explicit and the code has to handle multiple files

##########
File path: rust/arrow/src/ipc/reader.rs
##########
@@ -651,19 +654,20 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
         self.schema.clone()
     }
 
-    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {

Review comment:
       Is this change (to not need `mut`) required in this PR? It seems like it could be separated out from the DataFusion change, though perhaps I am missing something
   
   Given `self.reader` and `self.current_block` are actually being changed by `next_batch` I think it might be harder to reason about if this method looks like it doesn't mutate any state. 
   
   Also, once we make this non-mut, if we ever wanted to make it mut in the future, that would likely be a compatibility breaking change. 

##########
File path: rust/datafusion/src/execution/physical_plan/common.rs
##########
@@ -52,24 +53,21 @@ impl RecordBatchReader for RecordBatchIterator {
         self.schema.clone()
     }
 
-    fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
-        if self.index < self.batches.len() {

Review comment:
       Yeah, I think the old code here (with `&mut self`) is clearer about what is going on




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org