Posted to commits@arrow.apache.org by ne...@apache.org on 2020/11/28 20:31:29 UTC

[arrow] branch master updated: ARROW-10747: [Rust]: CSV reader optimization

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 95a497f  ARROW-10747: [Rust]: CSV reader optimization
95a497f is described below

commit 95a497f89351ef27b114f5efbb7b64954fbee961
Author: Heres, Daniel <da...@gmail.com>
AuthorDate: Sat Nov 28 22:30:37 2020 +0200

    ARROW-10747: [Rust]: CSV reader optimization
    
    This PR makes CSV reading quite a bit faster by reusing allocations and doing some things a bit more manually.
    It removes the use of BufReader: rust-csv already buffers internally, so wrapping the input in another buffer only adds overhead.
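    
    A minimal standalone sketch of the reuse pattern (not the Arrow reader itself; `count_rows` is just an illustrative name):
    
        use csv::{ReaderBuilder, StringRecord};
    
        fn count_rows(data: &str) -> csv::Result<usize> {
            // Hand the raw reader to rust-csv directly; it buffers
            // internally, so a BufReader wrapper would only add a copy.
            let mut reader = ReaderBuilder::new()
                .has_headers(false)
                .from_reader(data.as_bytes());
    
            // A single record allocation, reused for every row.
            let mut record = StringRecord::new();
            let mut rows = 0;
            while reader.read_record(&mut record)? {
                rows += 1;
            }
            Ok(rows)
        }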
    
    The nyctaxi benchmark (the entire job, including reading one year of CSV data) speeds up from ~4500 ms to ~1900 ms.
    Loading the lineitem CSV into memory for the TPC-H benchmark goes from ~9800 ms to ~6000 ms.
    
    I think a further optimization would be to stop using `StringRecord`s altogether (e.g. by using the underlying https://docs.rs/csv-core/0.1.10/csv_core/ library instead), but that could be a next step.
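    
    For illustration, a tiny csv-core sketch of that next step (hypothetical standalone function; the buffer sizes are arbitrary):
    
        use csv_core::{ReadRecordResult, Reader};
    
        // csv-core parses into caller-provided buffers, so reading a
        // record needs no allocation at all.
        fn field_count_of_first_record(input: &[u8]) -> usize {
            let mut rdr = Reader::new();
            let mut output = [0u8; 4096]; // unescaped field bytes
            let mut ends = [0usize; 256]; // end offsets of fields in `output`
            match rdr.read_record(input, &mut output, &mut ends) {
                (ReadRecordResult::Record, _read, _written, nend) => nend,
                _ => 0, // end of input, or buffers too small for this sketch
            }
        }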
    
    FYI @alamb @nevi-me @jorgecarleitao
    
    Closes #8781 from Dandandan/csv_opt
    
    Authored-by: Heres, Daniel <da...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 rust/arrow/src/csv/reader.rs             | 129 ++++++++++++++++-------------
 rust/arrow/src/util/buffered_iterator.rs | 138 -------------------------------
 rust/arrow/src/util/mod.rs               |   1 -
 rust/benchmarks/Cargo.toml               |   2 +-
 rust/benchmarks/src/bin/nyctaxi.rs       |   2 +-
 5 files changed, 75 insertions(+), 197 deletions(-)

diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index e38926d..d1e7ff9 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -40,25 +40,23 @@
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
 
+use core::cmp::min;
 use lazy_static::lazy_static;
 use regex::{Regex, RegexBuilder};
+use std::collections::HashSet;
+use std::fmt;
 use std::fs::File;
-use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
-use std::{collections::HashSet, iter::Skip};
-use std::{fmt, iter::Take};
 
 use csv as csv_crate;
 
+use crate::array::{ArrayRef, PrimitiveArray, StringBuilder};
 use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
-use crate::{
-    array::{ArrayRef, PrimitiveArray, StringBuilder},
-    util::buffered_iterator::Buffered,
-};
 
-use self::csv_crate::{Error, StringRecord, StringRecordsIntoIter};
+use self::csv_crate::{ByteRecord, StringRecord};
 
 lazy_static! {
     static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap();
@@ -95,7 +93,7 @@ fn infer_field_schema(string: &str) -> DataType {
 ///
 /// Return inferred schema and number of records used for inference.
 fn infer_file_schema<R: Read + Seek>(
-    reader: &mut BufReader<R>,
+    reader: &mut R,
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
@@ -131,11 +129,12 @@ fn infer_file_schema<R: Read + Seek>(
     let mut records_count = 0;
     let mut fields = vec![];
 
-    for result in csv_reader
-        .records()
-        .take(max_read_records.unwrap_or(std::usize::MAX))
-    {
-        let record = result?;
+    let mut record = StringRecord::new();
+    let max_records = max_read_records.unwrap_or(usize::MAX);
+    while records_count < max_records {
+        if !csv_reader.read_record(&mut record)? {
+            break;
+        }
         records_count += 1;
 
         for i in 0..header_length {
@@ -201,7 +200,7 @@ pub fn infer_schema_from_files(
 
     for fname in files.iter() {
         let (schema, records_read) = infer_file_schema(
-            &mut BufReader::new(File::open(fname)?),
+            &mut File::open(fname)?,
             delimiter,
             Some(records_to_read),
             has_header,
@@ -229,10 +228,15 @@ pub struct Reader<R: Read> {
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    record_iter:
-        Buffered<Skip<Take<StringRecordsIntoIter<BufReader<R>>>>, StringRecord, Error>,
+    reader: csv_crate::Reader<R>,
     /// Current line number
     line_number: usize,
+    /// Maximum number of rows to read
+    end: usize,
+    /// Number of records per batch
+    batch_size: usize,
+    /// Vector that can hold the `StringRecord`s of the batches
+    batch_records: Vec<StringRecord>,
 }
 
 impl<R> fmt::Debug for Reader<R>
@@ -263,14 +267,8 @@ impl<R: Read> Reader<R> {
         bounds: Bounds,
         projection: Option<Vec<usize>>,
     ) -> Self {
-        Self::from_buf_reader(
-            BufReader::new(reader),
-            schema,
-            has_header,
-            delimiter,
-            batch_size,
-            bounds,
-            projection,
+        Self::from_reader(
+            reader, schema, has_header, delimiter, batch_size, bounds, projection,
         )
     }
 
@@ -289,12 +287,12 @@ impl<R: Read> Reader<R> {
         }
     }
 
-    /// Create a new CsvReader from a `BufReader<R: Read>
+    /// Create a new CsvReader from a Reader
     ///
     /// This constructor allows you more flexibility in what records are processed by the
     /// csv reader.
-    pub fn from_buf_reader(
-        buf_reader: BufReader<R>,
+    pub fn from_reader(
+        reader: R,
         schema: SchemaRef,
         has_header: bool,
         delimiter: Option<u8>,
@@ -309,27 +307,40 @@ impl<R: Read> Reader<R> {
             reader_builder.delimiter(c);
         }
 
-        let csv_reader = reader_builder.from_reader(buf_reader);
-        let record_iter = csv_reader.into_records();
+        let mut csv_reader = reader_builder.from_reader(reader);
 
         let (start, end) = match bounds {
             None => (0, usize::MAX),
             Some((start, end)) => (start, end),
         };
-        // Create an iterator that:
-        // * skips the first `start` items
-        // * runs up to `end` items
-        // * buffers `batch_size` items
+
+        // First we will skip `start` rows
         // note that this skips by iteration. This is because in general it is not possible
 // to seek in CSV. However, skipping still saves the burden of creating arrow arrays,
         // which is a slow operation that scales with the number of columns
-        let record_iter = Buffered::new(record_iter.take(end).skip(start), batch_size);
+
+        let mut record = ByteRecord::new();
+        // Skip the first `start` records
+        for _ in 0..start {
+            let res = csv_reader.read_byte_record(&mut record);
+            if !res.unwrap_or(false) {
+                break;
+            }
+        }
+
+        // Initialize batch_records with StringRecords so they
+        // can be reused across batches
+        let mut batch_records = Vec::with_capacity(batch_size);
+        batch_records.resize_with(batch_size, Default::default);
 
         Self {
             schema,
             projection,
-            record_iter,
+            reader: csv_reader,
             line_number: if has_header { start + 1 } else { start },
+            batch_size,
+            end,
+            batch_records,
         }
     }
 }
@@ -338,32 +349,39 @@ impl<R: Read> Iterator for Reader<R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let rows = match self.record_iter.next() {
-            Some(Ok(r)) => r,
-            Some(Err(e)) => {
-                return Some(Err(ArrowError::ParseError(format!(
-                    "Error parsing line {}: {:?}",
-                    self.line_number + self.record_iter.n(),
-                    e
-                ))));
+        let remaining = self.end - self.line_number;
+
+        let mut read_records = 0;
+        for i in 0..min(self.batch_size, remaining) {
+            match self.reader.read_record(&mut self.batch_records[i]) {
+                Ok(true) => {
+                    read_records += 1;
+                }
+                Ok(false) => break,
+                Err(e) => {
+                    return Some(Err(ArrowError::ParseError(format!(
+                        "Error parsing line {}: {:?}",
+                        self.line_number + i,
+                        e
+                    ))))
+                }
             }
-            None => return None,
-        };
+        }
 
         // return early if no data was loaded
-        if rows.is_empty() {
+        if read_records == 0 {
             return None;
         }
 
         // parse the batches into a RecordBatch
         let result = parse(
-            &rows,
+            &self.batch_records[..read_records],
             &self.schema.fields(),
             &self.projection,
             self.line_number,
         );
 
-        self.line_number += rows.len();
+        self.line_number += read_records;
 
         Some(result)
     }
@@ -640,15 +658,14 @@ impl ReaderBuilder {
     }
 
     /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>> {
+    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> {
         // check if schema should be inferred
-        let mut buf_reader = BufReader::new(reader);
         let delimiter = self.delimiter.unwrap_or(b',');
         let schema = match self.schema {
             Some(schema) => schema,
             None => {
                 let (inferred_schema, _) = infer_file_schema(
-                    &mut buf_reader,
+                    &mut reader,
                     delimiter,
                     self.max_records,
                     self.has_header,
@@ -657,8 +674,8 @@ impl ReaderBuilder {
                 Arc::new(inferred_schema)
             }
         };
-        Ok(Reader::from_buf_reader(
-            buf_reader,
+        Ok(Reader::from_reader(
+            reader,
             schema,
             self.has_header,
             self.delimiter,
@@ -736,8 +753,8 @@ mod tests {
         let both_files = file_with_headers
             .chain(Cursor::new("\n".to_string()))
             .chain(file_without_headers);
-        let mut csv = Reader::from_buf_reader(
-            BufReader::new(both_files),
+        let mut csv = Reader::from_reader(
+            both_files,
             Arc::new(schema),
             true,
             None,
diff --git a/rust/arrow/src/util/buffered_iterator.rs b/rust/arrow/src/util/buffered_iterator.rs
deleted file mode 100644
index 5d42ee4..0000000
--- a/rust/arrow/src/util/buffered_iterator.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
-//! containers that benefit from batching or chunking.
-
-use std::marker::PhantomData;
-
-/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
-/// The items must be a [std::result::Result] and if an error is returned, the error is returned
-/// and the iterator continues.
-/// An invariant of this iterator is that every returned vector's size is at most the specified size.
-#[derive(Debug)]
-pub struct Buffered<I, T, R>
-where
-    T: Clone,
-    I: Iterator<Item = Result<T, R>>,
-{
-    iter: I,
-    size: usize,
-    buffer: Vec<T>,
-    phantom: PhantomData<R>,
-}
-
-impl<I, T, R> Buffered<I, T, R>
-where
-    T: Clone,
-    I: Iterator<Item = Result<T, R>>,
-{
-    pub fn new(iter: I, size: usize) -> Self {
-        Buffered {
-            iter,
-            size,
-            buffer: Vec::with_capacity(size),
-            phantom: PhantomData,
-        }
-    }
-
-    /// returns the number of items buffered so far.
-    /// Useful to extract the exact item where an error occurred
-    #[inline]
-    pub fn n(&self) -> usize {
-        self.buffer.len()
-    }
-}
-
-impl<I, T, R> Iterator for Buffered<I, T, R>
-where
-    T: Clone,
-    I: Iterator<Item = Result<T, R>>,
-{
-    type Item = Result<Vec<T>, R>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        for _ in 0..(self.size - self.n()) {
-            match self.iter.next() {
-                Some(Ok(item)) => self.buffer.push(item),
-                Some(Err(error)) => return Some(Err(error)),
-                None => break,
-            }
-        }
-        if self.buffer.is_empty() {
-            None
-        } else {
-            let result = self.buffer.clone();
-            self.buffer.clear();
-            Some(Ok(result))
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[derive(Debug, PartialEq)]
-    struct AError {}
-
-    impl std::fmt::Display for AError {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "Bla")
-        }
-    }
-    impl std::error::Error for AError {}
-
-    #[test]
-    fn test_basic() {
-        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
-        let iter = a.into_iter();
-        let mut iter = Buffered::new(iter, 2);
-
-        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
-        assert_eq!(iter.next(), Some(Ok(vec![3])));
-        assert_eq!(iter.next(), None);
-    }
-
-    #[test]
-    fn test_error_first() {
-        let a: Vec<Result<i32, AError>> =
-            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
-        let iter = a.into_iter();
-        let mut iter = Buffered::new(iter, 2);
-
-        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
-        assert_eq!(iter.next(), Some(Err(AError {})));
-        // 4 is here: it was not skipped on the previous
-        assert_eq!(iter.n(), 0);
-        assert_eq!(iter.next(), Some(Ok(vec![4, 5])));
-        assert_eq!(iter.next(), None);
-    }
-
-    #[test]
-    fn test_error_last() {
-        let a: Vec<Result<i32, AError>> = vec![Ok(1), Err(AError {}), Ok(3), Ok(4)];
-        let iter = a.into_iter();
-        let mut iter = Buffered::new(iter, 2);
-
-        assert_eq!(iter.next(), Some(Err(AError {})));
-        assert_eq!(iter.n(), 1);
-        assert_eq!(iter.next(), Some(Ok(vec![1, 3])));
-        assert_eq!(iter.next(), Some(Ok(vec![4])));
-        assert_eq!(iter.next(), None);
-    }
-}
diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs
index 053d132..0f95043 100644
--- a/rust/arrow/src/util/mod.rs
+++ b/rust/arrow/src/util/mod.rs
@@ -17,7 +17,6 @@
 
 pub mod bit_chunk_iterator;
 pub mod bit_util;
-pub mod buffered_iterator;
 pub mod display;
 pub mod integration_util;
 #[cfg(feature = "prettyprint")]
diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml
index 7d7b6c9..6017d88 100644
--- a/rust/benchmarks/Cargo.toml
+++ b/rust/benchmarks/Cargo.toml
@@ -31,4 +31,4 @@ parquet = { path = "../parquet" }
 datafusion = { path = "../datafusion" }
 structopt = { version = "0.3", default-features = false }
 tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] }
-futures = "0.3"
+futures = "0.3"
\ No newline at end of file
diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs
index 02a790b..1ffa684 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -92,7 +92,7 @@ async fn datafusion_sql_benchmarks(
     debug: bool,
 ) -> Result<()> {
     let mut queries = HashMap::new();
-    queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MIN(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
+    queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
     for (name, sql) in &queries {
         println!("Executing '{}'", name);
         for i in 0..iterations {