Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/19 08:56:12 UTC

[arrow-rs] branch master updated: Add csv-core based reader (#3338) (#3365)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new c3444334c Add csv-core based reader (#3338) (#3365)
c3444334c is described below

commit c3444334c58b651206e7bac946e21e7d2ae48c59
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 19 08:56:06 2022 +0000

    Add csv-core based reader (#3338) (#3365)
    
    * Add csv-core based reader (#3338)
    
    * More docs
---
 arrow-csv/Cargo.toml                       |   1 +
 arrow-csv/src/{reader.rs => reader/mod.rs} | 397 ++++++++++++++---------------
 arrow-csv/src/reader/records.rs            | 266 +++++++++++++++++++
 3 files changed, 458 insertions(+), 206 deletions(-)

diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml
index 0a8a0bec7..d02e599b3 100644
--- a/arrow-csv/Cargo.toml
+++ b/arrow-csv/Cargo.toml
@@ -45,6 +45,7 @@ arrow-data = { version = "29.0.0", path = "../arrow-data" }
 arrow-schema = { version = "29.0.0", path = "../arrow-schema" }
 chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
 csv = { version = "1.1", default-features = false }
+csv-core = { version = "0.1"}
 lazy_static = { version = "1.4", default-features = false }
 lexical-core = { version = "^0.8", default-features = false }
 regex = { version = "1.7.0", default-features = false, features = ["std", "unicode", "perf"] }
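
For context, csv-core is the zero-allocation, push-style parser that underpins
the csv crate. A minimal sketch of driving its read_record API (illustrative
only, not code from this commit; buffer sizes are arbitrary):

    use csv_core::{ReadRecordResult, Reader};

    fn count_records(input: &[u8]) -> usize {
        let mut rdr = Reader::new();
        let mut output = [0u8; 1024]; // unescaped field bytes land here
        let mut ends = [0usize; 16]; // end offset of each field within `output`
        let (mut consumed, mut records) = (0, 0);
        loop {
            // returns (result, bytes read, bytes written, field ends written)
            let (result, nin, _nout, _nend) =
                rdr.read_record(&input[consumed..], &mut output, &mut ends);
            consumed += nin;
            match result {
                // input ran dry mid-record; the empty slice passed on the
                // next iteration tells the parser to flush the final record
                ReadRecordResult::InputEmpty => {}
                ReadRecordResult::Record => records += 1,
                ReadRecordResult::End => return records,
                // a real driver grows its buffers instead, as the new
                // RecordReader below does
                _ => panic!("output buffers too small for this sketch"),
            }
        }
    }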
diff --git a/arrow-csv/src/reader.rs b/arrow-csv/src/reader/mod.rs
similarity index 90%
rename from arrow-csv/src/reader.rs
rename to arrow-csv/src/reader/mod.rs
index c69e1753b..877876b77 100644
--- a/arrow-csv/src/reader.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -40,13 +40,14 @@
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
 
-use core::cmp::min;
+mod records;
+
 use lazy_static::lazy_static;
 use regex::{Regex, RegexSet};
 use std::collections::HashSet;
 use std::fmt;
 use std::fs::File;
-use std::io::{Read, Seek, SeekFrom};
+use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use arrow_array::builder::Decimal128Builder;
@@ -56,8 +57,9 @@ use arrow_cast::parse::Parser;
 use arrow_schema::*;
 
 use crate::map_csv_error;
+use crate::reader::records::{RecordReader, StringRecords};
 use arrow_data::decimal::validate_decimal_precision;
-use csv::{ByteRecord, StringRecord};
+use csv::StringRecord;
 use std::ops::Neg;
 
 lazy_static! {
@@ -107,7 +109,7 @@ fn infer_field_schema(string: &str, datetime_re: Option<Regex>) -> DataType {
 /// This is a collection of options for csv reader when the builder pattern cannot be used
 /// and the parameters need to be passed around
 #[derive(Debug, Default, Clone)]
-pub struct ReaderOptions {
+struct ReaderOptions {
     has_header: bool,
     delimiter: Option<u8>,
     escape: Option<u8>,
@@ -177,11 +179,36 @@ pub fn infer_reader_schema<R: Read>(
     infer_reader_schema_with_csv_options(reader, roptions)
 }
 
+/// Creates a `csv::Reader`
+fn build_csv_reader<R: Read>(
+    reader: R,
+    has_header: bool,
+    delimiter: Option<u8>,
+    escape: Option<u8>,
+    quote: Option<u8>,
+    terminator: Option<u8>,
+) -> csv::Reader<R> {
+    let mut reader_builder = csv::ReaderBuilder::new();
+    reader_builder.has_headers(has_header);
+
+    if let Some(c) = delimiter {
+        reader_builder.delimiter(c);
+    }
+    reader_builder.escape(escape);
+    if let Some(c) = quote {
+        reader_builder.quote(c);
+    }
+    if let Some(t) = terminator {
+        reader_builder.terminator(csv::Terminator::Any(t));
+    }
+    reader_builder.from_reader(reader)
+}
+
 fn infer_reader_schema_with_csv_options<R: Read>(
     reader: R,
     roptions: ReaderOptions,
 ) -> Result<(Schema, usize), ArrowError> {
-    let mut csv_reader = Reader::build_csv_reader(
+    let mut csv_reader = build_csv_reader(
         reader,
         roptions.has_header,
         roptions.delimiter,
@@ -305,15 +332,15 @@ pub struct Reader<R: Read> {
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    reader: csv::Reader<R>,
+    reader: RecordReader<BufReader<R>>,
+    /// Rows to skip
+    to_skip: usize,
     /// Current line number
     line_number: usize,
-    /// Maximum number of rows to read
+    /// End line number
     end: usize,
     /// Number of records per batch
     batch_size: usize,
-    /// Vector that can hold the `StringRecord`s of the batches
-    batch_records: Vec<StringRecord>,
     /// datetime format used to parse datetime values, (format understood by chrono)
     ///
     /// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
@@ -351,16 +378,23 @@ impl<R: Read> Reader<R> {
         projection: Option<Vec<usize>>,
         datetime_format: Option<String>,
     ) -> Self {
-        Self::from_reader(
-            reader,
-            schema,
-            has_header,
-            delimiter,
-            batch_size,
-            bounds,
-            projection,
-            datetime_format,
-        )
+        let mut builder = ReaderBuilder::new()
+            .has_header(has_header)
+            .with_batch_size(batch_size);
+
+        if let Some(delimiter) = delimiter {
+            builder = builder.with_delimiter(delimiter);
+        }
+        if let Some((start, end)) = bounds {
+            builder = builder.with_bounds(start, end);
+        }
+        if let Some(projection) = projection {
+            builder = builder.with_projection(projection)
+        }
+        if let Some(format) = datetime_format {
+            builder = builder.with_datetime_format(format)
+        }
+        builder.build_with_schema(reader, schema)
     }
 
     /// Returns the schema of the reader, useful for getting the schema without reading
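
As the rewritten constructor above shows, Reader::new now just drives
ReaderBuilder. For reference, a hypothetical end-to-end use of the builder
(the file name is made up, and this assumes ReaderBuilder is re-exported at
the arrow-csv crate root):

    use arrow_csv::ReaderBuilder;
    use std::fs::File;

    fn main() {
        let file = File::open("example.csv").unwrap();
        // no schema was supplied, so build() infers one by scanning the file
        let reader = ReaderBuilder::new()
            .has_header(true)
            .with_batch_size(512)
            .build(file)
            .unwrap();
        for batch in reader {
            println!("read {} rows", batch.unwrap().num_rows());
        }
    }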
@@ -383,6 +417,7 @@ impl<R: Read> Reader<R> {
     /// This constructor allows you more flexibility in what records are processed by the
     /// csv reader.
     #[allow(clippy::too_many_arguments)]
+    #[deprecated(note = "Use Reader::new or ReaderBuilder")]
     pub fn from_reader(
         reader: R,
         schema: SchemaRef,
@@ -393,142 +428,57 @@ impl<R: Read> Reader<R> {
         projection: Option<Vec<usize>>,
         datetime_format: Option<String>,
     ) -> Self {
-        let csv_reader =
-            Self::build_csv_reader(reader, has_header, delimiter, None, None, None);
-        Self::from_csv_reader(
-            csv_reader,
+        Self::new(
+            reader,
             schema,
             has_header,
+            delimiter,
             batch_size,
             bounds,
             projection,
             datetime_format,
         )
     }
-
-    fn build_csv_reader(
-        reader: R,
-        has_header: bool,
-        delimiter: Option<u8>,
-        escape: Option<u8>,
-        quote: Option<u8>,
-        terminator: Option<u8>,
-    ) -> csv::Reader<R> {
-        let mut reader_builder = csv::ReaderBuilder::new();
-        reader_builder.has_headers(has_header);
-
-        if let Some(c) = delimiter {
-            reader_builder.delimiter(c);
-        }
-        reader_builder.escape(escape);
-        if let Some(c) = quote {
-            reader_builder.quote(c);
-        }
-        if let Some(t) = terminator {
-            reader_builder.terminator(csv::Terminator::Any(t));
-        }
-        reader_builder.from_reader(reader)
-    }
-
-    fn from_csv_reader(
-        mut csv_reader: csv::Reader<R>,
-        schema: SchemaRef,
-        has_header: bool,
-        batch_size: usize,
-        bounds: Bounds,
-        projection: Option<Vec<usize>>,
-        datetime_format: Option<String>,
-    ) -> Self {
-        let (start, end) = match bounds {
-            None => (0, usize::MAX),
-            Some((start, end)) => (start, end),
-        };
-
-        // First we will skip `start` rows
-        // note that this skips by iteration. This is because in general it is not possible
-        // to seek in CSV. However, skipping still saves the burden of creating arrow arrays,
-        // which is a slow operation that scales with the number of columns
-
-        let mut record = ByteRecord::new();
-        // Skip first start items
-        for _ in 0..start {
-            let res = csv_reader.read_byte_record(&mut record);
-            if !res.unwrap_or(false) {
-                break;
-            }
-        }
-
-        // Initialize batch_records with StringRecords so they
-        // can be reused across batches
-        let mut batch_records = Vec::with_capacity(batch_size);
-        batch_records.resize_with(batch_size, Default::default);
-
-        Self {
-            schema,
-            projection,
-            reader: csv_reader,
-            line_number: if has_header { start + 1 } else { start },
-            batch_size,
-            end,
-            batch_records,
-            datetime_format,
-        }
-    }
 }
 
 impl<R: Read> Iterator for Reader<R> {
     type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let remaining = self.end - self.line_number;
-
-        let mut read_records = 0;
-        for i in 0..min(self.batch_size, remaining) {
-            match self.reader.read_record(&mut self.batch_records[i]) {
-                Ok(true) => {
-                    read_records += 1;
-                }
-                Ok(false) => break,
-                Err(e) => {
-                    return Some(Err(ArrowError::ParseError(format!(
-                        "Error parsing line {}: {:?}",
-                        self.line_number + i,
-                        e
-                    ))));
-                }
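+        // Skipping is deferred from construction to the first call to
+        // next(); the old from_csv_reader skipped rows eagerly and swallowed
+        // any errors, whereas they now surface through the iterator.
+        // mem::take resets to_skip to 0 so later polls do not skip again.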
+        if self.to_skip != 0 {
+            if let Err(e) = self.reader.skip(std::mem::take(&mut self.to_skip)) {
+                return Some(Err(e));
             }
         }
 
-        // return early if no data was loaded
-        if read_records == 0 {
-            return None;
-        }
+        let remaining = self.end - self.line_number;
+        let to_read = self.batch_size.min(remaining);
 
-        let format: Option<&str> = match self.datetime_format {
-            Some(ref format) => Some(format.as_ref()),
-            _ => None,
+        let batch = match self.reader.read(to_read) {
+            Ok(b) if b.is_empty() => return None,
+            Ok(b) => b,
+            Err(e) => return Some(Err(e)),
         };
 
         // parse the batches into a RecordBatch
         let result = parse(
-            &self.batch_records[..read_records],
+            &batch,
             self.schema.fields(),
             Some(self.schema.metadata.clone()),
             self.projection.as_ref(),
             self.line_number,
-            format,
+            self.datetime_format.as_deref(),
         );
 
-        self.line_number += read_records;
+        self.line_number += batch.len();
 
         Some(result)
     }
 }
 
-/// parses a slice of [csv::StringRecord] into a
-/// [RecordBatch]
+/// Parses a [`StringRecords`] into a [`RecordBatch`]
 fn parse(
-    rows: &[StringRecord],
+    rows: &StringRecords<'_>,
     fields: &[Field],
     metadata: Option<std::collections::HashMap<String, String>>,
     projection: Option<&Vec<usize>>,
@@ -624,7 +574,9 @@ fn parse(
                     )
                 }
                 DataType::Utf8 => Ok(Arc::new(
-                    rows.iter().map(|row| row.get(i)).collect::<StringArray>(),
+                    rows.iter()
+                        .map(|row| Some(row.get(i)))
+                        .collect::<StringArray>(),
                 ) as ArrayRef),
                 DataType::Dictionary(key_type, value_type)
                     if value_type.as_ref() == &DataType::Utf8 =>
@@ -723,34 +675,26 @@ fn parse_bool(string: &str) -> Option<bool> {
 // parse the column string to an Arrow Array
 fn build_decimal_array(
     _line_number: usize,
-    rows: &[StringRecord],
+    rows: &StringRecords<'_>,
     col_idx: usize,
     precision: u8,
     scale: i8,
 ) -> Result<ArrayRef, ArrowError> {
     let mut decimal_builder = Decimal128Builder::with_capacity(rows.len());
-    for row in rows {
-        let col_s = row.get(col_idx);
-        match col_s {
-            None => {
-                // No data for this row
-                decimal_builder.append_null();
-            }
-            Some(s) => {
-                if s.is_empty() {
-                    // append null
-                    decimal_builder.append_null();
-                } else {
-                    let decimal_value: Result<i128, _> =
-                        parse_decimal_with_parameter(s, precision, scale);
-                    match decimal_value {
-                        Ok(v) => {
-                            decimal_builder.append_value(v);
-                        }
-                        Err(e) => {
-                            return Err(e);
-                        }
-                    }
+    for row in rows.iter() {
+        let s = row.get(col_idx);
+        if s.is_empty() {
+            // append null
+            decimal_builder.append_null();
+        } else {
+            let decimal_value: Result<i128, _> =
+                parse_decimal_with_parameter(s, precision, scale);
+            match decimal_value {
+                Ok(v) => {
+                    decimal_builder.append_value(v);
+                }
+                Err(e) => {
+                    return Err(e);
                 }
             }
         }
@@ -878,35 +822,31 @@ fn parse_decimal(s: &str) -> Result<i128, ArrowError> {
 // parses a specific column (col_idx) into an Arrow Array.
 fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
     line_number: usize,
-    rows: &[StringRecord],
+    rows: &StringRecords<'_>,
     col_idx: usize,
     format: Option<&str>,
 ) -> Result<ArrayRef, ArrowError> {
     rows.iter()
         .enumerate()
         .map(|(row_index, row)| {
-            match row.get(col_idx) {
-                Some(s) => {
-                    if s.is_empty() {
-                        return Ok(None);
-                    }
+            let s = row.get(col_idx);
+            if s.is_empty() {
+                return Ok(None);
+            }
 
-                    let parsed = match format {
-                        Some(format) => parse_formatted::<T>(s, format),
-                        _ => parse_item::<T>(s),
-                    };
-                    match parsed {
-                        Some(e) => Ok(Some(e)),
-                        None => Err(ArrowError::ParseError(format!(
-                            // TODO: we should surface the underlying error here.
-                            "Error while parsing value {} for column {} at line {}",
-                            s,
-                            col_idx,
-                            line_number + row_index
-                        ))),
-                    }
-                }
-                None => Ok(None),
+            let parsed = match format {
+                Some(format) => parse_formatted::<T>(s, format),
+                _ => parse_item::<T>(s),
+            };
+            match parsed {
+                Some(e) => Ok(Some(e)),
+                None => Err(ArrowError::ParseError(format!(
+                    // TODO: we should surface the underlying error here.
+                    "Error while parsing value {} for column {} at line {}",
+                    s,
+                    col_idx,
+                    line_number + row_index
+                ))),
             }
         })
         .collect::<Result<PrimitiveArray<T>, ArrowError>>()
@@ -916,31 +856,23 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
 // parses a specific column (col_idx) into an Arrow Array.
 fn build_boolean_array(
     line_number: usize,
-    rows: &[StringRecord],
+    rows: &StringRecords<'_>,
     col_idx: usize,
 ) -> Result<ArrayRef, ArrowError> {
     rows.iter()
         .enumerate()
         .map(|(row_index, row)| {
-            match row.get(col_idx) {
-                Some(s) => {
-                    if s.is_empty() {
-                        return Ok(None);
-                    }
-
-                    let parsed = parse_bool(s);
-                    match parsed {
-                        Some(e) => Ok(Some(e)),
-                        None => Err(ArrowError::ParseError(format!(
-                            // TODO: we should surface the underlying error here.
-                            "Error while parsing value {} for column {} at line {}",
-                            s,
-                            col_idx,
-                            line_number + row_index
-                        ))),
-                    }
-                }
-                None => Ok(None),
+            let s = row.get(col_idx);
+            let parsed = parse_bool(s);
+            match parsed {
+                Some(e) => Ok(Some(e)),
+                None => Err(ArrowError::ParseError(format!(
+                    // TODO: we should surface the underlying error here.
+                    "Error while parsing value {} for column {} at line {}",
+                    s,
+                    col_idx,
+                    line_number + row_index
+                ))),
             }
         })
         .collect::<Result<BooleanArray, _>>()
@@ -1109,10 +1041,13 @@ impl ReaderBuilder {
     }
 
     /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+    pub fn build<R: Read + Seek>(
+        mut self,
+        mut reader: R,
+    ) -> Result<Reader<R>, ArrowError> {
         // check if schema should be inferred
         let delimiter = self.delimiter.unwrap_or(b',');
-        let schema = match self.schema {
+        let schema = match self.schema.take() {
             Some(schema) => schema,
             None => {
                 let roptions = ReaderOptions {
@@ -1122,7 +1057,7 @@ impl ReaderBuilder {
                     escape: self.escape,
                     quote: self.quote,
                     terminator: self.terminator,
-                    datetime_re: self.datetime_re,
+                    datetime_re: self.datetime_re.take(),
                 };
                 let (inferred_schema, _) =
                     infer_file_schema_with_csv_options(&mut reader, roptions)?;
@@ -1130,23 +1065,45 @@ impl ReaderBuilder {
                 Arc::new(inferred_schema)
             }
         };
-        let csv_reader = Reader::build_csv_reader(
-            reader,
-            self.has_header,
-            self.delimiter,
-            self.escape,
-            self.quote,
-            self.terminator,
+        Ok(self.build_with_schema(reader, schema))
+    }
+
+    fn build_with_schema<R: Read>(self, reader: R, schema: SchemaRef) -> Reader<R> {
+        let mut reader_builder = csv_core::ReaderBuilder::new();
+        reader_builder.escape(self.escape);
+
+        if let Some(c) = self.delimiter {
+            reader_builder.delimiter(c);
+        }
+        if let Some(c) = self.quote {
+            reader_builder.quote(c);
+        }
+        if let Some(t) = self.terminator {
+            reader_builder.terminator(csv_core::Terminator::Any(t));
+        }
+        let reader = RecordReader::new(
+            BufReader::new(reader),
+            reader_builder.build(),
+            schema.fields().len(),
         );
-        Ok(Reader::from_csv_reader(
-            csv_reader,
+
+        let header = self.has_header as usize;
+
+        let (start, end) = match self.bounds {
+            Some((start, end)) => (start + header, end + header),
+            None => (header, usize::MAX),
+        };
+
+        Reader {
             schema,
-            self.has_header,
-            self.batch_size,
-            self.bounds,
-            self.projection.clone(),
-            self.datetime_format,
-        ))
+            projection: self.projection,
+            reader,
+            to_skip: start,
+            line_number: start,
+            end,
+            batch_size: self.batch_size,
+            datetime_format: self.datetime_format,
+        }
     }
 }
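
A worked example of the header/bounds arithmetic in build_with_schema above,
matching the (Some((1, 4)), true, 3) case of the new test_header_bounds test
below:

    let has_header = true;
    let bounds = Some((1usize, 4usize));
    let header = has_header as usize; // 1
    let (start, end) = match bounds {
        Some((s, e)) => (s + header, e + header), // (2, 5)
        None => (header, usize::MAX),
    };
    // the reader skips start = 2 lines (the header plus data row 0) and
    // stops once line_number reaches end = 5, so it yields data rows
    // 1, 2 and 3: the 3 rows the test expects
    assert_eq!((start, end), (2, 5));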
 
@@ -1285,7 +1242,7 @@ mod tests {
         let both_files = file_with_headers
             .chain(Cursor::new("\n".to_string()))
             .chain(file_without_headers);
-        let mut csv = Reader::from_reader(
+        let mut csv = Reader::new(
             both_files,
             Arc::new(schema),
             true,
@@ -1480,6 +1437,7 @@ mod tests {
             Field::new("c_int", DataType::UInt64, false),
             Field::new("c_float", DataType::Float32, true),
             Field::new("c_string", DataType::Utf8, false),
+            Field::new("c_bool", DataType::Boolean, false),
         ]);
 
         let file = File::open("test/data/null_test.csv").unwrap();
@@ -2074,4 +2032,31 @@ mod tests {
         let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(col1_arr.value(5), "value5");
     }
+
+    #[test]
+    fn test_header_bounds() {
+        let csv = "a,b\na,b\na,b\na,b\na,b\n";
+        let tests = [
+            (None, false, 5),
+            (None, true, 4),
+            (Some((0, 4)), false, 4),
+            (Some((1, 4)), false, 3),
+            (Some((0, 4)), true, 4),
+            (Some((1, 4)), true, 3),
+        ];
+
+        for (idx, (bounds, has_header, expected)) in tests.into_iter().enumerate() {
+            let mut reader = ReaderBuilder::new().has_header(has_header);
+            if let Some((start, end)) = bounds {
+                reader = reader.with_bounds(start, end);
+            }
+            let b = reader
+                .build(Cursor::new(csv.as_bytes()))
+                .unwrap()
+                .next()
+                .unwrap()
+                .unwrap();
+            assert_eq!(b.num_rows(), expected, "{}", idx);
+        }
+    }
 }
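
The records.rs module added below stores an entire batch of fields in one
contiguous string buffer plus a flat array of end offsets, rather than one
heap-allocated csv::StringRecord per row. A hand-worked illustration of that
layout (not code from the commit):

    // two columns, rows ["a", "bb"] and ["ccc", "d"]
    let data = "abbcccd";
    let offsets = [0usize, 1, 3, 6, 7];
    let num_columns = 2;
    // field (row, col) spans data[offsets[row * num_columns + col]
    //                          .. offsets[row * num_columns + col + 1]]
    let field = |row: usize, col: usize| {
        let idx = row * num_columns + col;
        &data[offsets[idx]..offsets[idx + 1]]
    };
    assert_eq!(field(0, 1), "bb"); // row 0, column 1
    assert_eq!(field(1, 0), "ccc"); // row 1, column 0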
diff --git a/arrow-csv/src/reader/records.rs b/arrow-csv/src/reader/records.rs
new file mode 100644
index 000000000..711baa152
--- /dev/null
+++ b/arrow-csv/src/reader/records.rs
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::ArrowError;
+use csv_core::{ReadRecordResult, Reader};
+use std::io::BufRead;
+
+/// The estimated length of a field in bytes
+const AVERAGE_FIELD_SIZE: usize = 8;
+
+/// The minimum amount of data in a single read
+const MIN_CAPACITY: usize = 1024;
+
+pub struct RecordReader<R> {
+    reader: R,
+    delimiter: Reader,
+
+    num_columns: usize,
+
+    num_rows: usize,
+    offsets: Vec<usize>,
+    data: Vec<u8>,
+}
+
+impl<R: BufRead> RecordReader<R> {
+    pub fn new(reader: R, delimiter: Reader, num_columns: usize) -> Self {
+        Self {
+            reader,
+            delimiter,
+            num_columns,
+            num_rows: 0,
+            offsets: vec![],
+            data: vec![],
+        }
+    }
+
+    fn fill_buf(&mut self, to_read: usize) -> Result<(), ArrowError> {
+        // Reserve sufficient capacity in offsets
+        self.offsets.resize(to_read * self.num_columns + 1, 0);
+        self.num_rows = 0;
+
+        if to_read == 0 {
+            return Ok(());
+        }
+
+        // The current offset into `self.data`
+        let mut output_offset = 0;
+        // The current offset into `input`
+        let mut input_offset = 0;
+        // The current offset into `self.offsets`
+        let mut field_offset = 1;
+        // The number of fields read for the current row
+        let mut field_count = 0;
+
+        'outer: loop {
+            let input = self.reader.fill_buf()?;
+
+            'input: loop {
+                // Reserve necessary space in output data based on best estimate
+                let remaining_rows = to_read - self.num_rows;
+                let capacity = remaining_rows * self.num_columns * AVERAGE_FIELD_SIZE;
+                let estimated_data = capacity.max(MIN_CAPACITY);
+                self.data.resize(output_offset + estimated_data, 0);
+
+                loop {
+                    let (result, bytes_read, bytes_written, end_positions) =
+                        self.delimiter.read_record(
+                            &input[input_offset..],
+                            &mut self.data[output_offset..],
+                            &mut self.offsets[field_offset..],
+                        );
+
+                    field_count += end_positions;
+                    field_offset += end_positions;
+                    input_offset += bytes_read;
+                    output_offset += bytes_written;
+
+                    match result {
+                        ReadRecordResult::End => break 'outer, // Reached end of file
+                        ReadRecordResult::InputEmpty => break 'input, // Input exhausted, need to read more
+                        ReadRecordResult::OutputFull => break, // Need to allocate more capacity
+                        ReadRecordResult::OutputEndsFull => {
+                            return Err(ArrowError::CsvError(format!("incorrect number of fields, expected {} got more than {}", self.num_columns, field_count)))
+                        }
+                        ReadRecordResult::Record => {
+                            if field_count != self.num_columns {
+                                return Err(ArrowError::CsvError(format!("incorrect number of fields, expected {} got {}", self.num_columns, field_count)))
+                            }
+                            self.num_rows += 1;
+                            field_count = 0;
+
+                            if self.num_rows == to_read {
+                                break 'outer // Read sufficient rows
+                            }
+
+                            if input.len() == input_offset {
+                                // Input exhausted, need to read more
+                                // Without this, read_record will interpret the empty input
+                                // byte array as indicating the end of the file
+                                break 'input
+                            }
+                        }
+                    }
+                }
+            }
+            self.reader.consume(input_offset);
+            input_offset = 0;
+        }
+        self.reader.consume(input_offset);
+
+        // csv_core::Reader writes end offsets relative to the start of the row
+        // Therefore scan through and offset these based on the cumulative row offsets
+        let mut row_offset = 0;
+        self.offsets[1..]
+            .chunks_mut(self.num_columns)
+            .for_each(|row| {
+                let offset = row_offset;
+                row.iter_mut().for_each(|x| {
+                    *x += offset;
+                    row_offset = *x;
+                });
+            });
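+        // e.g. two rows of two fields with row-relative ends [2, 3] and
+        // [1, 3] yield offsets[1..] == [2, 3, 4, 6], i.e. absolute
+        // positions into `data` for StringRecord::get to slice between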
+
+        Ok(())
+    }
+
+    /// Skips forward `to_skip` rows
+    pub fn skip(&mut self, mut to_skip: usize) -> Result<(), ArrowError> {
+        // TODO: This could be done by scanning for unquoted newline delimiters
+        while to_skip != 0 {
+            self.fill_buf(to_skip.min(1024))?;
+            to_skip -= self.num_rows;
+        }
+        Ok(())
+    }
+
+    /// Reads up to `to_read` rows from the reader
+    pub fn read(&mut self, to_read: usize) -> Result<StringRecords<'_>, ArrowError> {
+        self.fill_buf(to_read)?;
+
+        // Need to slice fields to the actual number of rows read
+        //
+        // We intentionally avoid using `Vec::truncate` to avoid having
+        // to re-initialize the data again
+        let num_fields = self.num_rows * self.num_columns;
+        let last_offset = self.offsets[num_fields];
+
+        // Need to truncate data to the actual amount of data read
+        let data = std::str::from_utf8(&self.data[..last_offset]).map_err(|e| {
+            ArrowError::CsvError(format!("Encountered invalid UTF-8 data: {}", e))
+        })?;
+
+        Ok(StringRecords {
+            num_columns: self.num_columns,
+            num_rows: self.num_rows,
+            offsets: &self.offsets[..num_fields + 1],
+            data,
+        })
+    }
+}
+
+/// A collection of parsed, UTF-8 CSV records
+#[derive(Debug)]
+pub struct StringRecords<'a> {
+    num_columns: usize,
+    num_rows: usize,
+    offsets: &'a [usize],
+    data: &'a str,
+}
+
+impl<'a> StringRecords<'a> {
+    fn get(&self, index: usize) -> StringRecord<'a> {
+        let field_idx = index * self.num_columns;
+        StringRecord {
+            data: self.data,
+            offsets: &self.offsets[field_idx..field_idx + self.num_columns + 1],
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.num_rows
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.num_rows == 0
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = StringRecord<'a>> + '_ {
+        (0..self.num_rows).map(|x| self.get(x))
+    }
+}
+
+/// A single parsed, UTF-8 CSV record
+#[derive(Debug, Clone, Copy)]
+pub struct StringRecord<'a> {
+    data: &'a str,
+    offsets: &'a [usize],
+}
+
+impl<'a> StringRecord<'a> {
+    pub fn get(&self, index: usize) -> &'a str {
+        let end = self.offsets[index + 1];
+        let start = self.offsets[index];
+
+        // SAFETY:
+        // Parsing produces offsets at valid byte boundaries
+        unsafe { self.data.get_unchecked(start..end) }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::reader::records::RecordReader;
+    use csv_core::Reader;
+    use std::io::Cursor;
+
+    #[test]
+    fn test_basic() {
+        let csv = [
+            "foo,bar,baz",
+            "a,b,c",
+            "12,3,5",
+            "\"asda\"\"asas\",\"sdffsnsd\", as",
+        ]
+        .join("\n");
+
+        let mut expected = vec![
+            vec!["foo", "bar", "baz"],
+            vec!["a", "b", "c"],
+            vec!["12", "3", "5"],
+            vec!["asda\"asas", "sdffsnsd", " as"],
+        ]
+        .into_iter();
+
+        let cursor = Cursor::new(csv.as_bytes());
+        let mut reader = RecordReader::new(cursor, Reader::new(), 3);
+
+        loop {
+            let b = reader.read(3).unwrap();
+            if b.is_empty() {
+                break;
+            }
+
+            b.iter().zip(&mut expected).for_each(|(record, expected)| {
+                let actual = (0..3)
+                    .map(|field_idx| record.get(field_idx))
+                    .collect::<Vec<_>>();
+                assert_eq!(actual, expected)
+            })
+        }
+    }
+}