You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/12/09 19:41:55 UTC

[arrow] branch master updated: ARROW-10842 [Rust] decouple IO from json reader, fix crash during json schema inference with invalid json

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3deae8d  ARROW-10842 [Rust] decouple IO from json reader, fix crash during json schema inference with invalid json
3deae8d is described below

commit 3deae8dd50da773ba215704e567d9937b04b02c5
Author: Qingping Hou <da...@gmail.com>
AuthorDate: Wed Dec 9 14:40:44 2020 -0500

    ARROW-10842 [Rust] decouple IO from json reader, fix crash during json schema inference with invalid json
    
    Other related changes:
    
     * reuse the same IO code between schema inference and record batch reader.
     * reuse string buffer to avoid repetitive memory allocation
     * ignore new lines when reading json records
     * avoid hard crash during JSON schema inference caused by invalid json
    
    Closes #8867 from houqp/qp_json
    
    Authored-by: Qingping Hou <da...@gmail.com>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/arrow/src/json/reader.rs | 283 +++++++++++++++++++++++++++++++++---------
 1 file changed, 225 insertions(+), 58 deletions(-)

diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs
index 118c52c..873baf8 100644
--- a/rust/arrow/src/json/reader.rs
+++ b/rust/arrow/src/json/reader.rs
@@ -189,6 +189,82 @@ fn generate_schema(spec: HashMap<String, HashSet<DataType>>) -> Result<SchemaRef
     }
 }
 
+/// JSON file reader that produces a serde_json::Value iterator from a Read trait
+///
+/// # Example
+///
+/// ```
+/// use std::fs::File;
+/// use std::io::BufReader;
+/// use arrow::json::reader::ValueIter;
+///
+/// let mut reader =
+///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+/// let mut value_reader = ValueIter::new(&mut reader, None);
+/// for value in value_reader {
+///     println!("JSON value: {}", value.unwrap());
+/// }
+/// ```
+#[derive(Debug)]
+pub struct ValueIter<'a, R: Read> {
+    reader: &'a mut BufReader<R>,
+    max_read_records: Option<usize>,
+    record_count: usize,
+    // reuse line buffer to avoid allocation on each record
+    line_buf: String,
+}
+
+impl<'a, R: Read> ValueIter<'a, R> {
+    pub fn new(reader: &'a mut BufReader<R>, max_read_records: Option<usize>) -> Self {
+        Self {
+            reader,
+            max_read_records,
+            record_count: 0,
+            line_buf: String::new(),
+        }
+    }
+}
+
+impl<'a, R: Read> Iterator for ValueIter<'a, R> {
+    type Item = Result<Value>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(max) = self.max_read_records {
+            if self.record_count >= max {
+                return None;
+            }
+        }
+
+        loop {
+            self.line_buf.truncate(0);
+            match self.reader.read_line(&mut self.line_buf) {
+                Ok(0) => {
+                    // read_line returns 0 when stream reached EOF
+                    return None;
+                }
+                Err(e) => {
+                    return Some(Err(ArrowError::JsonError(format!(
+                        "Failed to read JSON record: {}",
+                        e
+                    ))));
+                }
+                _ => {
+                    let trimmed_s = self.line_buf.trim();
+                    if trimmed_s.is_empty() {
+                        // ignore empty lines
+                        continue;
+                    }
+
+                    self.record_count += 1;
+                    return Some(serde_json::from_str(trimmed_s).map_err(|e| {
+                        ArrowError::JsonError(format!("Not valid JSON: {}", e))
+                    }));
+                }
+            }
+        }
+    }
+}
+
 /// Infer the fields of a JSON file by reading the first n records of the file, with
 /// `max_read_records` controlling the maximum number of records to read.
 ///
@@ -250,19 +326,18 @@ pub fn infer_json_schema<R: Read>(
     reader: &mut BufReader<R>,
     max_read_records: Option<usize>,
 ) -> Result<SchemaRef> {
-    let mut values: HashMap<String, HashSet<DataType>> = HashMap::new();
-
-    let mut line = String::new();
-    for _ in 0..max_read_records.unwrap_or(std::usize::MAX) {
-        reader.read_line(&mut line)?;
-        if line.is_empty() {
-            break;
-        }
-        let record: Value = serde_json::from_str(&line.trim()).expect("Not valid JSON");
+    infer_json_schema_from_iterator(ValueIter::new(reader, max_read_records))
+}
 
-        line = String::new();
+/// Infer the fields of a JSON file by reading all items from the JSON Value Iterator.
+pub fn infer_json_schema_from_iterator<I>(value_iter: I) -> Result<SchemaRef>
+where
+    I: Iterator<Item = Result<Value>>,
+{
+    let mut values: HashMap<String, HashSet<DataType>> = HashMap::new();
 
-        match record {
+    for record in value_iter {
+        match record? {
             Value::Object(map) => {
                 let res = map.iter().try_for_each(|(k, v)| {
                     match v {
@@ -397,31 +472,51 @@ pub fn infer_json_schema<R: Read>(
     generate_schema(values)
 }
 
-/// JSON file reader
+/// JSON values to Arrow record batch decoder. Decoder's next_batch method takes a JSON Value
+/// iterator as input and outputs Arrow record batch.
+///
+/// # Examples
+/// ```
+/// use arrow::json::reader::{Decoder, ValueIter, infer_json_schema};
+/// use std::fs::File;
+/// use std::io::{BufReader, Seek, SeekFrom};
+///
+/// let mut reader =
+///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
+/// let batch_size = 1024;
+/// let decoder = Decoder::new(inferred_schema, batch_size, None);
+///
+/// // seek back to start so that the original file is usable again
+/// reader.seek(SeekFrom::Start(0)).unwrap();
+/// let mut value_reader = ValueIter::new(&mut reader, None);
+/// let batch = decoder.next_batch(&mut value_reader).unwrap().unwrap();
+/// assert_eq!(4, batch.num_rows());
+/// assert_eq!(4, batch.num_columns());
+/// ```
 #[derive(Debug)]
-pub struct Reader<R: Read> {
+pub struct Decoder {
     /// Explicit schema for the JSON file
     schema: SchemaRef,
     /// Optional projection for which columns to load (case-sensitive names)
     projection: Option<Vec<String>>,
-    /// File reader
-    reader: BufReader<R>,
     /// Batch size (number of records to load each time)
     batch_size: usize,
 }
 
-impl<R: Read> Reader<R> {
-    /// Create a new JSON Reader from any value that implements the `Read` trait.
-    ///
-    /// If reading a `File`, you can customise the Reader, such as to enable schema
-    /// inference, use `ReaderBuilder`.
+impl Decoder {
+    /// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
+    /// trait.
     pub fn new(
-        reader: R,
         schema: SchemaRef,
         batch_size: usize,
         projection: Option<Vec<String>>,
     ) -> Self {
-        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+        Self {
+            schema,
+            projection,
+            batch_size,
+        }
     }
 
     /// Returns the schema of the reader, useful for getting the schema without reading
@@ -447,40 +542,16 @@ impl<R: Read> Reader<R> {
         }
     }
 
-    /// Create a new JSON Reader from a `BufReader<R: Read>`
-    ///
-    /// To customize the schema, such as to enable schema inference, use `ReaderBuilder`
-    pub fn from_buf_reader(
-        reader: BufReader<R>,
-        schema: SchemaRef,
-        batch_size: usize,
-        projection: Option<Vec<String>>,
-    ) -> Self {
-        Self {
-            schema,
-            projection,
-            reader,
-            batch_size,
-        }
-    }
-
     /// Read the next batch of records
-    #[allow(clippy::should_implement_trait)]
-    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+    pub fn next_batch<I>(&self, value_iter: &mut I) -> Result<Option<RecordBatch>>
+    where
+        I: Iterator<Item = Result<Value>>,
+    {
         let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
-        let mut line = String::new();
-        for _ in 0..self.batch_size {
-            let bytes_read = self.reader.read_line(&mut line)?;
-            if bytes_read > 0 {
-                rows.push(serde_json::from_str(&line).map_err(|e| {
-                    ArrowError::JsonError(format!("Not valid JSON: {}", e))
-                })?);
-                line = String::new();
-            } else {
-                break;
-            }
-        }
 
+        for value in value_iter.by_ref().take(self.batch_size) {
+            rows.push(value?);
+        }
         if rows.is_empty() {
             // reached end of file
             return Ok(None);
@@ -1097,6 +1168,57 @@ impl<R: Read> Reader<R> {
     }
 }
 
+/// JSON file reader
+#[derive(Debug)]
+pub struct Reader<R: Read> {
+    reader: BufReader<R>,
+    /// JSON value decoder
+    decoder: Decoder,
+}
+
+impl<R: Read> Reader<R> {
+    /// Create a new JSON Reader from any value that implements the `Read` trait.
+    ///
+    /// If reading a `File`, you can customise the Reader, such as to enable schema
+    /// inference, use `ReaderBuilder`.
+    pub fn new(
+        reader: R,
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Self {
+        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+    }
+
+    /// Create a new JSON Reader from a `BufReader<R: Read>`
+    ///
+    /// To customize the schema, such as to enable schema inference, use `ReaderBuilder`
+    pub fn from_buf_reader(
+        reader: BufReader<R>,
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Self {
+        Self {
+            reader,
+            decoder: Decoder::new(schema, batch_size, projection),
+        }
+    }
+
+    /// Returns the schema of the reader, useful for getting the schema without reading
+    /// record batches
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+        self.decoder
+            .next_batch(&mut ValueIter::new(&mut self.reader, None))
+    }
+}
+
 /// JSON file reader builder
 #[derive(Debug)]
 pub struct ReaderBuilder {
@@ -1183,7 +1305,10 @@ impl ReaderBuilder {
     }
 
     /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R: Read + Seek>(self, source: R) -> Result<Reader<R>> {
+    pub fn build<R>(self, source: R) -> Result<Reader<R>>
+    where
+        R: Read + Seek,
+    {
         let mut buf_reader = BufReader::new(source);
 
         // check if schema should be inferred
@@ -1211,6 +1336,7 @@ mod tests {
     use super::*;
     use flate2::read::GzDecoder;
     use std::fs::File;
+    use std::io::Cursor;
 
     #[test]
     fn test_json_basic() {
@@ -1481,13 +1607,34 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(expected = "Not valid JSON")]
-    fn test_invalid_file() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+    fn test_invalid_json_infer_schema() {
+        let re = infer_json_schema_from_seekable(
+            &mut BufReader::new(
+                File::open("test/data/uk_cities_with_headers.csv").unwrap(),
+            ),
+            None,
+        );
+        assert_eq!(
+            re.err().unwrap().to_string(),
+            "Json error: Not valid JSON: expected value at line 1 column 1",
+        );
+    }
+
+    #[test]
+    fn test_invalid_json_read_record() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "a",
+            DataType::Struct(vec![Field::new("a", DataType::Utf8, true)]),
+            true,
+        )]));
+        let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
         let mut reader: Reader<File> = builder
             .build::<File>(File::open("test/data/uk_cities_with_headers.csv").unwrap())
             .unwrap();
-        let _batch = reader.next().unwrap().unwrap();
+        assert_eq!(
+            reader.next().err().unwrap().to_string(),
+            "Json error: Not valid JSON: expected value at line 1 column 1",
+        );
     }
 
     #[test]
@@ -1803,6 +1950,26 @@ mod tests {
     }
 
     #[test]
+    fn test_skip_empty_lines() {
+        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+        let json_content = "
+        {\"a\": 1}
+
+        {\"a\": 2}
+
+        {\"a\": 3}";
+        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(3, batch.num_rows());
+
+        let schema = reader.schema();
+        let c = schema.column_with_name("a").unwrap();
+        assert_eq!(&DataType::Int64, c.1.data_type());
+    }
+
+    #[test]
     fn test_list_of_string_dictionary_from_json() {
         let schema = Schema::new(vec![Field::new(
             "events",