You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "marvinlanhenke (via GitHub)" <gi...@apache.org> on 2023/12/27 16:18:55 UTC

Re: [PR] parallel csv scan [arrow-datafusion]

marvinlanhenke commented on code in PR #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1437135508


##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -270,14 +297,223 @@ impl CsvOpener {
     }
 }
 
+/// Returns the position of the first newline in the byte stream, or the total length if no newline is found.
+fn find_first_newline_bytes<R: std::io::Read>(reader: &mut R) -> Result<usize> {
+    let mut buffer = [0; 1];
+    let mut index = 0;
+
+    loop {
+        let result = reader.read(&mut buffer);
+        match result {
+            Ok(n) => {
+                if n == 0 {
+                    return Ok(index); // End of file, no newline found
+                }
+                if buffer[0] == b'\n' {
+                    return Ok(index);
+                }
+                index += 1;
+            }
+            Err(e) => {
+                return Err(DataFusionError::IoError(e));
+            }
+        }
+    }
+}
+
+/// Returns the offset of the first newline in the object store range [start, end), or the end offset if no newline is found.
+async fn find_first_newline(
+    object_store: &Arc<dyn ObjectStore>,
+    location: &object_store::path::Path,
+    start_byte: usize,
+    end_byte: usize,
+) -> Result<usize> {
+    let options = GetOptions {
+        range: Some(Range {
+            start: start_byte,
+            end: end_byte,
+        }),
+        ..Default::default()
+    };
+
+    let offset = match object_store.get_opts(location, options).await? {
+        GetResult::File(_, _) => {
+            // Range currently is ignored for GetResult::File(...)
+            // Alternative get_range() will copy the whole range into memory, thus set a limit of
+            // max bytes to read to find the first newline
+            let max_line_length = 4096; // in bytes
+            let get_range_end_result = object_store
+                .get_range(
+                    location,
+                    Range {
+                        start: start_byte,
+                        end: std::cmp::min(start_byte + max_line_length, end_byte),
+                    },
+                )
+                .await;
+            let mut decoder_tail = Cursor::new(get_range_end_result?);
+            find_first_newline_bytes(&mut decoder_tail)?
+        }
+        GetResult::Stream(s) => {
+            let mut input = s.map_err(DataFusionError::from);
+            let mut buffered = Bytes::new();
+
+            let future_index = async move {
+                let mut index = 0;
+
+                loop {
+                    if buffered.is_empty() {
+                        match input.next().await {
+                            Some(Ok(b)) => buffered = b,
+                            Some(Err(e)) => return Err(e),
+                            None => return Ok(index),
+                        };
+                    }
+
+                    for byte in &buffered {
+                        if *byte == b'\n' {
+                            return Ok(index);
+                        }
+                        index += 1;
+                    }
+
+                    buffered.advance(buffered.len());
+                }
+            };
+            future_index.await?
+        }
+    };
+    Ok(offset)
+}
+
 impl FileOpener for CsvOpener {
+    /// Open a partitioned CSV file.
+    ///
+    /// If `file_meta.range` is `None`, the entire file is opened.
+    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies that the partition
+    /// corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules

Review Comment:
   > # Alternate idea
   > Another approach that would reduce the number of object store requests would be to read past the initial range and stop at the next newline `\n` like this:
   > 
   > ```
   >   Each partition reads *more* than its assigned ranged to find                     
   >   the trailing new line, and ignores everything afterwards                         
   >                                                                                    
   >                                                                                    
   > ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                                          
   > ┌─────────────┬──┬───────┼────────────┬──┬────────────────┬──┬────────┬──┬────────┐
   > │     ...     │\n│        ...         │\n│      ...       │\n│  ...   │\n│  ...   │
   > └─────────────┴──┴───────┼────────────┴──┴────────────────┴──┴────────┴──┴────────┘
   > │                                                                                  
   >  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                                                         
   >   Partition 0                                                                      
   >   Read                                                                             
   > ```
   > 
   > I think the tricky bit of this design would be to ensure enough extra data was read. Initially, maybe we could just pick something sufficiently large for most files, like 1MB and error if the next newline can't be found. As a follow on we could add some fanciness like make another object store request if necessary.
   
   @alamb 
   ...working on #8502; I noticed the same thing - so am I right; that this "issue" is yet to be resolved? 
   
   One could benchmark the different approaches (this PR vs. alternative Idea); I like the idea of picking a "sufficient large enough range at the beginning" and trying to recover with an additional object_store request. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org