Posted to commits@arrow.apache.org by su...@apache.org on 2019/03/14 21:11:52 UTC

[arrow] branch master updated: ARROW-4705: [Rust] Improve error handling in csv reader

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 74436f0  ARROW-4705: [Rust] Improve error handling in csv reader
74436f0 is described below

commit 74436f07c482c0d46a8a52c2b50dc9ad1191922a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Mar 14 14:11:40 2019 -0700

    ARROW-4705: [Rust] Improve error handling in csv reader
    
    Author: Andy Grove <an...@gmail.com>
    
    Closes #3895 from andygrove/ARROW-4705 and squashes the following commits:
    
    b32f4a8e <Andy Grove> Address PR feedback
    00d6b3b4 <Andy Grove> start line number at 1 if csv has header row
    8c9fca00 <Andy Grove> Improve error handling in csv reader
---
 rust/arrow/Cargo.toml                          |  2 +-
 rust/arrow/src/csv/reader.rs                   | 51 ++++++++++++++++++++++----
 rust/arrow/test/data/various_types_invalid.csv |  6 +++
 3 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml
index fbc9be0..5e96e0d 100644
--- a/rust/arrow/Cargo.toml
+++ b/rust/arrow/Cargo.toml
@@ -41,7 +41,7 @@ serde_derive = "1.0.80"
 serde_json = { version = "1.0.13", features = ["preserve_order"] }
 indexmap = "1.0"
 rand = "0.5"
-csv = "1.0.0"
+csv = "1.0"
 num = "0.2"
 regex = "1.1"
 lazy_static = "1.2"
diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index 85b2ccd..ffeffdd 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -191,6 +191,8 @@ pub struct Reader<R: Read> {
     record_iter: StringRecordsIntoIter<BufReader<R>>,
     /// Batch size (number of records to load each time)
     batch_size: usize,
+    /// Current line number, used in error reporting
+    line_number: usize,
 }
 
 impl<R: Read> Reader<R> {
@@ -235,6 +237,7 @@ impl<R: Read> Reader<R> {
             projection,
             record_iter,
             batch_size,
+            line_number: if has_headers { 1 } else { 0 },
         }
     }
 
@@ -242,15 +245,17 @@ impl<R: Read> Reader<R> {
     pub fn next(&mut self) -> Result<Option<RecordBatch>> {
         // read a batch of rows into memory
         let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size);
-        for _ in 0..self.batch_size {
+        for i in 0..self.batch_size {
             match self.record_iter.next() {
                 Some(Ok(r)) => {
                     rows.push(r);
                 }
-                Some(Err(_)) => {
-                    return Err(ArrowError::ParseError(
-                        "Error reading CSV file".to_string(),
-                    ));
+                Some(Err(e)) => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Error parsing line {}: {:?}",
+                        self.line_number + i,
+                        e
+                    )));
                 }
                 None => break,
             }
@@ -319,6 +324,8 @@ impl<R: Read> Reader<R> {
             })
             .collect();
 
+        self.line_number += rows.len();
+
         let schema_fields = self.schema.fields();
 
         let projected_fields: Vec<Field> = projection
@@ -358,8 +365,9 @@ impl<R: Read> Reader<R> {
                         Err(_) => {
                             // TODO: we should surface the underlying error here.
                             return Err(ArrowError::ParseError(format!(
-                                "Error while parsing value {}",
-                                s
+                                "Error while parsing value {} at line {}",
+                                s,
+                                self.line_number + row_index
                             )));
                         }
                     }
@@ -503,6 +511,7 @@ impl ReaderBuilder {
             projection: self.projection.clone(),
             record_iter,
             batch_size: self.batch_size,
+            line_number: if self.has_headers { 1 } else { 0 },
         })
     }
 }
@@ -718,4 +727,32 @@ mod tests {
         assert_eq!(false, batch.column(1).is_null(3));
         assert_eq!(false, batch.column(1).is_null(4));
     }
+
+    #[test]
+    fn test_parse_invalid_csv() {
+        let file = File::open("test/data/various_types_invalid.csv").unwrap();
+
+        let schema = Schema::new(vec![
+            Field::new("c_int", DataType::UInt64, false),
+            Field::new("c_float", DataType::Float32, false),
+            Field::new("c_string", DataType::Utf8, false),
+            Field::new("c_bool", DataType::Boolean, false),
+        ]);
+
+        let builder = ReaderBuilder::new()
+            .with_schema(Arc::new(schema))
+            .has_headers(true)
+            .with_delimiter(b'|')
+            .with_batch_size(512)
+            .with_projection(vec![0, 1, 2, 3]);
+
+        let mut csv = builder.build(file).unwrap();
+        match csv.next() {
+            Err(e) => assert_eq!(
+                "ParseError(\"Error while parsing value 4.x4 at line 4\")",
+                format!("{:?}", e)
+            ),
+            Ok(_) => panic!("should have failed"),
+        }
+    }
 }
diff --git a/rust/arrow/test/data/various_types_invalid.csv b/rust/arrow/test/data/various_types_invalid.csv
new file mode 100644
index 0000000..6f059cb
--- /dev/null
+++ b/rust/arrow/test/data/various_types_invalid.csv
@@ -0,0 +1,6 @@
+c_int|c_float|c_string|c_bool
+1|1.1|"1.11"|true
+2|2.2|"2.22"|true
+3||"3.33"|true
+4|4.x4||false
+5|6.6|""|false
\ No newline at end of file
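
For context, a minimal standalone sketch (not part of the patch) of how a caller might observe the new line-aware errors, modeled on the test_parse_invalid_csv test added above. The module paths (arrow::csv::ReaderBuilder, arrow::datatypes) and the fixture path are assumptions based on this commit's source layout; adjust for your crate version.

    // Sketch: reading the invalid fixture and surfacing the improved error.
    use std::fs::File;
    use std::sync::Arc;

    use arrow::csv::ReaderBuilder;            // assumed re-export of csv::reader::ReaderBuilder
    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        // Same schema, delimiter, and projection as the new test fixture.
        let schema = Schema::new(vec![
            Field::new("c_int", DataType::UInt64, false),
            Field::new("c_float", DataType::Float32, false),
            Field::new("c_string", DataType::Utf8, false),
            Field::new("c_bool", DataType::Boolean, false),
        ]);

        let file = File::open("test/data/various_types_invalid.csv").unwrap();
        let mut reader = ReaderBuilder::new()
            .with_schema(Arc::new(schema))
            .has_headers(true)
            .with_delimiter(b'|')
            .with_batch_size(512)
            .with_projection(vec![0, 1, 2, 3])
            .build(file)
            .unwrap();

        // With this change the error names the offending value and line,
        // e.g. ParseError("Error while parsing value 4.x4 at line 4"),
        // rather than the previous generic "Error reading CSV file".
        match reader.next() {
            Ok(Some(batch)) => println!("read {} rows", batch.num_rows()),
            Ok(None) => println!("no more batches"),
            Err(e) => eprintln!("CSV read failed: {:?}", e),
        }
    }

Because line_number starts at 1 when has_headers is true, the reported line number matches the physical line in the file including the header row.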