Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/17 08:24:25 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

jorgecarleitao opened a new pull request #8482:
URL: https://github.com/apache/arrow/pull/8482


   This PR proposes the following changes:
   
   1. Make the CSV reader accept an optional argument to bound its iteration
   2. Simplify the `next` code via iterators
   3. Add a new struct to perform buffered iterations (useful to any reader)
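A minimal sketch of the bounded iteration in item 1, assuming the bounds are a half-open `(start, end)` row range. The `bounded` helper is hypothetical and not part of the PR. Because `take` counts items after the skip, keeping rows `start..end` means taking `end - start` items.

```rust
/// Hypothetical helper: yields the items whose zero-based index lies in
/// `start..end`, or every item when `bounds` is `None`.
fn bounded<I: Iterator>(
    iter: I,
    bounds: Option<(usize, usize)>,
) -> impl Iterator<Item = I::Item> {
    let (start, end) = bounds.unwrap_or((0, usize::MAX));
    // Skipping is done by iteration, since CSV cannot be seeked in general.
    // `saturating_sub` yields an empty range when `end < start`.
    iter.skip(start).take(end.saturating_sub(start))
}
```

For example, `bounded(0..10, Some((2, 5)))` would yield the items at indices 2, 3 and 4.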
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507863412



##########
File path: rust/arrow/src/util/buffered_iterator.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
+//! containers that benefit from batching or chunking.
+
+use std::marker::PhantomData;
+
+/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
+/// The items must be a [std::result::Result] and if an error is returned, that error is returned
+/// and the iterator continues.
+/// An invariant of this iterator is that every returned vector's size is at most the specified size.
+#[derive(Debug)]
+pub struct Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    iter: I,
+    size: usize,
+    buffer: Vec<T>,
+    phantom: PhantomData<R>,
+}
+
+impl<I, T, R> Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    pub fn new(iter: I, size: usize) -> Self {
+        Buffered {
+            iter,
+            size,
+            buffer: Vec::with_capacity(size),
+            phantom: PhantomData,
+        }
+    }
+
+    /// returns the number of items buffered so far.
+    /// Useful to extract the exact item where an error occurred
+    #[inline]
+    pub fn n(&self) -> usize {
+        return self.buffer.len();
+    }
+}
+
+impl<I, T, R> Iterator for Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    type Item = Result<Vec<T>, R>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for _ in 0..(self.size - self.n()) {
+            match self.iter.next() {
+                Some(Ok(item)) => self.buffer.push(item),
+                Some(Err(error)) => return Some(Err(error)),
+                None => break,
+            }
+        }
+        if self.buffer.is_empty() {
+            None
+        } else {
+            let result = self.buffer.clone();
+            self.buffer.clear();
+            Some(Ok(result))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct AError {}
+
+    impl std::fmt::Display for AError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "Bla")
+        }
+    }
+    impl std::error::Error for AError {}
+
+    #[test]
+    fn test_basic() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Ok(vec![3])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_first() {
+        let a: Vec<Result<i32, AError>> =
+            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Err(AError {})));

Review comment:
       It seems acceptable to me that errors can be ignored during schema inference since schema inference is a best-effort thing anyway, but I do think it would be good to log warnings in this case.
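One way to picture this suggestion, as a sketch only: keep the records that parse and hand each error to a warning callback instead of aborting. The `skip_errors_with_warning` name and the `warn` callback are hypothetical; which logging facility the crate would use is left open.

```rust
/// Hypothetical helper: keeps the `Ok` items and reports each error through
/// `warn` instead of aborting, e.g. while sampling rows for schema inference.
fn skip_errors_with_warning<T, E, I, W>(iter: I, mut warn: W) -> Vec<T>
where
    I: Iterator<Item = Result<T, E>>,
    E: std::fmt::Debug,
    W: FnMut(String),
{
    let mut items = Vec::new();
    for (index, item) in iter.enumerate() {
        match item {
            Ok(value) => items.push(value),
            // Best effort: record the problem and keep going.
            Err(error) => warn(format!("skipping record {}: {:?}", index, error)),
        }
    }
    items
}
```

Passing a closure such as `|msg| eprintln!("{}", msg)` would print the warnings; a real implementation would more likely forward them to a logger.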







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507178148



##########
File path: rust/arrow/src/csv/reader.rs
##########
@@ -304,162 +311,180 @@ impl<R: Read> Reader<R> {
 
         let csv_reader = reader_builder.from_reader(buf_reader);
         let record_iter = csv_reader.into_records();
+
+        let (start, end) = match bounds {
+            None => (0, usize::MAX),
+            Some((start, end)) => (start, end),
+        };
+        // Create an iterator that:
+        // * skips the first `start` items
+        // * runs up to `end` items
+        // * buffers `batch_size` items
+        // note that this skips by iteration. This is because in general it is not possible
+        // to seek in CSV. However, skipping still saves the burden of creating arrow arrays,
+        // which is a slow operation that scales with the number of columns
+        let record_iter = Buffered::new(record_iter.skip(start).take(end), batch_size);
+
         Self {
             schema,
             projection,
             record_iter,
-            batch_size,
-            line_number: if has_header { 1 } else { 0 },
+            line_number: if has_header { start + 1 } else { start + 0 },
         }
     }
-
-    fn parse(&self, rows: &[StringRecord]) -> Result<RecordBatch> {
-        let projection: Vec<usize> = match self.projection {
-            Some(ref v) => v.clone(),
-            None => self
-                .schema
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect(),
-        };
-
-        let arrays: Result<Vec<ArrayRef>> = projection
-            .iter()
-            .map(|i| {
-                let i = *i;
-                let field = self.schema.field(i);
-                match field.data_type() {
-                    &DataType::Boolean => {
-                        self.build_primitive_array::<BooleanType>(rows, i)
-                    }
-                    &DataType::Int8 => self.build_primitive_array::<Int8Type>(rows, i),
-                    &DataType::Int16 => self.build_primitive_array::<Int16Type>(rows, i),
-                    &DataType::Int32 => self.build_primitive_array::<Int32Type>(rows, i),
-                    &DataType::Int64 => self.build_primitive_array::<Int64Type>(rows, i),
-                    &DataType::UInt8 => self.build_primitive_array::<UInt8Type>(rows, i),
-                    &DataType::UInt16 => {
-                        self.build_primitive_array::<UInt16Type>(rows, i)
-                    }
-                    &DataType::UInt32 => {
-                        self.build_primitive_array::<UInt32Type>(rows, i)
-                    }
-                    &DataType::UInt64 => {
-                        self.build_primitive_array::<UInt64Type>(rows, i)
-                    }
-                    &DataType::Float32 => {
-                        self.build_primitive_array::<Float32Type>(rows, i)
-                    }
-                    &DataType::Float64 => {
-                        self.build_primitive_array::<Float64Type>(rows, i)
-                    }
-                    &DataType::Utf8 => {
-                        let mut builder = StringBuilder::new(rows.len());
-                        for row in rows.iter() {
-                            match row.get(i) {
-                                Some(s) => builder.append_value(s).unwrap(),
-                                _ => builder.append(false).unwrap(),
-                            }
-                        }
-                        Ok(Arc::new(builder.finish()) as ArrayRef)
-                    }
-                    other => Err(ArrowError::ParseError(format!(
-                        "Unsupported data type {:?}",
-                        other
-                    ))),
-                }
-            })
-            .collect();
-
-        let schema_fields = self.schema.fields();
-
-        let projected_fields: Vec<Field> = projection
-            .iter()
-            .map(|i| schema_fields[*i].clone())
-            .collect();
-
-        let projected_schema = Arc::new(Schema::new(projected_fields));
-
-        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
-    }
-
-    fn build_primitive_array<T: ArrowPrimitiveType>(
-        &self,
-        rows: &[StringRecord],
-        col_idx: usize,
-    ) -> Result<ArrayRef> {
-        let is_boolean_type =
-            *self.schema.field(col_idx).data_type() == DataType::Boolean;
-
-        rows.iter()
-            .enumerate()
-            .map(|(row_index, row)| {
-                match row.get(col_idx) {
-                    Some(s) => {
-                        if s.is_empty() {
-                            return Ok(None);
-                        }
-                        let parsed = if is_boolean_type {
-                            s.to_lowercase().parse::<T::Native>()
-                        } else {
-                            s.parse::<T::Native>()
-                        };
-                        match parsed {
-                            Ok(e) => Ok(Some(e)),
-                            Err(_) => Err(ArrowError::ParseError(format!(
-                                // TODO: we should surface the underlying error here.
-                                "Error while parsing value {} for column {} at line {}",
-                                s,
-                                col_idx,
-                                self.line_number + row_index
-                            ))),
-                        }
-                    }
-                    None => Ok(None),
-                }
-            })
-            .collect::<Result<PrimitiveArray<T>>>()
-            .map(|e| Arc::new(e) as ArrayRef)
-    }
 }
 
 impl<R: Read> Iterator for Reader<R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // read a batch of rows into memory
-        let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size);
-        for i in 0..self.batch_size {
-            match self.record_iter.next() {
-                Some(Ok(r)) => {
-                    rows.push(r);
-                }
-                Some(Err(e)) => {
-                    return Some(Err(ArrowError::ParseError(format!(
-                        "Error parsing line {}: {:?}",
-                        self.line_number + i,
-                        e
-                    ))));
-                }
-                None => break,
+        let rows = match self.record_iter.next() {
+            Some(Ok(r)) => r,
+            Some(Err(e)) => {
+                return Some(Err(ArrowError::ParseError(format!(
+                    "Error parsing line {}: {:?}",
+                    self.line_number + self.record_iter.n(),
+                    e
+                ))));
             }
-        }
+            None => return None,
+        };
 
         // return early if no data was loaded
         if rows.is_empty() {
             return None;
         }
 
         // parse the batches into a RecordBatch
-        let result = self.parse(&rows);
+        let result = parse(
+            &rows,
+            &self.schema.fields(),
+            &self.projection,
+            self.line_number,
+        );
 
         self.line_number += rows.len();
 
         Some(result)
     }
 }
 
+/// parses a slice of [csv_crate::StringRecord] into a RecordBatch.
+pub fn parse(

Review comment:
       Good catch. I reverted this.







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507177994



##########
File path: rust/arrow/src/util/buffered_iterator.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
+//! containers that benefit from batching or chunking.
+
+use std::marker::PhantomData;
+
+/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
+/// The items must be a [std::result::Result] and if an error is returned, that error is returned
+/// and the iterator continues.
+/// An invariant of this iterator is that every returned vector's size is at most the specified size.
+#[derive(Debug)]
+pub struct Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    iter: I,
+    size: usize,
+    buffer: Vec<T>,
+    phantom: PhantomData<R>,
+}
+
+impl<I, T, R> Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    pub fn new(iter: I, size: usize) -> Self {
+        Buffered {
+            iter,
+            size,
+            buffer: Vec::with_capacity(size),
+            phantom: PhantomData,
+        }
+    }
+
+    /// returns the number of items buffered so far.
+    /// Useful to extract the exact item where an error occurred
+    #[inline]
+    pub fn n(&self) -> usize {
+        return self.buffer.len();
+    }
+}
+
+impl<I, T, R> Iterator for Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    type Item = Result<Vec<T>, R>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for _ in 0..(self.size - self.n()) {
+            match self.iter.next() {
+                Some(Ok(item)) => self.buffer.push(item),
+                Some(Err(error)) => return Some(Err(error)),
+                None => break,
+            }
+        }
+        if self.buffer.is_empty() {
+            None
+        } else {
+            let result = self.buffer.clone();
+            self.buffer.clear();
+            Some(Ok(result))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct AError {}
+
+    impl std::fmt::Display for AError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "Bla")
+        }
+    }
+    impl std::error::Error for AError {}
+
+    #[test]
+    fn test_basic() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Ok(vec![3])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_first() {
+        let a: Vec<Result<i32, AError>> =
+            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Err(AError {})));

Review comment:
       The iterator returns the error and continues, but I am not sure what happens during schema inference, or how the column names affect schema inference.
   
   One thing I do not like atm is that we error the whole batch if we can't parse/cast an entry to a type. IMO we should `null` the value and continue.
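A sketch of the "null and continue" idea, assuming a value that fails to parse should become a null slot rather than an error. The `parse_or_null` helper is hypothetical; in the reader, the resulting `Option` values would still need to be collected into an Arrow array.

```rust
/// Hypothetical helper: parses each cell, mapping empty or unparsable cells
/// to `None` (a null slot) instead of failing the whole batch.
fn parse_or_null<T: std::str::FromStr>(cells: &[&str]) -> Vec<Option<T>> {
    cells
        .iter()
        .map(|cell| {
            if cell.is_empty() {
                // Empty cells are already treated as null by the reader.
                None
            } else {
                // `ok()` discards the parse error and leaves a null.
                cell.parse::<T>().ok()
            }
        })
        .collect()
}
```

The trade-off is that malformed input no longer surfaces as an error, so a caller cannot tell a missing value from a bad one without extra reporting.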







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507178185



##########
File path: rust/arrow/src/csv/reader.rs
##########
@@ -620,8 +645,15 @@ mod tests {
 
         let file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let mut csv =
-            Reader::new(file, Arc::new(schema.clone()), false, None, 1024, None);
+        let mut csv = Reader::new(

Review comment:
       I've added a test.







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507177994



##########
File path: rust/arrow/src/util/buffered_iterator.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
+//! containers that benefit from batching or chunking.
+
+use std::marker::PhantomData;
+
+/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
+/// The items must be a [std::result::Result] and if an error is returned, that error is returned
+/// and the iterator continues.
+/// An invariant of this iterator is that every returned vector's size is at most the specified size.
+#[derive(Debug)]
+pub struct Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    iter: I,
+    size: usize,
+    buffer: Vec<T>,
+    phantom: PhantomData<R>,
+}
+
+impl<I, T, R> Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    pub fn new(iter: I, size: usize) -> Self {
+        Buffered {
+            iter,
+            size,
+            buffer: Vec::with_capacity(size),
+            phantom: PhantomData,
+        }
+    }
+
+    /// returns the number of items buffered so far.
+    /// Useful to extract the exact item where an error occurred
+    #[inline]
+    pub fn n(&self) -> usize {
+        return self.buffer.len();
+    }
+}
+
+impl<I, T, R> Iterator for Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    type Item = Result<Vec<T>, R>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for _ in 0..(self.size - self.n()) {
+            match self.iter.next() {
+                Some(Ok(item)) => self.buffer.push(item),
+                Some(Err(error)) => return Some(Err(error)),
+                None => break,
+            }
+        }
+        if self.buffer.is_empty() {
+            None
+        } else {
+            let result = self.buffer.clone();
+            self.buffer.clear();
+            Some(Ok(result))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct AError {}
+
+    impl std::fmt::Display for AError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "Bla")
+        }
+    }
+    impl std::error::Error for AError {}
+
+    #[test]
+    fn test_basic() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Ok(vec![3])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_first() {
+        let a: Vec<Result<i32, AError>> =
+            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Err(AError {})));

Review comment:
       The iterator returns the error and continues, but I am not sure what happens during schema inference, or how the column names affect schema inference.
   
   One thing I do not like atm is that we error the whole batch if we can't parse/cast an entry. IMO we should place `null` if we can't cast/parse a value.







[GitHub] [arrow] github-actions[bot] commented on pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#issuecomment-710775116


   https://issues.apache.org/jira/browse/ARROW-10332





[GitHub] [arrow] nevi-me closed pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
nevi-me closed pull request #8482:
URL: https://github.com/apache/arrow/pull/8482


   





[GitHub] [arrow] nevi-me commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r506960675



##########
File path: rust/arrow/src/csv/reader.rs
##########
@@ -304,162 +311,180 @@ impl<R: Read> Reader<R> {
 
         let csv_reader = reader_builder.from_reader(buf_reader);
         let record_iter = csv_reader.into_records();
+
+        let (start, end) = match bounds {
+            None => (0, usize::MAX),
+            Some((start, end)) => (start, end),
+        };
+        // Create an iterator that:
+        // * skips the first `start` items
+        // * runs up to `end` items
+        // * buffers `batch_size` items
+        // note that this skips by iteration. This is because in general it is not possible
+        // to seek in CSV. However, skipping still saves the burden of creating arrow arrays,
+        // which is a slow operation that scales with the number of columns
+        let record_iter = Buffered::new(record_iter.skip(start).take(end), batch_size);
+
         Self {
             schema,
             projection,
             record_iter,
-            batch_size,
-            line_number: if has_header { 1 } else { 0 },
+            line_number: if has_header { start + 1 } else { start + 0 },
         }
     }
-
-    fn parse(&self, rows: &[StringRecord]) -> Result<RecordBatch> {
-        let projection: Vec<usize> = match self.projection {
-            Some(ref v) => v.clone(),
-            None => self
-                .schema
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect(),
-        };
-
-        let arrays: Result<Vec<ArrayRef>> = projection
-            .iter()
-            .map(|i| {
-                let i = *i;
-                let field = self.schema.field(i);
-                match field.data_type() {
-                    &DataType::Boolean => {
-                        self.build_primitive_array::<BooleanType>(rows, i)
-                    }
-                    &DataType::Int8 => self.build_primitive_array::<Int8Type>(rows, i),
-                    &DataType::Int16 => self.build_primitive_array::<Int16Type>(rows, i),
-                    &DataType::Int32 => self.build_primitive_array::<Int32Type>(rows, i),
-                    &DataType::Int64 => self.build_primitive_array::<Int64Type>(rows, i),
-                    &DataType::UInt8 => self.build_primitive_array::<UInt8Type>(rows, i),
-                    &DataType::UInt16 => {
-                        self.build_primitive_array::<UInt16Type>(rows, i)
-                    }
-                    &DataType::UInt32 => {
-                        self.build_primitive_array::<UInt32Type>(rows, i)
-                    }
-                    &DataType::UInt64 => {
-                        self.build_primitive_array::<UInt64Type>(rows, i)
-                    }
-                    &DataType::Float32 => {
-                        self.build_primitive_array::<Float32Type>(rows, i)
-                    }
-                    &DataType::Float64 => {
-                        self.build_primitive_array::<Float64Type>(rows, i)
-                    }
-                    &DataType::Utf8 => {
-                        let mut builder = StringBuilder::new(rows.len());
-                        for row in rows.iter() {
-                            match row.get(i) {
-                                Some(s) => builder.append_value(s).unwrap(),
-                                _ => builder.append(false).unwrap(),
-                            }
-                        }
-                        Ok(Arc::new(builder.finish()) as ArrayRef)
-                    }
-                    other => Err(ArrowError::ParseError(format!(
-                        "Unsupported data type {:?}",
-                        other
-                    ))),
-                }
-            })
-            .collect();
-
-        let schema_fields = self.schema.fields();
-
-        let projected_fields: Vec<Field> = projection
-            .iter()
-            .map(|i| schema_fields[*i].clone())
-            .collect();
-
-        let projected_schema = Arc::new(Schema::new(projected_fields));
-
-        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
-    }
-
-    fn build_primitive_array<T: ArrowPrimitiveType>(
-        &self,
-        rows: &[StringRecord],
-        col_idx: usize,
-    ) -> Result<ArrayRef> {
-        let is_boolean_type =
-            *self.schema.field(col_idx).data_type() == DataType::Boolean;
-
-        rows.iter()
-            .enumerate()
-            .map(|(row_index, row)| {
-                match row.get(col_idx) {
-                    Some(s) => {
-                        if s.is_empty() {
-                            return Ok(None);
-                        }
-                        let parsed = if is_boolean_type {
-                            s.to_lowercase().parse::<T::Native>()
-                        } else {
-                            s.parse::<T::Native>()
-                        };
-                        match parsed {
-                            Ok(e) => Ok(Some(e)),
-                            Err(_) => Err(ArrowError::ParseError(format!(
-                                // TODO: we should surface the underlying error here.
-                                "Error while parsing value {} for column {} at line {}",
-                                s,
-                                col_idx,
-                                self.line_number + row_index
-                            ))),
-                        }
-                    }
-                    None => Ok(None),
-                }
-            })
-            .collect::<Result<PrimitiveArray<T>>>()
-            .map(|e| Arc::new(e) as ArrayRef)
-    }
 }
 
 impl<R: Read> Iterator for Reader<R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // read a batch of rows into memory
-        let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size);
-        for i in 0..self.batch_size {
-            match self.record_iter.next() {
-                Some(Ok(r)) => {
-                    rows.push(r);
-                }
-                Some(Err(e)) => {
-                    return Some(Err(ArrowError::ParseError(format!(
-                        "Error parsing line {}: {:?}",
-                        self.line_number + i,
-                        e
-                    ))));
-                }
-                None => break,
+        let rows = match self.record_iter.next() {
+            Some(Ok(r)) => r,
+            Some(Err(e)) => {
+                return Some(Err(ArrowError::ParseError(format!(
+                    "Error parsing line {}: {:?}",
+                    self.line_number + self.record_iter.n(),
+                    e
+                ))));
             }
-        }
+            None => return None,
+        };
 
         // return early if no data was loaded
         if rows.is_empty() {
             return None;
         }
 
         // parse the batches into a RecordBatch
-        let result = self.parse(&rows);
+        let result = parse(
+            &rows,
+            &self.schema.fields(),
+            &self.projection,
+            self.line_number,
+        );
 
         self.line_number += rows.len();
 
         Some(result)
     }
 }
 
+/// parses a slice of [csv_crate::StringRecord] into a RecordBatch.
+pub fn parse(

Review comment:
       Does this need to be public?

##########
File path: rust/arrow/src/csv/reader.rs
##########
@@ -620,8 +645,15 @@ mod tests {
 
         let file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let mut csv =
-            Reader::new(file, Arc::new(schema.clone()), false, None, 1024, None);
+        let mut csv = Reader::new(

Review comment:
       I can't see where you test the bounds
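
For readers following the thread, a bounds test could look roughly like the sketch below. It does not use the PR's `Reader`; `bounded` is a hypothetical helper over plain text lines, and the half-open `[start, end)` interpretation of the bounds is an assumption, not something the patch confirms.

```rust
/// Hypothetical helper (not part of the patch): returns the lines whose
/// zero-based index falls in `[start, end)`, or every line when `bounds` is `None`.
fn bounded<'a>(text: &'a str, bounds: Option<(usize, usize)>) -> Vec<&'a str> {
    let lines = text.lines();
    match bounds {
        // `saturating_sub` keeps an inverted range (end < start) from underflowing.
        Some((start, end)) => lines
            .skip(start)
            .take(end.saturating_sub(start))
            .collect(),
        None => lines.collect(),
    }
}
```

A test along these lines would cover the unbounded case, an interior range, a range that runs past the end of the input, and an empty range.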

##########
File path: rust/arrow/src/csv/reader.rs
##########
@@ -483,6 +508,8 @@ pub struct ReaderBuilder {
     ///
     /// The default batch size when using the `ReaderBuilder` is 1024 records
     batch_size: usize,
+    ///

Review comment:
       This doc comment is empty; please document the new field.

##########
File path: rust/arrow/src/util/buffered_iterator.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
+//! containers that benefit from batching or chunking.
+
+use std::marker::PhantomData;
+
+/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
+/// The items must be a [std::result::Result] and if an error is returned, that error is returned
+/// and the iterator continues.
+/// An invariant of this iterator is that every returned vector's size is at most the specified size.
+#[derive(Debug)]
+pub struct Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    iter: I,
+    size: usize,
+    buffer: Vec<T>,
+    phantom: PhantomData<R>,
+}
+
+impl<I, T, R> Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    pub fn new(iter: I, size: usize) -> Self {
+        Buffered {
+            iter,
+            size,
+            buffer: Vec::with_capacity(size),
+            phantom: PhantomData,
+        }
+    }
+
+    /// returns the number of items buffered so far.
+    /// Useful to extract the exact item where an error occurred
+    #[inline]
+    pub fn n(&self) -> usize {
+        return self.buffer.len();
+    }
+}
+
+impl<I, T, R> Iterator for Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    type Item = Result<Vec<T>, R>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for _ in 0..(self.size - self.n()) {
+            match self.iter.next() {
+                Some(Ok(item)) => self.buffer.push(item),
+                Some(Err(error)) => return Some(Err(error)),
+                None => break,
+            }
+        }
+        if self.buffer.is_empty() {
+            None
+        } else {
+            let result = self.buffer.clone();
+            self.buffer.clear();
+            Some(Ok(result))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct AError {}
+
+    impl std::fmt::Display for AError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "Bla")
+        }
+    }
+    impl std::error::Error for AError {}
+
+    #[test]
+    fn test_basic() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Ok(vec![3])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_first() {
+        let a: Vec<Result<i32, AError>> =
+            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Err(AError {})));

Review comment:
       Are we skipping errors to allow users to read files that have either corrupted records or non-UTF8 data in some columns? I've come across the irritating situation where Spark fails because a name column in a CSV contains non-UTF8 characters.
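
To make the behaviour under discussion concrete, here is a minimal sketch of an iterator with the same shape as the `Buffered` struct in the hunk above. `Batched` is an illustrative name, and it swaps the buffer out with `std::mem::replace` instead of cloning, so it is a stand-in rather than the PR's code. It suggests that when an error arrives mid-batch, the error is returned on its own and the items buffered before it are merged into the next batch.

```rust
/// Illustrative stand-in for the patch's `Buffered` iterator (not the PR's code).
struct Batched<I, T> {
    iter: I,
    size: usize,
    buffer: Vec<T>,
}

impl<I, T, E> Batched<I, T>
where
    I: Iterator<Item = Result<T, E>>,
{
    fn new(iter: I, size: usize) -> Self {
        Batched {
            iter,
            size,
            buffer: Vec::with_capacity(size),
        }
    }
}

impl<I, T, E> Iterator for Batched<I, T>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = Result<Vec<T>, E>;

    fn next(&mut self) -> Option<Self::Item> {
        // Top the buffer up to `size`, counting items kept from an earlier call.
        for _ in 0..(self.size - self.buffer.len()) {
            match self.iter.next() {
                Some(Ok(item)) => self.buffer.push(item),
                // The error is surfaced; items already buffered stay in place.
                Some(Err(error)) => return Some(Err(error)),
                None => break,
            }
        }
        if self.buffer.is_empty() {
            None
        } else {
            // Hand the batch over without cloning and start a fresh buffer.
            let fresh = Vec::with_capacity(self.size);
            Some(Ok(std::mem::replace(&mut self.buffer, fresh)))
        }
    }
}
```

With a batch size of 2 and the input `Ok(1), Err, Ok(3), Ok(4)`, a caller that keeps iterating after the error sees the error first, then a batch containing 1 and 3, then a batch containing 4. The failing record is simply absent from the batches.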







[GitHub] [arrow] nevi-me commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507200840



##########
File path: rust/arrow/src/util/buffered_iterator.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
+//! containers that benefit from batching or chunking.
+
+use std::marker::PhantomData;
+
+/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
+/// The items must be a [std::result::Result] and if an error is returned, tha error is returned
+/// and the iterator continues.
+/// An invariant of this iterator is that every returned vector's size is at most the specified size.
+#[derive(Debug)]
+pub struct Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    iter: I,
+    size: usize,
+    buffer: Vec<T>,
+    phantom: PhantomData<R>,
+}
+
+impl<I, T, R> Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    pub fn new(iter: I, size: usize) -> Self {
+        Buffered {
+            iter,
+            size,
+            buffer: Vec::with_capacity(size),
+            phantom: PhantomData,
+        }
+    }
+
+    /// returns the number of items buffered so far.
+    /// Useful to extract the exact item where an error occurred
+    #[inline]
+    pub fn n(&self) -> usize {
+        return self.buffer.len();
+    }
+}
+
+impl<I, T, R> Iterator for Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    type Item = Result<Vec<T>, R>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for _ in 0..(self.size - self.n()) {
+            match self.iter.next() {
+                Some(Ok(item)) => self.buffer.push(item),
+                Some(Err(error)) => return Some(Err(error)),
+                None => break,
+            }
+        }
+        if self.buffer.is_empty() {
+            None
+        } else {
+            let result = self.buffer.clone();
+            self.buffer.clear();
+            Some(Ok(result))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct AError {}
+
+    impl std::fmt::Display for AError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "Bla")
+        }
+    }
+    impl std::error::Error for AError {}
+
+    #[test]
+    fn test_basic() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Ok(vec![3])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_first() {
+        let a: Vec<Result<i32, AError>> =
+            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Err(AError {})));

Review comment:
       It could be fine if we have some way of notifying the user (logging a warning?) that some records weren't parsed, so they're aware of the incompleteness.
   
   I'd like to hear others' opinions; mine would be to make this a configurable option if possible.
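
One possible shape for such an option is sketched below. Everything here is hypothetical: `OnError` and `collect_with_policy` are illustrative names, not part of the patch or of the arrow crate. The skipped-record count is returned so the caller can log a warning or otherwise report the incompleteness.

```rust
/// Hypothetical policy for records that fail to parse (not part of the patch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnError {
    /// Stop at the first error and return it.
    Fail,
    /// Drop the failing record and keep going.
    Skip,
}

/// Collects the `Ok` items and reports how many records were skipped.
fn collect_with_policy<T, E>(
    iter: impl Iterator<Item = Result<T, E>>,
    policy: OnError,
) -> Result<(Vec<T>, usize), E> {
    let mut items = Vec::new();
    let mut skipped = 0;
    for record in iter {
        match (record, policy) {
            (Ok(item), _) => items.push(item),
            (Err(error), OnError::Fail) => return Err(error),
            // Count the skip so the caller can warn about incomplete data.
            (Err(_), OnError::Skip) => skipped += 1,
        }
    }
    Ok((items, skipped))
}
```

Defaulting to `Fail` would keep the strict behaviour, with `Skip` as an explicit opt-in for files known to contain bad records.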



