You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/19 21:02:14 UTC
[arrow-rs] branch master updated: Add CSV build_buffered (#3338) (#3368)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8cab7a2b4 Add CSV build_buffered (#3338) (#3368)
8cab7a2b4 is described below
commit 8cab7a2b446a916bb4b6f3152ddecd4b8b5dd61a
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 19 21:02:08 2022 +0000
Add CSV build_buffered (#3338) (#3368)
* Add CSV build_buffered (#3338)
* Doc tweaks
---
arrow-csv/src/reader/mod.rs | 42 +++++++++++++++++++++++++-----------------
arrow/benches/csv_reader.rs | 2 +-
2 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 877876b77..bc6b016ec 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -47,7 +47,7 @@ use regex::{Regex, RegexSet};
use std::collections::HashSet;
use std::fmt;
use std::fs::File;
-use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::io::{BufRead, BufReader as StdBufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use arrow_array::builder::Decimal128Builder;
@@ -325,14 +325,17 @@ pub fn infer_schema_from_files(
// optional bounds of the reader, of the form (min line, max line).
type Bounds = Option<(usize, usize)>;
+/// CSV file reader using [`std::io::BufReader`]
+pub type Reader<R> = BufReader<StdBufReader<R>>;
+
/// CSV file reader
-pub struct Reader<R: Read> {
+pub struct BufReader<R> {
/// Explicit schema for the CSV file
schema: SchemaRef,
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// File reader
- reader: RecordReader<BufReader<R>>,
+ reader: RecordReader<R>,
/// Rows to skip
to_skip: usize,
/// Current line number
@@ -347,9 +350,9 @@ pub struct Reader<R: Read> {
datetime_format: Option<String>,
}
-impl<R> fmt::Debug for Reader<R>
+impl<R> fmt::Debug for BufReader<R>
where
- R: Read,
+ R: BufRead,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Reader")
@@ -394,7 +397,7 @@ impl<R: Read> Reader<R> {
if let Some(format) = datetime_format {
builder = builder.with_datetime_format(format)
}
- builder.build_with_schema(reader, schema)
+ builder.build_with_schema(StdBufReader::new(reader), schema)
}
/// Returns the schema of the reader, useful for getting the schema without reading
@@ -441,7 +444,7 @@ impl<R: Read> Reader<R> {
}
}
-impl<R: Read> Iterator for Reader<R> {
+impl<R: BufRead> Iterator for BufReader<R> {
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
@@ -1040,11 +1043,19 @@ impl ReaderBuilder {
self
}
- /// Create a new `Reader` from the `ReaderBuilder`
- pub fn build<R: Read + Seek>(
+ /// Create a new `Reader` from a non-buffered reader
+ ///
+ /// If `R: BufRead` consider using [`Self::build_buffered`] to avoid unnecessary additional
+ /// buffering, as internally this method wraps `reader` in [`std::io::BufReader`]
+ pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>, ArrowError> {
+ self.build_buffered(StdBufReader::new(reader))
+ }
+
+ /// Create a new `BufReader` from a buffered reader
+ pub fn build_buffered<R: BufRead + Seek>(
mut self,
mut reader: R,
- ) -> Result<Reader<R>, ArrowError> {
+ ) -> Result<BufReader<R>, ArrowError> {
// check if schema should be inferred
let delimiter = self.delimiter.unwrap_or(b',');
let schema = match self.schema.take() {
@@ -1068,7 +1079,7 @@ impl ReaderBuilder {
Ok(self.build_with_schema(reader, schema))
}
- fn build_with_schema<R: Read>(self, reader: R, schema: SchemaRef) -> Reader<R> {
+ fn build_with_schema<R: BufRead>(self, reader: R, schema: SchemaRef) -> BufReader<R> {
let mut reader_builder = csv_core::ReaderBuilder::new();
reader_builder.escape(self.escape);
@@ -1081,11 +1092,8 @@ impl ReaderBuilder {
if let Some(t) = self.terminator {
reader_builder.terminator(csv_core::Terminator::Any(t));
}
- let reader = RecordReader::new(
- BufReader::new(reader),
- reader_builder.build(),
- schema.fields().len(),
- );
+ let delimiter = reader_builder.build();
+ let reader = RecordReader::new(reader, delimiter, schema.fields().len());
let header = self.has_header as usize;
@@ -1094,7 +1102,7 @@ impl ReaderBuilder {
None => (header, usize::MAX),
};
- Reader {
+ BufReader {
schema,
projection: self.projection,
reader,
diff --git a/arrow/benches/csv_reader.rs b/arrow/benches/csv_reader.rs
index f6353fb85..02c8ca2d2 100644
--- a/arrow/benches/csv_reader.rs
+++ b/arrow/benches/csv_reader.rs
@@ -44,7 +44,7 @@ fn do_bench(c: &mut Criterion, name: &str, cols: Vec<ArrayRef>) {
.with_schema(batch.schema())
.with_batch_size(batch_size)
.has_header(true)
- .build(cursor)
+ .build_buffered(cursor)
.unwrap();
for next in reader {