Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/13 12:51:20 UTC

[arrow-rs] branch master updated: Consolidate JSON reader options (#1539)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 228ee36ee Consolidate JSON reader options (#1539)
228ee36ee is described below

commit 228ee36eeec26867a951c6a381e3e129765e21cf
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Apr 13 08:51:15 2022 -0400

    Consolidate JSON reader options (#1539)
---
 arrow/src/json/reader.rs | 153 ++++++++++++++++++++++-------------------------
 1 file changed, 73 insertions(+), 80 deletions(-)
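
For context, a minimal sketch of how callers construct a JSON reader after this change, assembled from the hunks below. The batch size, projection, and format strings now live on a single `DecoderOptions` value instead of extra positional arguments on `Reader::new` / `Decoder::new`. The schema, field names, and file path here are illustrative, not part of the patch:

    use std::fs::File;
    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::json::reader::{DecoderOptions, Reader};

    // Illustrative schema; any schema matching the JSON file works.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("c", DataType::Boolean, true),
    ]));

    // All decoding knobs are set through the DecoderOptions builder.
    let options = DecoderOptions::new()
        .with_batch_size(1024)                       // default is 1024 records
        .with_projection(vec!["a".to_string()]);     // optional column projection

    let mut reader = Reader::new(
        File::open("test/data/basic.json").unwrap(), // path taken from the tests below
        schema,
        options,
    );
    let batch = reader.next().unwrap().unwrap();

`ReaderBuilder` keeps its `with_batch_size`, `with_projection`, and `with_format_strings` methods, but as the hunks below show, it now delegates to an internal `DecoderOptions` rather than storing those fields itself.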

diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs
index ad953e49b..9334e1839 100644
--- a/arrow/src/json/reader.rs
+++ b/arrow/src/json/reader.rs
@@ -41,8 +41,7 @@
 //! let mut json = json::Reader::new(
 //!    BufReader::new(file),
 //!    Arc::new(schema),
-//!    1024,
-//!    Default::default()
+//!    json::reader::DecoderOptions::new(),
 //! );
 //!
 //! let batch = json.next().unwrap().unwrap();
@@ -561,7 +560,7 @@ where
 ///
 /// # Examples
 /// ```
-/// use arrow::json::reader::{Decoder, ValueIter, infer_json_schema};
+/// use arrow::json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema};
 /// use std::fs::File;
 /// use std::io::{BufReader, Seek, SeekFrom};
 /// use std::sync::Arc;
@@ -569,8 +568,9 @@ where
 /// let mut reader =
 ///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
 /// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
-/// let batch_size = 1024;
-/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
+/// let options = DecoderOptions::new()
+///     .with_batch_size(1024);
+/// let decoder = Decoder::new(Arc::new(inferred_schema), options);
 ///
 /// // seek back to start so that the original file is usable again
 /// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -583,35 +583,68 @@ where
 pub struct Decoder {
     /// Explicit schema for the JSON file
     schema: SchemaRef,
-    /// Batch size (number of records to load each time)
-    batch_size: usize,
     /// This is a collection of options for json decoder
-    doptions: DecoderOptions,
+    options: DecoderOptions,
 }
 
-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct DecoderOptions {
+    /// Batch size (number of records to load each time), defaults to 1024 records
+    batch_size: usize,
     /// Optional projection for which columns to load (case-sensitive names)
     projection: Option<Vec<String>>,
     /// optional HashMap of column names to its format string
     format_strings: Option<HashMap<String, String>>,
 }
 
+impl Default for DecoderOptions {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            projection: None,
+            format_strings: None,
+        }
+    }
+}
+
+impl DecoderOptions {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Set the decoder's format Strings param
+    pub fn with_format_strings(
+        mut self,
+        format_strings: HashMap<String, String>,
+    ) -> Self {
+        self.format_strings = Some(format_strings);
+        self
+    }
+}
+
 impl Decoder {
     /// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
     /// trait.
-    pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
-        Self {
-            schema,
-            batch_size,
-            doptions,
-        }
+    pub fn new(schema: SchemaRef, options: DecoderOptions) -> Self {
+        Self { schema, options }
     }
 
     /// Returns the schema of the reader, useful for getting the schema without reading
     /// record batches
     pub fn schema(&self) -> SchemaRef {
-        match &self.doptions.projection {
+        match &self.options.projection {
             Some(projection) => {
                 let fields = self.schema.fields();
                 let projected_fields: Vec<Field> = fields
@@ -636,9 +669,10 @@ impl Decoder {
     where
         I: Iterator<Item = Result<Value>>,
     {
-        let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
+        let batch_size = self.options.batch_size;
+        let mut rows: Vec<Value> = Vec::with_capacity(batch_size);
 
-        for value in value_iter.by_ref().take(self.batch_size) {
+        for value in value_iter.by_ref().take(batch_size) {
             let v = value?;
             match v {
                 Value::Object(_) => rows.push(v),
@@ -656,7 +690,7 @@ impl Decoder {
         }
 
         let rows = &rows[..];
-        let projection = self.doptions.projection.clone().unwrap_or_default();
+        let projection = self.options.projection.clone().unwrap_or_default();
         let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
 
         let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -934,7 +968,7 @@ impl Decoder {
         T::Native: num::NumCast,
     {
         let format_string = self
-            .doptions
+            .options
             .format_strings
             .as_ref()
             .and_then(|fmts| fmts.get(col_name));
@@ -1556,13 +1590,8 @@ impl<R: Read> Reader<R> {
     ///
     /// If reading a `File`, you can customise the Reader, such as to enable schema
     /// inference, use `ReaderBuilder`.
-    pub fn new(
-        reader: R,
-        schema: SchemaRef,
-        batch_size: usize,
-        doptions: DecoderOptions,
-    ) -> Self {
-        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
+    pub fn new(reader: R, schema: SchemaRef, options: DecoderOptions) -> Self {
+        Self::from_buf_reader(BufReader::new(reader), schema, options)
     }
 
     /// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1571,12 +1600,11 @@ impl<R: Read> Reader<R> {
     pub fn from_buf_reader(
         reader: BufReader<R>,
         schema: SchemaRef,
-        batch_size: usize,
-        doptions: DecoderOptions,
+        options: DecoderOptions,
     ) -> Self {
         Self {
             reader,
-            decoder: Decoder::new(schema, batch_size, doptions),
+            decoder: Decoder::new(schema, options),
         }
     }
 
@@ -1595,7 +1623,7 @@ impl<R: Read> Reader<R> {
 }
 
 /// JSON file reader builder
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct ReaderBuilder {
     /// Optional schema for the JSON file
     ///
@@ -1606,26 +1634,8 @@ pub struct ReaderBuilder {
     ///
     /// If a number is not provided, all the records are read.
     max_records: Option<usize>,
-    /// Batch size (number of records to load each time)
-    ///
-    /// The default batch size when using the `ReaderBuilder` is 1024 records
-    batch_size: usize,
-    /// Optional projection for which columns to load (zero-based column indices)
-    projection: Option<Vec<String>>,
-    /// optional HashMap of column names to format strings
-    format_strings: Option<HashMap<String, String>>,
-}
-
-impl Default for ReaderBuilder {
-    fn default() -> Self {
-        Self {
-            schema: None,
-            max_records: None,
-            batch_size: 1024,
-            projection: None,
-            format_strings: None,
-        }
-    }
+    /// Options for json decoder
+    options: DecoderOptions,
 }
 
 impl ReaderBuilder {
@@ -1672,13 +1682,13 @@ impl ReaderBuilder {
 
     /// Set the batch size (number of records to load at one time)
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size = batch_size;
+        self.options = self.options.with_batch_size(batch_size);
         self
     }
 
     /// Set the reader's column projection
     pub fn with_projection(mut self, projection: Vec<String>) -> Self {
-        self.projection = Some(projection);
+        self.options = self.options.with_projection(projection);
         self
     }
 
@@ -1687,7 +1697,7 @@ impl ReaderBuilder {
         mut self,
         format_strings: HashMap<String, String>,
     ) -> Self {
-        self.format_strings = Some(format_strings);
+        self.options = self.options.with_format_strings(format_strings);
         self
     }
 
@@ -1707,15 +1717,7 @@ impl ReaderBuilder {
             )?),
         };
 
-        Ok(Reader::from_buf_reader(
-            buf_reader,
-            schema,
-            self.batch_size,
-            DecoderOptions {
-                projection: self.projection,
-                format_strings: self.format_strings,
-            },
-        ))
+        Ok(Reader::from_buf_reader(buf_reader, schema, self.options))
     }
 }
 
@@ -1868,8 +1870,7 @@ mod tests {
         let mut reader: Reader<File> = Reader::new(
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema.clone()),
-            1024,
-            Default::default(),
+            DecoderOptions::new(),
         );
         let reader_schema = reader.schema();
         assert_eq!(reader_schema, Arc::new(schema));
@@ -1919,11 +1920,7 @@ mod tests {
         let mut reader: Reader<File> = Reader::new(
             File::open("test/data/basic.json").unwrap(),
             schema.clone(),
-            1024,
-            DecoderOptions {
-                format_strings: Some(fmts),
-                ..Default::default()
-            },
+            DecoderOptions::new().with_format_strings(fmts),
         );
         let reader_schema = reader.schema();
         assert_eq!(reader_schema, schema);
@@ -1955,11 +1952,7 @@ mod tests {
         let mut reader: Reader<File> = Reader::new(
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema),
-            1024,
-            DecoderOptions {
-                projection: Some(vec!["a".to_string(), "c".to_string()]),
-                ..Default::default()
-            },
+            DecoderOptions::new().with_projection(vec!["a".to_string(), "c".to_string()]),
         );
         let reader_schema = reader.schema();
         let expected_schema = Arc::new(Schema::new(vec![
@@ -2126,8 +2119,8 @@ mod tests {
         file.seek(SeekFrom::Start(0)).unwrap();
 
         let reader = BufReader::new(GzDecoder::new(&file));
-        let mut reader =
-            Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
+        let options = DecoderOptions::new().with_batch_size(64);
+        let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), options);
         let batch_gz = reader.next().unwrap().unwrap();
 
         for batch in vec![batch, batch_gz] {
@@ -3199,7 +3192,7 @@ mod tests {
             true,
         )]);
 
-        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
         let batch = decoder
             .next_batch(
                 &mut vec![
@@ -3234,7 +3227,7 @@ mod tests {
             true,
         )]);
 
-        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
         let batch = decoder
             .next_batch(
                 // NOTE: total struct element count needs to be greater than
@@ -3263,7 +3256,7 @@ mod tests {
     #[test]
     fn test_json_read_binary_structs() {
         let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
-        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
         let batch = decoder
             .next_batch(
                 &mut vec![