Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/02/05 10:59:05 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5161: FileStream: Open next file in parallel while decoding

alamb commented on code in PR #5161:
URL: https://github.com/apache/arrow-datafusion/pull/5161#discussion_r1096662607


##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -98,6 +99,8 @@ enum FileStreamState {
         partition_values: Vec<ScalarValue>,
         /// The reader instance
         reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
+        /// A [`FileOpenFuture`] for the next file to be processed

Review Comment:
   ```suggestion
           /// A [`FileOpenFuture`] for the next file to be processed, 
           /// and its corresponding partition column values, if any.
           /// This allows the next file to be opened in parallel while the
           /// current file is read. 
   ```



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -287,7 +321,18 @@ impl<F: FileOpener> FileStream<F> {
                     None => {
                         self.file_stream_metrics.time_scanning_until_data.stop();
                         self.file_stream_metrics.time_scanning_total.stop();
-                        self.state = FileStreamState::Idle;
+
+                        match mem::take(next) {
+                            Some((future, partition_values)) => {
+                                self.file_stream_metrics.time_opening.start();
+
+                                self.state = FileStreamState::Open {
+                                    future,
+                                    partition_values,
+                                }
+                            }
+                            None => return Poll::Ready(None),

Review Comment:
   ```suggestion
                               // No more input files
                               None => return Poll::Ready(None),
   ```



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -202,30 +205,39 @@ impl<F: FileOpener> FileStream<F> {
         })
     }
 
+    fn next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
+        let part_file = match self.file_iter.pop_front() {
+            Some(file) => file,
+            None => return None,
+        };

Review Comment:
   I think you can use `?` for early returns on `Option`s as well, like
   
   ```suggestion
           let part_file =  self.file_iter.pop_front()?;
   ```
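
   For reference, a minimal standalone sketch of how `?` behaves on `Option` (the queue and helper names here are illustrative, not from this PR): inside a function returning `Option`, `expr?` unwraps a `Some` and returns `None` from the enclosing function otherwise.

   ```rust
   use std::collections::VecDeque;

   // Illustrative only: `?` on an Option returns None from the enclosing
   // function as soon as the queue is empty.
   fn pop_len(queue: &mut VecDeque<String>) -> Option<usize> {
       let item = queue.pop_front()?; // early-returns None when the queue is empty
       Some(item.len())
   }

   fn main() {
       let mut q = VecDeque::from(vec!["a.parquet".to_string()]);
       assert_eq!(pop_len(&mut q), Some(9));
       assert_eq!(pop_len(&mut q), None);
   }
   ```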



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -237,13 +249,34 @@ impl<F: FileOpener> FileStream<F> {
                     partition_values,
                 } => match ready!(future.poll_unpin(cx)) {
                     Ok(reader) => {
+                        let partition_values = mem::take(partition_values);
+
+                        let next = self.next_file().transpose();

Review Comment:
   ```suggestion
                           // begin opening next file
                           let next = self.next_file().transpose();
   ```
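
   As an aside on the `transpose()` call here, a minimal standalone sketch (not DataFusion code) of how `Option<Result<T, E>>::transpose` flips the nesting into `Result<Option<T>, E>`:

   ```rust
   // Illustrative only: transpose turns Option<Result<T, E>> into Result<Option<T>, E>.
   fn main() {
       let opened: Option<Result<u32, String>> = Some(Ok(42));
       assert_eq!(opened.transpose(), Ok(Some(42)));

       let failed: Option<Result<u32, String>> = Some(Err("open error".to_string()));
       assert_eq!(failed.transpose(), Err("open error".to_string()));

       let exhausted: Option<Result<u32, String>> = None;
       assert_eq!(exhausted.transpose(), Ok(None));
   }
   ```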



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -202,30 +205,39 @@ impl<F: FileOpener> FileStream<F> {
         })
     }
 
+    fn next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {

Review Comment:
   ```suggestion
       // Begin opening the next file in parallel while decoding the current file in FileStream. 
       // Since file opening is mostly IO (and may involve a 
       // bunch of sequential IO), it can be parallelized with decoding. 
       fn next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
   ```
   
   Also, I wonder what you think about calling this function `start_next_file` or something to hint that it begins to open the file?
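
   To illustrate the general idea, here is a rough, self-contained sketch rather than the actual `FileStream` code (`open_file`, `decode`, and the paths are made-up stand-ins): starting the next open on a separate task lets its IO overlap with decoding the current file.

   ```rust
   // Rough sketch of overlapping "open the next file" IO with decoding the
   // current one. `open_file` and `decode` are hypothetical stand-ins, not
   // DataFusion APIs; the paths are placeholders.
   async fn open_file(path: String) -> std::io::Result<Vec<u8>> {
       tokio::fs::read(path).await
   }

   fn decode(bytes: &[u8]) -> usize {
       bytes.len() // stand-in for CPU-bound decoding work
   }

   #[tokio::main]
   async fn main() -> std::io::Result<()> {
       let mut paths = vec!["a.bin".to_string(), "b.bin".to_string()].into_iter();

       let mut current = match paths.next() {
           Some(p) => open_file(p).await?,
           None => return Ok(()),
       };

       loop {
           // Start opening the next file in the background before decoding `current`.
           let next = paths.next().map(|p| tokio::spawn(open_file(p)));

           println!("decoded {} bytes", decode(&current));

           match next {
               Some(handle) => current = handle.await.expect("open task panicked")?,
               None => return Ok(()),
           }
       }
   }
   ```

   In the poll-based `FileStream` the pending `FileOpenFuture` is kept in the state machine rather than spawned on a task, but the intent is the same.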
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org