You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/19 08:56:12 UTC
[arrow-rs] branch master updated: Add csv-core based reader (#3338) (#3365)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new c3444334c Add csv-core based reader (#3338) (#3365)
c3444334c is described below
commit c3444334c58b651206e7bac946e21e7d2ae48c59
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 19 08:56:06 2022 +0000
Add csv-core based reader (#3338) (#3365)
* Add csv-core based reader (#3338)
* More docs
---
arrow-csv/Cargo.toml | 1 +
arrow-csv/src/{reader.rs => reader/mod.rs} | 397 ++++++++++++++---------------
arrow-csv/src/reader/records.rs | 266 +++++++++++++++++++
3 files changed, 458 insertions(+), 206 deletions(-)
diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml
index 0a8a0bec7..d02e599b3 100644
--- a/arrow-csv/Cargo.toml
+++ b/arrow-csv/Cargo.toml
@@ -45,6 +45,7 @@ arrow-data = { version = "29.0.0", path = "../arrow-data" }
arrow-schema = { version = "29.0.0", path = "../arrow-schema" }
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
csv = { version = "1.1", default-features = false }
+csv-core = { version = "0.1"}
lazy_static = { version = "1.4", default-features = false }
lexical-core = { version = "^0.8", default-features = false }
regex = { version = "1.7.0", default-features = false, features = ["std", "unicode", "perf"] }
diff --git a/arrow-csv/src/reader.rs b/arrow-csv/src/reader/mod.rs
similarity index 90%
rename from arrow-csv/src/reader.rs
rename to arrow-csv/src/reader/mod.rs
index c69e1753b..877876b77 100644
--- a/arrow-csv/src/reader.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -40,13 +40,14 @@
//! let batch = csv.next().unwrap().unwrap();
//! ```
-use core::cmp::min;
+mod records;
+
use lazy_static::lazy_static;
use regex::{Regex, RegexSet};
use std::collections::HashSet;
use std::fmt;
use std::fs::File;
-use std::io::{Read, Seek, SeekFrom};
+use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use arrow_array::builder::Decimal128Builder;
@@ -56,8 +57,9 @@ use arrow_cast::parse::Parser;
use arrow_schema::*;
use crate::map_csv_error;
+use crate::reader::records::{RecordReader, StringRecords};
use arrow_data::decimal::validate_decimal_precision;
-use csv::{ByteRecord, StringRecord};
+use csv::StringRecord;
use std::ops::Neg;
lazy_static! {
@@ -107,7 +109,7 @@ fn infer_field_schema(string: &str, datetime_re: Option<Regex>) -> DataType {
/// This is a collection of options for csv reader when the builder pattern cannot be used
/// and the parameters need to be passed around
#[derive(Debug, Default, Clone)]
-pub struct ReaderOptions {
+struct ReaderOptions {
has_header: bool,
delimiter: Option<u8>,
escape: Option<u8>,
@@ -177,11 +179,36 @@ pub fn infer_reader_schema<R: Read>(
infer_reader_schema_with_csv_options(reader, roptions)
}
+/// Creates a `csv::Reader` from `reader` with the given header flag and dialect options
+fn build_csv_reader<R: Read>(
+    reader: R,
+    has_header: bool,
+    delimiter: Option<u8>,
+    escape: Option<u8>,
+    quote: Option<u8>,
+    terminator: Option<u8>,
+) -> csv::Reader<R> {
+    let mut reader_builder = csv::ReaderBuilder::new();
+    reader_builder.has_headers(has_header);
+    // Options that are `None` keep csv's defaults, except `escape`, which is always set
+    if let Some(c) = delimiter {
+        reader_builder.delimiter(c);
+    }
+    reader_builder.escape(escape);
+    if let Some(c) = quote {
+        reader_builder.quote(c);
+    }
+    if let Some(t) = terminator {
+        reader_builder.terminator(csv::Terminator::Any(t));
+    }
+    reader_builder.from_reader(reader)
+}
+
fn infer_reader_schema_with_csv_options<R: Read>(
reader: R,
roptions: ReaderOptions,
) -> Result<(Schema, usize), ArrowError> {
- let mut csv_reader = Reader::build_csv_reader(
+ let mut csv_reader = build_csv_reader(
reader,
roptions.has_header,
roptions.delimiter,
@@ -305,15 +332,15 @@ pub struct Reader<R: Read> {
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// File reader
- reader: csv::Reader<R>,
+ reader: RecordReader<BufReader<R>>,
+ /// Rows to skip
+ to_skip: usize,
/// Current line number
line_number: usize,
- /// Maximum number of rows to read
+ /// End line number
end: usize,
/// Number of records per batch
batch_size: usize,
- /// Vector that can hold the `StringRecord`s of the batches
- batch_records: Vec<StringRecord>,
/// datetime format used to parse datetime values, (format understood by chrono)
///
/// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
@@ -351,16 +378,23 @@ impl<R: Read> Reader<R> {
projection: Option<Vec<usize>>,
datetime_format: Option<String>,
) -> Self {
- Self::from_reader(
- reader,
- schema,
- has_header,
- delimiter,
- batch_size,
- bounds,
- projection,
- datetime_format,
- )
+ let mut builder = ReaderBuilder::new()
+ .has_header(has_header)
+ .with_batch_size(batch_size);
+
+ if let Some(delimiter) = delimiter {
+ builder = builder.with_delimiter(delimiter);
+ }
+ if let Some((start, end)) = bounds {
+ builder = builder.with_bounds(start, end);
+ }
+ if let Some(projection) = projection {
+ builder = builder.with_projection(projection)
+ }
+ if let Some(format) = datetime_format {
+ builder = builder.with_datetime_format(format)
+ }
+ builder.build_with_schema(reader, schema)
}
/// Returns the schema of the reader, useful for getting the schema without reading
@@ -383,6 +417,7 @@ impl<R: Read> Reader<R> {
/// This constructor allows you more flexibility in what records are processed by the
/// csv reader.
#[allow(clippy::too_many_arguments)]
+ #[deprecated(note = "Use Reader::new or ReaderBuilder")]
pub fn from_reader(
reader: R,
schema: SchemaRef,
@@ -393,142 +428,57 @@ impl<R: Read> Reader<R> {
projection: Option<Vec<usize>>,
datetime_format: Option<String>,
) -> Self {
- let csv_reader =
- Self::build_csv_reader(reader, has_header, delimiter, None, None, None);
- Self::from_csv_reader(
- csv_reader,
+ Self::new(
+ reader,
schema,
has_header,
+ delimiter,
batch_size,
bounds,
projection,
datetime_format,
)
}
-
- fn build_csv_reader(
- reader: R,
- has_header: bool,
- delimiter: Option<u8>,
- escape: Option<u8>,
- quote: Option<u8>,
- terminator: Option<u8>,
- ) -> csv::Reader<R> {
- let mut reader_builder = csv::ReaderBuilder::new();
- reader_builder.has_headers(has_header);
-
- if let Some(c) = delimiter {
- reader_builder.delimiter(c);
- }
- reader_builder.escape(escape);
- if let Some(c) = quote {
- reader_builder.quote(c);
- }
- if let Some(t) = terminator {
- reader_builder.terminator(csv::Terminator::Any(t));
- }
- reader_builder.from_reader(reader)
- }
-
- fn from_csv_reader(
- mut csv_reader: csv::Reader<R>,
- schema: SchemaRef,
- has_header: bool,
- batch_size: usize,
- bounds: Bounds,
- projection: Option<Vec<usize>>,
- datetime_format: Option<String>,
- ) -> Self {
- let (start, end) = match bounds {
- None => (0, usize::MAX),
- Some((start, end)) => (start, end),
- };
-
- // First we will skip `start` rows
- // note that this skips by iteration. This is because in general it is not possible
- // to seek in CSV. However, skipping still saves the burden of creating arrow arrays,
- // which is a slow operation that scales with the number of columns
-
- let mut record = ByteRecord::new();
- // Skip first start items
- for _ in 0..start {
- let res = csv_reader.read_byte_record(&mut record);
- if !res.unwrap_or(false) {
- break;
- }
- }
-
- // Initialize batch_records with StringRecords so they
- // can be reused across batches
- let mut batch_records = Vec::with_capacity(batch_size);
- batch_records.resize_with(batch_size, Default::default);
-
- Self {
- schema,
- projection,
- reader: csv_reader,
- line_number: if has_header { start + 1 } else { start },
- batch_size,
- end,
- batch_records,
- datetime_format,
- }
- }
}
impl<R: Read> Iterator for Reader<R> {
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
-        let remaining = self.end - self.line_number;
-
-        let mut read_records = 0;
-        for i in 0..min(self.batch_size, remaining) {
-            match self.reader.read_record(&mut self.batch_records[i]) {
-                Ok(true) => {
-                    read_records += 1;
-                }
-                Ok(false) => break,
-                Err(e) => {
-                    return Some(Err(ArrowError::ParseError(format!(
-                        "Error parsing line {}: {:?}",
-                        self.line_number + i,
-                        e
-                    ))));
-                }
+        if self.to_skip != 0 {
+            if let Err(e) = self.reader.skip(std::mem::take(&mut self.to_skip)) {
+                return Some(Err(e));
}
}
-        // return early if no data was loaded
-        if read_records == 0 {
-            return None;
-        }
+        let remaining = self.end.saturating_sub(self.line_number); // start may be > end
+        let to_read = self.batch_size.min(remaining);
-        let format: Option<&str> = match self.datetime_format {
-            Some(ref format) => Some(format.as_ref()),
-            _ => None,
+        let batch = match self.reader.read(to_read) {
+            Ok(b) if b.is_empty() => return None,
+            Ok(b) => b,
+            Err(e) => return Some(Err(e)),
};
// parse the batches into a RecordBatch
let result = parse(
-            &self.batch_records[..read_records],
+            &batch,
self.schema.fields(),
Some(self.schema.metadata.clone()),
self.projection.as_ref(),
self.line_number,
-            format,
+            self.datetime_format.as_deref(),
);
-        self.line_number += read_records;
+        self.line_number += batch.len();
Some(result)
}
}
-/// parses a slice of [csv::StringRecord] into a
-/// [RecordBatch]
+/// Parses a slice of [`StringRecords`] into a [RecordBatch]
fn parse(
- rows: &[StringRecord],
+ rows: &StringRecords<'_>,
fields: &[Field],
metadata: Option<std::collections::HashMap<String, String>>,
projection: Option<&Vec<usize>>,
@@ -624,7 +574,9 @@ fn parse(
)
}
DataType::Utf8 => Ok(Arc::new(
- rows.iter().map(|row| row.get(i)).collect::<StringArray>(),
+ rows.iter()
+ .map(|row| Some(row.get(i)))
+ .collect::<StringArray>(),
) as ArrayRef),
DataType::Dictionary(key_type, value_type)
if value_type.as_ref() == &DataType::Utf8 =>
@@ -723,34 +675,26 @@ fn parse_bool(string: &str) -> Option<bool> {
// parse the column string to an Arrow Array
fn build_decimal_array(
_line_number: usize,
- rows: &[StringRecord],
+ rows: &StringRecords<'_>,
col_idx: usize,
precision: u8,
scale: i8,
) -> Result<ArrayRef, ArrowError> {
let mut decimal_builder = Decimal128Builder::with_capacity(rows.len());
- for row in rows {
- let col_s = row.get(col_idx);
- match col_s {
- None => {
- // No data for this row
- decimal_builder.append_null();
- }
- Some(s) => {
- if s.is_empty() {
- // append null
- decimal_builder.append_null();
- } else {
- let decimal_value: Result<i128, _> =
- parse_decimal_with_parameter(s, precision, scale);
- match decimal_value {
- Ok(v) => {
- decimal_builder.append_value(v);
- }
- Err(e) => {
- return Err(e);
- }
- }
+ for row in rows.iter() {
+ let s = row.get(col_idx);
+ if s.is_empty() {
+ // append null
+ decimal_builder.append_null();
+ } else {
+ let decimal_value: Result<i128, _> =
+ parse_decimal_with_parameter(s, precision, scale);
+ match decimal_value {
+ Ok(v) => {
+ decimal_builder.append_value(v);
+ }
+ Err(e) => {
+ return Err(e);
}
}
}
@@ -878,35 +822,31 @@ fn parse_decimal(s: &str) -> Result<i128, ArrowError> {
// parses a specific column (col_idx) into an Arrow Array.
fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
line_number: usize,
- rows: &[StringRecord],
+ rows: &StringRecords<'_>,
col_idx: usize,
format: Option<&str>,
) -> Result<ArrayRef, ArrowError> {
rows.iter()
.enumerate()
.map(|(row_index, row)| {
- match row.get(col_idx) {
- Some(s) => {
- if s.is_empty() {
- return Ok(None);
- }
+ let s = row.get(col_idx);
+ if s.is_empty() {
+ return Ok(None);
+ }
- let parsed = match format {
- Some(format) => parse_formatted::<T>(s, format),
- _ => parse_item::<T>(s),
- };
- match parsed {
- Some(e) => Ok(Some(e)),
- None => Err(ArrowError::ParseError(format!(
- // TODO: we should surface the underlying error here.
- "Error while parsing value {} for column {} at line {}",
- s,
- col_idx,
- line_number + row_index
- ))),
- }
- }
- None => Ok(None),
+ let parsed = match format {
+ Some(format) => parse_formatted::<T>(s, format),
+ _ => parse_item::<T>(s),
+ };
+ match parsed {
+ Some(e) => Ok(Some(e)),
+ None => Err(ArrowError::ParseError(format!(
+ // TODO: we should surface the underlying error here.
+ "Error while parsing value {} for column {} at line {}",
+ s,
+ col_idx,
+ line_number + row_index
+ ))),
}
})
.collect::<Result<PrimitiveArray<T>, ArrowError>>()
@@ -916,31 +856,23 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
// parses a specific column (col_idx) into an Arrow Array.
fn build_boolean_array(
line_number: usize,
-    rows: &[StringRecord],
+    rows: &StringRecords<'_>,
col_idx: usize,
) -> Result<ArrayRef, ArrowError> {
rows.iter()
.enumerate()
.map(|(row_index, row)| {
-            match row.get(col_idx) {
-                Some(s) => {
-                    if s.is_empty() {
-                        return Ok(None);
-                    }
-
-                    let parsed = parse_bool(s);
-                    match parsed {
-                        Some(e) => Ok(Some(e)),
-                        None => Err(ArrowError::ParseError(format!(
-                            // TODO: we should surface the underlying error here.
-                            "Error while parsing value {} for column {} at line {}",
-                            s,
-                            col_idx,
-                            line_number + row_index
-                        ))),
-                    }
-                }
-                None => Ok(None),
+            let s = row.get(col_idx);
+            match parse_bool(s) {
+                _ if s.is_empty() => Ok(None), // empty values are null, as before
+                Some(e) => Ok(Some(e)),
+                None => Err(ArrowError::ParseError(format!(
+                    // TODO: we should surface the underlying error here.
+                    "Error while parsing value {} for column {} at line {}",
+                    s,
+                    col_idx,
+                    line_number + row_index
+                ))),
}
})
.collect::<Result<BooleanArray, _>>()
@@ -1109,10 +1041,13 @@ impl ReaderBuilder {
}
/// Create a new `Reader` from the `ReaderBuilder`
- pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+ pub fn build<R: Read + Seek>(
+ mut self,
+ mut reader: R,
+ ) -> Result<Reader<R>, ArrowError> {
// check if schema should be inferred
let delimiter = self.delimiter.unwrap_or(b',');
- let schema = match self.schema {
+ let schema = match self.schema.take() {
Some(schema) => schema,
None => {
let roptions = ReaderOptions {
@@ -1122,7 +1057,7 @@ impl ReaderBuilder {
escape: self.escape,
quote: self.quote,
terminator: self.terminator,
- datetime_re: self.datetime_re,
+ datetime_re: self.datetime_re.take(),
};
let (inferred_schema, _) =
infer_file_schema_with_csv_options(&mut reader, roptions)?;
@@ -1130,23 +1065,45 @@ impl ReaderBuilder {
Arc::new(inferred_schema)
}
};
- let csv_reader = Reader::build_csv_reader(
- reader,
- self.has_header,
- self.delimiter,
- self.escape,
- self.quote,
- self.terminator,
+ Ok(self.build_with_schema(reader, schema))
+ }
+
+    fn build_with_schema<R: Read>(self, reader: R, schema: SchemaRef) -> Reader<R> {
+        let mut reader_builder = csv_core::ReaderBuilder::new();
+        reader_builder.escape(self.escape);
+
+        if let Some(c) = self.delimiter {
+            reader_builder.delimiter(c);
+        }
+        if let Some(c) = self.quote {
+            reader_builder.quote(c);
+        }
+        if let Some(t) = self.terminator {
+            reader_builder.terminator(csv_core::Terminator::Any(t));
+        }
+        let reader = RecordReader::new(
+            BufReader::new(reader),
+            reader_builder.build(),
+            schema.fields().len(),
+        );
-        Ok(Reader::from_csv_reader(
-            csv_reader,
+        // Rows before `start`, including any header, are skipped lazily by `next`
+        let header = self.has_header as usize;
+        // Bounds index data rows, so shift them past the header row (if any)
+        let (start, end) = match self.bounds {
+            Some((s, e)) => (s.saturating_add(header), e.saturating_add(header)),
+            None => (header, usize::MAX),
+        };
+
+        Reader {
schema,
-            self.has_header,
-            self.batch_size,
-            self.bounds,
-            self.projection.clone(),
-            self.datetime_format,
-        ))
+            projection: self.projection,
+            reader,
+            to_skip: start,
+            line_number: start,
+            end,
+            batch_size: self.batch_size,
+            datetime_format: self.datetime_format,
+        }
+    }
}
@@ -1285,7 +1242,7 @@ mod tests {
let both_files = file_with_headers
.chain(Cursor::new("\n".to_string()))
.chain(file_without_headers);
- let mut csv = Reader::from_reader(
+ let mut csv = Reader::new(
both_files,
Arc::new(schema),
true,
@@ -1480,6 +1437,7 @@ mod tests {
Field::new("c_int", DataType::UInt64, false),
Field::new("c_float", DataType::Float32, true),
Field::new("c_string", DataType::Utf8, false),
+ Field::new("c_bool", DataType::Boolean, false),
]);
let file = File::open("test/data/null_test.csv").unwrap();
@@ -2074,4 +2032,31 @@ mod tests {
let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(col1_arr.value(5), "value5");
}
+
+    #[test]
+    fn test_header_bounds() {
+        let csv = "a,b\na,b\na,b\na,b\na,b\n";
+        let tests = [
+            (None, false, 5),
+            (None, true, 4),
+            (Some((0, 4)), false, 4),
+            (Some((1, 4)), false, 3),
+            (Some((0, 4)), true, 4),
+            (Some((1, 4)), true, 3),
+        ];
+        // Bounds index data rows, so a header row never counts towards them
+        for (idx, (bounds, has_header, expected)) in tests.into_iter().enumerate() {
+            let mut reader = ReaderBuilder::new().has_header(has_header);
+            if let Some((start, end)) = bounds {
+                reader = reader.with_bounds(start, end);
+            }
+            let b = reader
+                .build(Cursor::new(csv.as_bytes()))
+                .unwrap()
+                .next()
+                .unwrap()
+                .unwrap();
+            assert_eq!(b.num_rows(), expected, "{}", idx);
+        }
+    }
}
diff --git a/arrow-csv/src/reader/records.rs b/arrow-csv/src/reader/records.rs
new file mode 100644
index 000000000..711baa152
--- /dev/null
+++ b/arrow-csv/src/reader/records.rs
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::ArrowError;
+use csv_core::{ReadRecordResult, Reader};
+use std::io::BufRead;
+
+/// The estimated length of a field in bytes
+const AVERAGE_FIELD_SIZE: usize = 8;
+
+/// The minimum number of bytes to reserve in the output data buffer at once
+const MIN_CAPACITY: usize = 1024;
+
+/// Decodes CSV records from a [`BufRead`] into flattened, reusable buffers
+pub struct RecordReader<R> {
+    reader: R,
+    delimiter: Reader, // the csv_core reader that splits input into fields
+    num_columns: usize, // the exact number of fields expected in every record
+    // Number of complete rows decoded by the most recent `fill_buf`
+    num_rows: usize,
+    offsets: Vec<usize>, // field end offsets into `data`; `offsets[0]` is always 0
+    data: Vec<u8>, // unescaped field bytes, concatenated without delimiters
+}
+
+impl<R: BufRead> RecordReader<R> {
+    /// Creates a reader decoding records of `num_columns` fields from `reader`
+    pub fn new(reader: R, delimiter: Reader, num_columns: usize) -> Self {
+        Self {
+            reader,
+            delimiter,
+            num_columns,
+            num_rows: 0,
+            offsets: vec![],
+            data: vec![],
+        }
+    }
+
+    /// Decodes up to `to_read` rows into `self.data` and `self.offsets`, setting
+    /// `self.num_rows` to the number of complete rows decoded (fewer at end of input)
+    fn fill_buf(&mut self, to_read: usize) -> Result<(), ArrowError> {
+        // Reserve sufficient capacity in offsets
+        self.offsets.resize(to_read * self.num_columns + 1, 0);
+        self.num_rows = 0;
+
+        if to_read == 0 {
+            return Ok(());
+        }
+
+        let mut output_offset = 0; // The current offset into `self.data`
+        let mut input_offset = 0; // The current offset into `input`
+        let mut field_offset = 1; // The current offset into `self.offsets`
+        let mut field_count = 0; // The number of fields read for the current row
+
+        'outer: loop {
+            let input = self.reader.fill_buf()?;
+
+            'input: loop {
+                // Reserve necessary space in output data based on best estimate
+                let remaining_rows = to_read - self.num_rows;
+                let capacity = remaining_rows * self.num_columns * AVERAGE_FIELD_SIZE;
+                let estimated_data = capacity.max(MIN_CAPACITY);
+                self.data.resize(output_offset + estimated_data, 0);
+
+                loop {
+                    let (result, bytes_read, bytes_written, end_positions) =
+                        self.delimiter.read_record(
+                            &input[input_offset..],
+                            &mut self.data[output_offset..],
+                            &mut self.offsets[field_offset..],
+                        );
+
+                    field_count += end_positions;
+                    field_offset += end_positions;
+                    input_offset += bytes_read;
+                    output_offset += bytes_written;
+
+                    match result {
+                        ReadRecordResult::End => break 'outer, // Reached end of file
+                        ReadRecordResult::InputEmpty => break 'input, // Input exhausted, need to read more
+                        ReadRecordResult::OutputFull => break, // Need to allocate more capacity
+                        ReadRecordResult::OutputEndsFull => {
+                            return Err(ArrowError::CsvError(format!("incorrect number of fields, expected {} got more than {}", self.num_columns, field_count)))
+                        }
+                        ReadRecordResult::Record => {
+                            if field_count != self.num_columns {
+                                return Err(ArrowError::CsvError(format!("incorrect number of fields, expected {} got {}", self.num_columns, field_count)))
+                            }
+                            self.num_rows += 1;
+                            field_count = 0;
+
+                            if self.num_rows == to_read {
+                                break 'outer // Read sufficient rows
+                            }
+
+                            if input.len() == input_offset {
+                                // Input exhausted, need to read more. Without this read_record
+                                // would interpret the empty input as the end of the file
+                                break 'input
+                            }
+                        }
+                    }
+                }
+            }
+            self.reader.consume(input_offset);
+            input_offset = 0;
+        }
+        self.reader.consume(input_offset);
+
+        // csv_core::Reader writes end offsets relative to the start of the row,
+        // so shift those of each complete row by the end offset of the prior row
+        let num_fields = self.num_rows * self.num_columns;
+        let mut row_offset = 0;
+        if num_fields != 0 {
+            for row in self.offsets[1..=num_fields].chunks_exact_mut(self.num_columns) {
+                row.iter_mut().for_each(|x| *x += row_offset);
+                row_offset = row[row.len() - 1];
+            }
+        }
+        Ok(())
+    }
+
+    /// Skips forward `to_skip` rows, stopping early at the end of the input
+    pub fn skip(&mut self, mut to_skip: usize) -> Result<(), ArrowError> {
+        // TODO: This could be done by scanning for unquoted newline delimiters
+        while to_skip != 0 {
+            self.fill_buf(to_skip.min(1024))?;
+            if self.num_rows == 0 {
+                break; // Reached end of file, nothing left to skip
+            }
+            to_skip -= self.num_rows;
+        }
+        Ok(())
+    }
+
+    /// Reads up to `to_read` rows from the reader
+    pub fn read(&mut self, to_read: usize) -> Result<StringRecords<'_>, ArrowError> {
+        self.fill_buf(to_read)?;
+
+        // Slice to the rows and data actually read, intentionally avoiding
+        // `Vec::truncate` so the buffers need not be re-initialized
+        let num_fields = self.num_rows * self.num_columns;
+        let offsets = &self.offsets[..num_fields + 1];
+        let data = std::str::from_utf8(&self.data[..offsets[num_fields]]).map_err(|e| {
+            ArrowError::CsvError(format!("Encountered invalid UTF-8 data: {}", e))
+        })?;
+        // A multi-byte character split across fields, e.g. `\xC3,\xA9`, is valid
+        // UTF-8 overall but yields invalid fields, so check every field boundary
+        if offsets.iter().any(|x| !data.is_char_boundary(*x)) {
+            let msg = "Encountered invalid UTF-8 data: character split across fields";
+            return Err(ArrowError::CsvError(msg.to_string()));
+        }
+
+        Ok(StringRecords {
+            num_columns: self.num_columns,
+            num_rows: self.num_rows,
+            offsets,
+            data,
+        })
+    }
+}
+
+/// A collection of parsed, UTF-8 CSV records
+#[derive(Debug)]
+pub struct StringRecords<'a> {
+    num_columns: usize, // number of fields in every record
+    num_rows: usize, // number of records
+    offsets: &'a [usize], // `num_rows * num_columns + 1` field end offsets into `data`
+    data: &'a str, // unescaped field data of every record, concatenated
+}
+// Record `i` spans `offsets[i * num_columns..=(i + 1) * num_columns]`
+impl<'a> StringRecords<'a> {
+    fn get(&self, index: usize) -> StringRecord<'a> {
+        let field_idx = index * self.num_columns;
+        StringRecord {
+            data: self.data,
+            offsets: &self.offsets[field_idx..field_idx + self.num_columns + 1],
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.num_rows
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.num_rows == 0
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = StringRecord<'a>> + '_ {
+        (0..self.num_rows).map(|x| self.get(x))
+    }
+}
+
+/// A single parsed, UTF-8 CSV record
+#[derive(Debug, Clone, Copy)]
+pub struct StringRecord<'a> {
+    data: &'a str, // the data of every record in the batch
+    offsets: &'a [usize], // the `num_columns + 1` field end offsets of this record
+}
+
+impl<'a> StringRecord<'a> {
+    /// Returns the field at `index`, panicking if `index` is out of bounds
+    pub fn get(&self, index: usize) -> &'a str {
+        let (start, end) = (self.offsets[index], self.offsets[index + 1]);
+        let data: &'a str = self.data;
+        // Checked slicing, as a multi-byte character split across fields
+        // (e.g. `\xC3,\xA9`) leaves offsets that are not char boundaries
+        &data[start..end]
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::reader::records::RecordReader;
+    use csv_core::Reader;
+    use std::io::Cursor;
+
+    /// Reads plain, quoted and escaped fields back in batches of up to 3 rows
+    #[test]
+    fn test_basic() {
+        let csv = [
+            "foo,bar,baz",
+            "a,b,c",
+            "12,3,5",
+            "\"asda\"\"asas\",\"sdffsnsd\", as",
+        ]
+        .join("\n");
+
+        let mut expected = vec![
+            vec!["foo", "bar", "baz"],
+            vec!["a", "b", "c"],
+            vec!["12", "3", "5"],
+            vec!["asda\"asas", "sdffsnsd", " as"],
+        ]
+        .into_iter();
+
+        let cursor = Cursor::new(csv.as_bytes());
+        let mut reader = RecordReader::new(cursor, Reader::new(), 3);
+        loop {
+            let b = reader.read(3).unwrap();
+            if b.is_empty() {
+                assert!(expected.next().is_none()); // every expected record was read
+                break;
+            }
+            b.iter().zip(&mut expected).for_each(|(record, expected)| {
+                let actual = (0..3)
+                    .map(|field_idx| record.get(field_idx))
+                    .collect::<Vec<_>>();
+                assert_eq!(actual, expected)
+            })
+        }
+    }
+}