You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/19 21:02:14 UTC

[arrow-rs] branch master updated: Add CSV build_buffered (#3338) (#3368)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cab7a2b4 Add CSV build_buffered (#3338) (#3368)
8cab7a2b4 is described below

commit 8cab7a2b446a916bb4b6f3152ddecd4b8b5dd61a
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 19 21:02:08 2022 +0000

    Add CSV build_buffered (#3338) (#3368)
    
    * Add CSV build_buffered (#3338)
    
    * Doc tweaks
---
 arrow-csv/src/reader/mod.rs | 42 +++++++++++++++++++++++++-----------------
 arrow/benches/csv_reader.rs |  2 +-
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 877876b77..bc6b016ec 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -47,7 +47,7 @@ use regex::{Regex, RegexSet};
 use std::collections::HashSet;
 use std::fmt;
 use std::fs::File;
-use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::io::{BufRead, BufReader as StdBufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use arrow_array::builder::Decimal128Builder;
@@ -325,14 +325,17 @@ pub fn infer_schema_from_files(
 // optional bounds of the reader, of the form (min line, max line).
 type Bounds = Option<(usize, usize)>;
 
+/// CSV file reader using [`std::io::BufReader`]
+pub type Reader<R> = BufReader<StdBufReader<R>>;
+
 /// CSV file reader
-pub struct Reader<R: Read> {
+pub struct BufReader<R> {
     /// Explicit schema for the CSV file
     schema: SchemaRef,
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    reader: RecordReader<BufReader<R>>,
+    reader: RecordReader<R>,
     /// Rows to skip
     to_skip: usize,
     /// Current line number
@@ -347,9 +350,9 @@ pub struct Reader<R: Read> {
     datetime_format: Option<String>,
 }
 
-impl<R> fmt::Debug for Reader<R>
+impl<R> fmt::Debug for BufReader<R>
 where
-    R: Read,
+    R: BufRead,
 {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("Reader")
@@ -394,7 +397,7 @@ impl<R: Read> Reader<R> {
         if let Some(format) = datetime_format {
             builder = builder.with_datetime_format(format)
         }
-        builder.build_with_schema(reader, schema)
+        builder.build_with_schema(StdBufReader::new(reader), schema)
     }
 
     /// Returns the schema of the reader, useful for getting the schema without reading
@@ -441,7 +444,7 @@ impl<R: Read> Reader<R> {
     }
 }
 
-impl<R: Read> Iterator for Reader<R> {
+impl<R: BufRead> Iterator for BufReader<R> {
     type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -1040,11 +1043,19 @@ impl ReaderBuilder {
         self
     }
 
-    /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R: Read + Seek>(
+    /// Create a new `Reader` from a non-buffered reader
+    ///
+    /// If `R: BufRead`, consider using [`Self::build_buffered`] to avoid unnecessary additional
+    /// buffering, as internally this method wraps `reader` in [`std::io::BufReader`].
+    pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>, ArrowError> {
+        self.build_buffered(StdBufReader::new(reader))
+    }
+
+    /// Create a new `BufReader` from a buffered reader
+    pub fn build_buffered<R: BufRead + Seek>(
         mut self,
         mut reader: R,
-    ) -> Result<Reader<R>, ArrowError> {
+    ) -> Result<BufReader<R>, ArrowError> {
         // check if schema should be inferred
         let delimiter = self.delimiter.unwrap_or(b',');
         let schema = match self.schema.take() {
@@ -1068,7 +1079,7 @@ impl ReaderBuilder {
         Ok(self.build_with_schema(reader, schema))
     }
 
-    fn build_with_schema<R: Read>(self, reader: R, schema: SchemaRef) -> Reader<R> {
+    fn build_with_schema<R: BufRead>(self, reader: R, schema: SchemaRef) -> BufReader<R> {
         let mut reader_builder = csv_core::ReaderBuilder::new();
         reader_builder.escape(self.escape);
 
@@ -1081,11 +1092,8 @@ impl ReaderBuilder {
         if let Some(t) = self.terminator {
             reader_builder.terminator(csv_core::Terminator::Any(t));
         }
-        let reader = RecordReader::new(
-            BufReader::new(reader),
-            reader_builder.build(),
-            schema.fields().len(),
-        );
+        let delimiter = reader_builder.build();
+        let reader = RecordReader::new(reader, delimiter, schema.fields().len());
 
         let header = self.has_header as usize;
 
@@ -1094,7 +1102,7 @@ impl ReaderBuilder {
             None => (header, usize::MAX),
         };
 
-        Reader {
+        BufReader {
             schema,
             projection: self.projection,
             reader,
diff --git a/arrow/benches/csv_reader.rs b/arrow/benches/csv_reader.rs
index f6353fb85..02c8ca2d2 100644
--- a/arrow/benches/csv_reader.rs
+++ b/arrow/benches/csv_reader.rs
@@ -44,7 +44,7 @@ fn do_bench(c: &mut Criterion, name: &str, cols: Vec<ArrayRef>) {
                     .with_schema(batch.schema())
                     .with_batch_size(batch_size)
                     .has_header(true)
-                    .build(cursor)
+                    .build_buffered(cursor)
                     .unwrap();
 
                 for next in reader {