You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/12/09 19:41:55 UTC
[arrow] branch master updated: ARROW-10842 [Rust] decouple IO from
json reader, fix crash during json schema inference with invalid json
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3deae8d ARROW-10842 [Rust] decouple IO from json reader, fix crash during json schema inference with invalid json
3deae8d is described below
commit 3deae8dd50da773ba215704e567d9937b04b02c5
Author: Qingping Hou <da...@gmail.com>
AuthorDate: Wed Dec 9 14:40:44 2020 -0500
ARROW-10842 [Rust] decouple IO from json reader, fix crash during json schema inference with invalid json
Other related changes:
* reuse the same IO code between schema inference and record batch reader.
* reuse the string buffer to avoid repetitive memory allocation
* ignore empty (whitespace-only) lines when reading JSON records
* avoid hard crash during JSON schema inference caused by invalid json
Closes #8867 from houqp/qp_json
Authored-by: Qingping Hou <da...@gmail.com>
Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
rust/arrow/src/json/reader.rs | 283 +++++++++++++++++++++++++++++++++---------
1 file changed, 225 insertions(+), 58 deletions(-)
diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs
index 118c52c..873baf8 100644
--- a/rust/arrow/src/json/reader.rs
+++ b/rust/arrow/src/json/reader.rs
@@ -189,6 +189,82 @@ fn generate_schema(spec: HashMap<String, HashSet<DataType>>) -> Result<SchemaRef
}
}
+/// JSON file reader that produces a serde_json::Value iterator from a Read trait
+///
+/// # Example
+///
+/// ```
+/// use std::fs::File;
+/// use std::io::BufReader;
+/// use arrow::json::reader::ValueIter;
+///
+/// let mut reader =
+/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+/// let mut value_reader = ValueIter::new(&mut reader, None);
+/// for value in value_reader {
+/// println!("JSON value: {}", value.unwrap());
+/// }
+/// ```
+#[derive(Debug)]
+pub struct ValueIter<'a, R: Read> {
+ reader: &'a mut BufReader<R>,
+ max_read_records: Option<usize>,
+ record_count: usize,
+ // reuse line buffer to avoid allocation on each record
+ line_buf: String,
+}
+
+impl<'a, R: Read> ValueIter<'a, R> {
+ pub fn new(reader: &'a mut BufReader<R>, max_read_records: Option<usize>) -> Self {
+ Self {
+ reader,
+ max_read_records,
+ record_count: 0,
+ line_buf: String::new(),
+ }
+ }
+}
+
+impl<'a, R: Read> Iterator for ValueIter<'a, R> {
+ type Item = Result<Value>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if let Some(max) = self.max_read_records {
+ if self.record_count >= max {
+ return None;
+ }
+ }
+
+ loop {
+ self.line_buf.truncate(0);
+ match self.reader.read_line(&mut self.line_buf) {
+ Ok(0) => {
+ // read_line returns 0 when stream reached EOF
+ return None;
+ }
+ Err(e) => {
+ return Some(Err(ArrowError::JsonError(format!(
+ "Failed to read JSON record: {}",
+ e
+ ))));
+ }
+ _ => {
+ let trimmed_s = self.line_buf.trim();
+ if trimmed_s.is_empty() {
+ // ignore empty lines
+ continue;
+ }
+
+ self.record_count += 1;
+ return Some(serde_json::from_str(trimmed_s).map_err(|e| {
+ ArrowError::JsonError(format!("Not valid JSON: {}", e))
+ }));
+ }
+ }
+ }
+ }
+}
+
/// Infer the fields of a JSON file by reading the first n records of the file, with
/// `max_read_records` controlling the maximum number of records to read.
///
@@ -250,19 +326,18 @@ pub fn infer_json_schema<R: Read>(
reader: &mut BufReader<R>,
max_read_records: Option<usize>,
) -> Result<SchemaRef> {
- let mut values: HashMap<String, HashSet<DataType>> = HashMap::new();
-
- let mut line = String::new();
- for _ in 0..max_read_records.unwrap_or(std::usize::MAX) {
- reader.read_line(&mut line)?;
- if line.is_empty() {
- break;
- }
- let record: Value = serde_json::from_str(&line.trim()).expect("Not valid JSON");
+ infer_json_schema_from_iterator(ValueIter::new(reader, max_read_records))
+}
- line = String::new();
+/// Infer the fields of a JSON file by reading all items from the JSON Value Iterator.
+pub fn infer_json_schema_from_iterator<I>(value_iter: I) -> Result<SchemaRef>
+where
+ I: Iterator<Item = Result<Value>>,
+{
+ let mut values: HashMap<String, HashSet<DataType>> = HashMap::new();
- match record {
+ for record in value_iter {
+ match record? {
Value::Object(map) => {
let res = map.iter().try_for_each(|(k, v)| {
match v {
@@ -397,31 +472,51 @@ pub fn infer_json_schema<R: Read>(
generate_schema(values)
}
-/// JSON file reader
+/// JSON values to Arrow record batch decoder. Decoder's next_batch method takes a JSON Value
+/// iterator as input and outputs Arrow record batch.
+///
+/// # Examples
+/// ```
+/// use arrow::json::reader::{Decoder, ValueIter, infer_json_schema};
+/// use std::fs::File;
+/// use std::io::{BufReader, Seek, SeekFrom};
+///
+/// let mut reader =
+/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
+/// let batch_size = 1024;
+/// let decoder = Decoder::new(inferred_schema, batch_size, None);
+///
+/// // seek back to start so that the original file is usable again
+/// reader.seek(SeekFrom::Start(0)).unwrap();
+/// let mut value_reader = ValueIter::new(&mut reader, None);
+/// let batch = decoder.next_batch(&mut value_reader).unwrap().unwrap();
+/// assert_eq!(4, batch.num_rows());
+/// assert_eq!(4, batch.num_columns());
+/// ```
#[derive(Debug)]
-pub struct Reader<R: Read> {
+pub struct Decoder {
/// Explicit schema for the JSON file
schema: SchemaRef,
/// Optional projection for which columns to load (case-sensitive names)
projection: Option<Vec<String>>,
- /// File reader
- reader: BufReader<R>,
/// Batch size (number of records to load each time)
batch_size: usize,
}
-impl<R: Read> Reader<R> {
- /// Create a new JSON Reader from any value that implements the `Read` trait.
- ///
- /// If reading a `File`, you can customise the Reader, such as to enable schema
- /// inference, use `ReaderBuilder`.
+impl Decoder {
+ /// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
+ /// trait.
pub fn new(
- reader: R,
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
- Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+ Self {
+ schema,
+ projection,
+ batch_size,
+ }
}
/// Returns the schema of the reader, useful for getting the schema without reading
@@ -447,40 +542,16 @@ impl<R: Read> Reader<R> {
}
}
- /// Create a new JSON Reader from a `BufReader<R: Read>`
- ///
- /// To customize the schema, such as to enable schema inference, use `ReaderBuilder`
- pub fn from_buf_reader(
- reader: BufReader<R>,
- schema: SchemaRef,
- batch_size: usize,
- projection: Option<Vec<String>>,
- ) -> Self {
- Self {
- schema,
- projection,
- reader,
- batch_size,
- }
- }
-
/// Read the next batch of records
- #[allow(clippy::should_implement_trait)]
- pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+ pub fn next_batch<I>(&self, value_iter: &mut I) -> Result<Option<RecordBatch>>
+ where
+ I: Iterator<Item = Result<Value>>,
+ {
let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
- let mut line = String::new();
- for _ in 0..self.batch_size {
- let bytes_read = self.reader.read_line(&mut line)?;
- if bytes_read > 0 {
- rows.push(serde_json::from_str(&line).map_err(|e| {
- ArrowError::JsonError(format!("Not valid JSON: {}", e))
- })?);
- line = String::new();
- } else {
- break;
- }
- }
+ for value in value_iter.by_ref().take(self.batch_size) {
+ rows.push(value?);
+ }
if rows.is_empty() {
// reached end of file
return Ok(None);
@@ -1097,6 +1168,57 @@ impl<R: Read> Reader<R> {
}
}
+/// JSON file reader
+#[derive(Debug)]
+pub struct Reader<R: Read> {
+ reader: BufReader<R>,
+ /// JSON value decoder
+ decoder: Decoder,
+}
+
+impl<R: Read> Reader<R> {
+ /// Create a new JSON Reader from any value that implements the `Read` trait.
+ ///
+ /// If reading a `File`, you can customise the Reader, such as to enable schema
+ /// inference, use `ReaderBuilder`.
+ pub fn new(
+ reader: R,
+ schema: SchemaRef,
+ batch_size: usize,
+ projection: Option<Vec<String>>,
+ ) -> Self {
+ Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+ }
+
+ /// Create a new JSON Reader from a `BufReader<R: Read>`
+ ///
+ /// To customize the schema, such as to enable schema inference, use `ReaderBuilder`
+ pub fn from_buf_reader(
+ reader: BufReader<R>,
+ schema: SchemaRef,
+ batch_size: usize,
+ projection: Option<Vec<String>>,
+ ) -> Self {
+ Self {
+ reader,
+ decoder: Decoder::new(schema, batch_size, projection),
+ }
+ }
+
+ /// Returns the schema of the reader, useful for getting the schema without reading
+ /// record batches
+ pub fn schema(&self) -> SchemaRef {
+ self.decoder.schema()
+ }
+
+ /// Read the next batch of records
+ #[allow(clippy::should_implement_trait)]
+ pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+ self.decoder
+ .next_batch(&mut ValueIter::new(&mut self.reader, None))
+ }
+}
+
/// JSON file reader builder
#[derive(Debug)]
pub struct ReaderBuilder {
@@ -1183,7 +1305,10 @@ impl ReaderBuilder {
}
/// Create a new `Reader` from the `ReaderBuilder`
- pub fn build<R: Read + Seek>(self, source: R) -> Result<Reader<R>> {
+ pub fn build<R>(self, source: R) -> Result<Reader<R>>
+ where
+ R: Read + Seek,
+ {
let mut buf_reader = BufReader::new(source);
// check if schema should be inferred
@@ -1211,6 +1336,7 @@ mod tests {
use super::*;
use flate2::read::GzDecoder;
use std::fs::File;
+ use std::io::Cursor;
#[test]
fn test_json_basic() {
@@ -1481,13 +1607,34 @@ mod tests {
}
#[test]
- #[should_panic(expected = "Not valid JSON")]
- fn test_invalid_file() {
- let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+ fn test_invalid_json_infer_schema() {
+ let re = infer_json_schema_from_seekable(
+ &mut BufReader::new(
+ File::open("test/data/uk_cities_with_headers.csv").unwrap(),
+ ),
+ None,
+ );
+ assert_eq!(
+ re.err().unwrap().to_string(),
+ "Json error: Not valid JSON: expected value at line 1 column 1",
+ );
+ }
+
+ #[test]
+ fn test_invalid_json_read_record() {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Struct(vec![Field::new("a", DataType::Utf8, true)]),
+ true,
+ )]));
+ let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/uk_cities_with_headers.csv").unwrap())
.unwrap();
- let _batch = reader.next().unwrap().unwrap();
+ assert_eq!(
+ reader.next().err().unwrap().to_string(),
+ "Json error: Not valid JSON: expected value at line 1 column 1",
+ );
}
#[test]
@@ -1803,6 +1950,26 @@ mod tests {
}
#[test]
+ fn test_skip_empty_lines() {
+ let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+ let json_content = "
+ {\"a\": 1}
+
+ {\"a\": 2}
+
+ {\"a\": 3}";
+ let mut reader = builder.build(Cursor::new(json_content)).unwrap();
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(3, batch.num_rows());
+
+ let schema = reader.schema();
+ let c = schema.column_with_name("a").unwrap();
+ assert_eq!(&DataType::Int64, c.1.data_type());
+ }
+
+ #[test]
fn test_list_of_string_dictionary_from_json() {
let schema = Schema::new(vec![Field::new(
"events",