You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2020/11/28 20:31:29 UTC
[arrow] branch master updated: ARROW-10747: [Rust]: CSV reader
optimization
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 95a497f ARROW-10747: [Rust]: CSV reader optimization
95a497f is described below
commit 95a497f89351ef27b114f5efbb7b64954fbee961
Author: Heres, Daniel <da...@gmail.com>
AuthorDate: Sat Nov 28 22:30:37 2020 +0200
ARROW-10747: [Rust]: CSV reader optimization
This PR makes CSV reading (quite a bit) faster by reusing allocations and doing things a bit more manually.
It removes the use of `BufReader`: rust-csv already buffers its input internally, so the extra buffering layer only adds overhead.
The nyctaxi benchmark (the entire job, including reading one year of CSV data) speeds up from ~4500 ms to ~1900 ms.
Loading the lineitem CSV into memory for the TPC-H benchmark goes from ~9800 ms to ~6000 ms.
I think a further optimization would be to stop using `StringRecord`s altogether (e.g. by using the underlying https://docs.rs/csv-core/0.1.10/csv_core/ library instead), but that could be a next step.
FYI @alamb @nevi-me @jorgecarleitao
Closes #8781 from Dandandan/csv_opt
Authored-by: Heres, Daniel <da...@gmail.com>
Signed-off-by: Neville Dipale <ne...@gmail.com>
---
rust/arrow/src/csv/reader.rs | 129 ++++++++++++++++-------------
rust/arrow/src/util/buffered_iterator.rs | 138 -------------------------------
rust/arrow/src/util/mod.rs | 1 -
rust/benchmarks/Cargo.toml | 2 +-
rust/benchmarks/src/bin/nyctaxi.rs | 2 +-
5 files changed, 75 insertions(+), 197 deletions(-)
diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index e38926d..d1e7ff9 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -40,25 +40,23 @@
//! let batch = csv.next().unwrap().unwrap();
//! ```
+use core::cmp::min;
use lazy_static::lazy_static;
use regex::{Regex, RegexBuilder};
+use std::collections::HashSet;
+use std::fmt;
use std::fs::File;
-use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
-use std::{collections::HashSet, iter::Skip};
-use std::{fmt, iter::Take};
use csv as csv_crate;
+use crate::array::{ArrayRef, PrimitiveArray, StringBuilder};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
-use crate::{
- array::{ArrayRef, PrimitiveArray, StringBuilder},
- util::buffered_iterator::Buffered,
-};
-use self::csv_crate::{Error, StringRecord, StringRecordsIntoIter};
+use self::csv_crate::{ByteRecord, StringRecord};
lazy_static! {
static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap();
@@ -95,7 +93,7 @@ fn infer_field_schema(string: &str) -> DataType {
///
/// Return infered schema and number of records used for inference.
fn infer_file_schema<R: Read + Seek>(
- reader: &mut BufReader<R>,
+ reader: &mut R,
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
@@ -131,11 +129,12 @@ fn infer_file_schema<R: Read + Seek>(
let mut records_count = 0;
let mut fields = vec![];
- for result in csv_reader
- .records()
- .take(max_read_records.unwrap_or(std::usize::MAX))
- {
- let record = result?;
+ let mut record = StringRecord::new();
+ let max_records = max_read_records.unwrap_or(usize::MAX);
+ while records_count < max_records {
+ if !csv_reader.read_record(&mut record)? {
+ break;
+ }
records_count += 1;
for i in 0..header_length {
@@ -201,7 +200,7 @@ pub fn infer_schema_from_files(
for fname in files.iter() {
let (schema, records_read) = infer_file_schema(
- &mut BufReader::new(File::open(fname)?),
+ &mut File::open(fname)?,
delimiter,
Some(records_to_read),
has_header,
@@ -229,10 +228,15 @@ pub struct Reader<R: Read> {
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// File reader
- record_iter:
- Buffered<Skip<Take<StringRecordsIntoIter<BufReader<R>>>>, StringRecord, Error>,
+ reader: csv_crate::Reader<R>,
/// Current line number
line_number: usize,
+ /// Maximum number of rows to read
+ end: usize,
+ /// Number of records per batch
+ batch_size: usize,
+ /// Vector that can hold the `StringRecord`s of the batches
+ batch_records: Vec<StringRecord>,
}
impl<R> fmt::Debug for Reader<R>
@@ -263,14 +267,8 @@ impl<R: Read> Reader<R> {
bounds: Bounds,
projection: Option<Vec<usize>>,
) -> Self {
- Self::from_buf_reader(
- BufReader::new(reader),
- schema,
- has_header,
- delimiter,
- batch_size,
- bounds,
- projection,
+ Self::from_reader(
+ reader, schema, has_header, delimiter, batch_size, bounds, projection,
)
}
@@ -289,12 +287,12 @@ impl<R: Read> Reader<R> {
}
}
- /// Create a new CsvReader from a `BufReader<R: Read>
+ /// Create a new CsvReader from a Reader
///
/// This constructor allows you more flexibility in what records are processed by the
/// csv reader.
- pub fn from_buf_reader(
- buf_reader: BufReader<R>,
+ pub fn from_reader(
+ reader: R,
schema: SchemaRef,
has_header: bool,
delimiter: Option<u8>,
@@ -309,27 +307,40 @@ impl<R: Read> Reader<R> {
reader_builder.delimiter(c);
}
- let csv_reader = reader_builder.from_reader(buf_reader);
- let record_iter = csv_reader.into_records();
+ let mut csv_reader = reader_builder.from_reader(reader);
let (start, end) = match bounds {
None => (0, usize::MAX),
Some((start, end)) => (start, end),
};
- // Create an iterator that:
- // * skips the first `start` items
- // * runs up to `end` items
- // * buffers `batch_size` items
+
+ // First we will skip `start` rows
// note that this skips by iteration. This is because in general it is not possible
// to seek in CSV. However, skiping still saves the burden of creating arrow arrays,
// which is a slow operation that scales with the number of columns
- let record_iter = Buffered::new(record_iter.take(end).skip(start), batch_size);
+
+ let mut record = ByteRecord::new();
+ // Skip first start items
+ for _ in 0..start {
+ let res = csv_reader.read_byte_record(&mut record);
+ if !res.unwrap_or(false) {
+ break;
+ }
+ }
+
+ // Initialize batch_records with StringRecords so they
+ // can be reused accross batches
+ let mut batch_records = Vec::with_capacity(batch_size);
+ batch_records.resize_with(batch_size, Default::default);
Self {
schema,
projection,
- record_iter,
+ reader: csv_reader,
line_number: if has_header { start + 1 } else { start },
+ batch_size,
+ end,
+ batch_records,
}
}
}
@@ -338,32 +349,39 @@ impl<R: Read> Iterator for Reader<R> {
type Item = Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
- let rows = match self.record_iter.next() {
- Some(Ok(r)) => r,
- Some(Err(e)) => {
- return Some(Err(ArrowError::ParseError(format!(
- "Error parsing line {}: {:?}",
- self.line_number + self.record_iter.n(),
- e
- ))));
+ let remaining = self.end - self.line_number;
+
+ let mut read_records = 0;
+ for i in 0..min(self.batch_size, remaining) {
+ match self.reader.read_record(&mut self.batch_records[i]) {
+ Ok(true) => {
+ read_records += 1;
+ }
+ Ok(false) => break,
+ Err(e) => {
+ return Some(Err(ArrowError::ParseError(format!(
+ "Error parsing line {}: {:?}",
+ self.line_number + i,
+ e
+ ))))
+ }
}
- None => return None,
- };
+ }
// return early if no data was loaded
- if rows.is_empty() {
+ if read_records == 0 {
return None;
}
// parse the batches into a RecordBatch
let result = parse(
- &rows,
+ &self.batch_records[..read_records],
&self.schema.fields(),
&self.projection,
self.line_number,
);
- self.line_number += rows.len();
+ self.line_number += read_records;
Some(result)
}
@@ -640,15 +658,14 @@ impl ReaderBuilder {
}
/// Create a new `Reader` from the `ReaderBuilder`
- pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>> {
+ pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> {
// check if schema should be inferred
- let mut buf_reader = BufReader::new(reader);
let delimiter = self.delimiter.unwrap_or(b',');
let schema = match self.schema {
Some(schema) => schema,
None => {
let (inferred_schema, _) = infer_file_schema(
- &mut buf_reader,
+ &mut reader,
delimiter,
self.max_records,
self.has_header,
@@ -657,8 +674,8 @@ impl ReaderBuilder {
Arc::new(inferred_schema)
}
};
- Ok(Reader::from_buf_reader(
- buf_reader,
+ Ok(Reader::from_reader(
+ reader,
schema,
self.has_header,
self.delimiter,
@@ -736,8 +753,8 @@ mod tests {
let both_files = file_with_headers
.chain(Cursor::new("\n".to_string()))
.chain(file_without_headers);
- let mut csv = Reader::from_buf_reader(
- BufReader::new(both_files),
+ let mut csv = Reader::from_reader(
+ both_files,
Arc::new(schema),
true,
None,
diff --git a/rust/arrow/src/util/buffered_iterator.rs b/rust/arrow/src/util/buffered_iterator.rs
deleted file mode 100644
index 5d42ee4..0000000
--- a/rust/arrow/src/util/buffered_iterator.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
-//! containers that benefit from batching or chunking.
-
-use std::marker::PhantomData;
-
-/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
-/// The items must be a [std::result::Result] and if an error is returned, tha error is returned
-/// and the iterator continues.
-/// An invariant of this iterator is that every returned vector's size is at most the specified size.
-#[derive(Debug)]
-pub struct Buffered<I, T, R>
-where
- T: Clone,
- I: Iterator<Item = Result<T, R>>,
-{
- iter: I,
- size: usize,
- buffer: Vec<T>,
- phantom: PhantomData<R>,
-}
-
-impl<I, T, R> Buffered<I, T, R>
-where
- T: Clone,
- I: Iterator<Item = Result<T, R>>,
-{
- pub fn new(iter: I, size: usize) -> Self {
- Buffered {
- iter,
- size,
- buffer: Vec::with_capacity(size),
- phantom: PhantomData,
- }
- }
-
- /// returns the number of items buffered so far.
- /// Useful to extract the exact item where an error occurred
- #[inline]
- pub fn n(&self) -> usize {
- self.buffer.len()
- }
-}
-
-impl<I, T, R> Iterator for Buffered<I, T, R>
-where
- T: Clone,
- I: Iterator<Item = Result<T, R>>,
-{
- type Item = Result<Vec<T>, R>;
-
- fn next(&mut self) -> Option<Self::Item> {
- for _ in 0..(self.size - self.n()) {
- match self.iter.next() {
- Some(Ok(item)) => self.buffer.push(item),
- Some(Err(error)) => return Some(Err(error)),
- None => break,
- }
- }
- if self.buffer.is_empty() {
- None
- } else {
- let result = self.buffer.clone();
- self.buffer.clear();
- Some(Ok(result))
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[derive(Debug, PartialEq)]
- struct AError {}
-
- impl std::fmt::Display for AError {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "Bla")
- }
- }
- impl std::error::Error for AError {}
-
- #[test]
- fn test_basic() {
- let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
- let iter = a.into_iter();
- let mut iter = Buffered::new(iter, 2);
-
- assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
- assert_eq!(iter.next(), Some(Ok(vec![3])));
- assert_eq!(iter.next(), None);
- }
-
- #[test]
- fn test_error_first() {
- let a: Vec<Result<i32, AError>> =
- vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
- let iter = a.into_iter();
- let mut iter = Buffered::new(iter, 2);
-
- assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
- assert_eq!(iter.next(), Some(Err(AError {})));
- // 4 is here: it was not skipped on the previous
- assert_eq!(iter.n(), 0);
- assert_eq!(iter.next(), Some(Ok(vec![4, 5])));
- assert_eq!(iter.next(), None);
- }
-
- #[test]
- fn test_error_last() {
- let a: Vec<Result<i32, AError>> = vec![Ok(1), Err(AError {}), Ok(3), Ok(4)];
- let iter = a.into_iter();
- let mut iter = Buffered::new(iter, 2);
-
- assert_eq!(iter.next(), Some(Err(AError {})));
- assert_eq!(iter.n(), 1);
- assert_eq!(iter.next(), Some(Ok(vec![1, 3])));
- assert_eq!(iter.next(), Some(Ok(vec![4])));
- assert_eq!(iter.next(), None);
- }
-}
diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs
index 053d132..0f95043 100644
--- a/rust/arrow/src/util/mod.rs
+++ b/rust/arrow/src/util/mod.rs
@@ -17,7 +17,6 @@
pub mod bit_chunk_iterator;
pub mod bit_util;
-pub mod buffered_iterator;
pub mod display;
pub mod integration_util;
#[cfg(feature = "prettyprint")]
diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml
index 7d7b6c9..6017d88 100644
--- a/rust/benchmarks/Cargo.toml
+++ b/rust/benchmarks/Cargo.toml
@@ -31,4 +31,4 @@ parquet = { path = "../parquet" }
datafusion = { path = "../datafusion" }
structopt = { version = "0.3", default-features = false }
tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] }
-futures = "0.3"
+futures = "0.3"
\ No newline at end of file
diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs
index 02a790b..1ffa684 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -92,7 +92,7 @@ async fn datafusion_sql_benchmarks(
debug: bool,
) -> Result<()> {
let mut queries = HashMap::new();
- queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MIN(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
+ queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
for (name, sql) in &queries {
println!("Executing '{}'", name);
for i in 0..iterations {