Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/19 15:51:17 UTC

[GitHub] [arrow] andygrove commented on a change in pull request #8482: ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end

andygrove commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507863412



##########
File path: rust/arrow/src/util/buffered_iterator.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
+//! containers that benefit from batching or chunking.
+
+use std::marker::PhantomData;
+
+/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
+/// The items must be a [std::result::Result] and if an error is returned, the error is returned
+/// and the iterator continues.
+/// An invariant of this iterator is that every returned vector's size is at most the specified size.
+#[derive(Debug)]
+pub struct Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    iter: I,
+    size: usize,
+    buffer: Vec<T>,
+    phantom: PhantomData<R>,
+}
+
+impl<I, T, R> Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    pub fn new(iter: I, size: usize) -> Self {
+        Buffered {
+            iter,
+            size,
+            buffer: Vec::with_capacity(size),
+            phantom: PhantomData,
+        }
+    }
+
+    /// Returns the number of items buffered so far.
+    /// Useful to extract the exact item where an error occurred.
+    #[inline]
+    pub fn n(&self) -> usize {
+        self.buffer.len()
+    }
+}
+
+impl<I, T, R> Iterator for Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    type Item = Result<Vec<T>, R>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for _ in 0..(self.size - self.n()) {
+            match self.iter.next() {
+                Some(Ok(item)) => self.buffer.push(item),
+                Some(Err(error)) => return Some(Err(error)),
+                None => break,
+            }
+        }
+        if self.buffer.is_empty() {
+            None
+        } else {
+            let result = self.buffer.clone();
+            self.buffer.clear();
+            Some(Ok(result))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct AError {}
+
+    impl std::fmt::Display for AError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "Bla")
+        }
+    }
+    impl std::error::Error for AError {}
+
+    #[test]
+    fn test_basic() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Ok(vec![3])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_first() {
+        let a: Vec<Result<i32, AError>> =
+            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Err(AError {})));

Review comment:
       It seems acceptable to me that errors can be ignored during schema inference since schema inference is a best-effort thing anyway, but I do think it would be good to log warnings in this case.
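
       To illustrate the suggestion, here is a minimal sketch of skipping errors while still surfacing them as warnings. The helper name `collect_best_effort` is hypothetical and is not part of this PR or of the arrow crate, and `eprintln!` is only a stand-in for whatever logging facility the crate would actually use (for example the `log` crate's `warn!` macro).

```rust
use std::fmt::Display;

/// Hypothetical helper (not part of the PR): drains an iterator of `Result`s,
/// keeps the `Ok` items, and reports each `Err` as a warning instead of aborting.
/// Returns the kept items together with the number of skipped errors.
pub fn collect_best_effort<I, T, E>(iter: I) -> (Vec<T>, usize)
where
    I: Iterator<Item = Result<T, E>>,
    E: Display,
{
    let mut items = Vec::new();
    let mut skipped = 0;
    for (index, result) in iter.enumerate() {
        match result {
            Ok(item) => items.push(item),
            Err(error) => {
                // Assumption: stderr stands in for a real logging call here.
                eprintln!("warning: skipping record {} during inference: {}", index, error);
                skipped += 1;
            }
        }
    }
    (items, skipped)
}
```

       Returning the skipped count alongside the items lets the caller decide whether too many records were dropped for the inferred schema to be trustworthy.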



