You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/12 13:39:09 UTC

[arrow-rs] branch master updated: Allow json reader/decoder to work with format_strings for each field (#1451)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 68038f595 Allow json reader/decoder to work with format_strings for each field  (#1451)
68038f595 is described below

commit 68038f595b62202906d9a6235575b3a236c09546
Author: Sumit <su...@users.noreply.github.com>
AuthorDate: Tue Apr 12 15:39:05 2022 +0200

    Allow json reader/decoder to work with format_strings for each field  (#1451)
    
    * implement parser for remaining types used by json decoder
    
    * added format strings (hashmap) to json reader
    
    the format_string map's key is column name.
    The value will be used to parse the date64/date32 types from json
    if the read value is of string type
    
    add tests for formatted parser for date{32,64}type for json readers
    
    all-parsers start
    
    fixup! added format strings (hashmap) to json reader
    
    * add DecoderOptions struct for holding options for decoder
    
    that way later extensions to the decoder can be added to this struct
    without breaking API.
    
    * Fixup some comments
    
    * added test for string parsing json reader for time{32,64} types
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 arrow/src/json/reader.rs        | 178 ++++++++++++++++++++++++++++++++--------
 arrow/src/util/reader_parser.rs |  38 ++++++---
 arrow/test/data/basic.json      |   8 +-
 3 files changed, 171 insertions(+), 53 deletions(-)

diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs
index 94b3ba060..ad953e49b 100644
--- a/arrow/src/json/reader.rs
+++ b/arrow/src/json/reader.rs
@@ -38,7 +38,13 @@
 //!
 //! let file = File::open("test/data/basic.json").unwrap();
 //!
-//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
+//! let mut json = json::Reader::new(
+//!    BufReader::new(file),
+//!    Arc::new(schema),
+//!    1024,
+//!    Default::default()
+//! );
+//!
 //! let batch = json.next().unwrap().unwrap();
 //! ```
 
@@ -55,6 +61,7 @@ use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
 use crate::util::bit_util;
+use crate::util::reader_parser::Parser;
 use crate::{array::*, buffer::Buffer};
 
 #[derive(Debug, Clone)]
@@ -563,7 +570,7 @@ where
 ///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
 /// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
 /// let batch_size = 1024;
-/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
+/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
 ///
 /// // seek back to start so that the original file is usable again
 /// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -576,31 +583,35 @@ where
 pub struct Decoder {
     /// Explicit schema for the JSON file
     schema: SchemaRef,
-    /// Optional projection for which columns to load (case-sensitive names)
-    projection: Option<Vec<String>>,
     /// Batch size (number of records to load each time)
     batch_size: usize,
+    /// This is a collection of options for json decoder
+    doptions: DecoderOptions,
+}
+
+#[derive(Default, Debug)]
+pub struct DecoderOptions {
+    /// Optional projection for which columns to load (case-sensitive names)
+    projection: Option<Vec<String>>,
+    /// optional HashMap of column names to its format string
+    format_strings: Option<HashMap<String, String>>,
 }
 
 impl Decoder {
     /// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
     /// trait.
-    pub fn new(
-        schema: SchemaRef,
-        batch_size: usize,
-        projection: Option<Vec<String>>,
-    ) -> Self {
+    pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
         Self {
             schema,
-            projection,
             batch_size,
+            doptions,
         }
     }
 
     /// Returns the schema of the reader, useful for getting the schema without reading
     /// record batches
     pub fn schema(&self) -> SchemaRef {
-        match &self.projection {
+        match &self.doptions.projection {
             Some(projection) => {
                 let fields = self.schema.fields();
                 let projected_fields: Vec<Field> = fields
@@ -645,7 +656,7 @@ impl Decoder {
         }
 
         let rows = &rows[..];
-        let projection = self.projection.clone().unwrap_or_default();
+        let projection = self.doptions.projection.clone().unwrap_or_default();
         let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
 
         let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -913,7 +924,7 @@ impl Decoder {
     }
 
     #[allow(clippy::unnecessary_wraps)]
-    fn build_primitive_array<T: ArrowPrimitiveType>(
+    fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
         &self,
         rows: &[Value],
         col_name: &str,
@@ -922,20 +933,30 @@ impl Decoder {
         T: ArrowNumericType,
         T::Native: num::NumCast,
     {
+        let format_string = self
+            .doptions
+            .format_strings
+            .as_ref()
+            .and_then(|fmts| fmts.get(col_name));
         Ok(Arc::new(
             rows.iter()
                 .map(|row| {
-                    row.get(&col_name)
-                        .and_then(|value| {
-                            if value.is_i64() {
-                                value.as_i64().map(num::cast::cast)
-                            } else if value.is_u64() {
-                                value.as_u64().map(num::cast::cast)
-                            } else {
-                                value.as_f64().map(num::cast::cast)
+                    row.get(&col_name).and_then(|value| {
+                        if value.is_i64() {
+                            value.as_i64().and_then(num::cast::cast)
+                        } else if value.is_u64() {
+                            value.as_u64().and_then(num::cast::cast)
+                        } else if value.is_string() {
+                            match format_string {
+                                Some(fmt) => {
+                                    T::parse_formatted(value.as_str().unwrap(), fmt)
+                                }
+                                None => T::parse(value.as_str().unwrap()),
                             }
-                        })
-                        .flatten()
+                        } else {
+                            value.as_f64().and_then(num::cast::cast)
+                        }
+                    })
                 })
                 .collect::<PrimitiveArray<T>>(),
         ))
@@ -1539,9 +1560,9 @@ impl<R: Read> Reader<R> {
         reader: R,
         schema: SchemaRef,
         batch_size: usize,
-        projection: Option<Vec<String>>,
+        doptions: DecoderOptions,
     ) -> Self {
-        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
     }
 
     /// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1551,11 +1572,11 @@ impl<R: Read> Reader<R> {
         reader: BufReader<R>,
         schema: SchemaRef,
         batch_size: usize,
-        projection: Option<Vec<String>>,
+        doptions: DecoderOptions,
     ) -> Self {
         Self {
             reader,
-            decoder: Decoder::new(schema, batch_size, projection),
+            decoder: Decoder::new(schema, batch_size, doptions),
         }
     }
 
@@ -1591,6 +1612,8 @@ pub struct ReaderBuilder {
     batch_size: usize,
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<String>>,
+    /// optional HashMap of column names to format strings
+    format_strings: Option<HashMap<String, String>>,
 }
 
 impl Default for ReaderBuilder {
@@ -1600,6 +1623,7 @@ impl Default for ReaderBuilder {
             max_records: None,
             batch_size: 1024,
             projection: None,
+            format_strings: None,
         }
     }
 }
@@ -1658,6 +1682,15 @@ impl ReaderBuilder {
         self
     }
 
+    /// Set the decoder's format Strings param
+    pub fn with_format_strings(
+        mut self,
+        format_strings: HashMap<String, String>,
+    ) -> Self {
+        self.format_strings = Some(format_strings);
+        self
+    }
+
     /// Create a new `Reader` from the `ReaderBuilder`
     pub fn build<R>(self, source: R) -> Result<Reader<R>>
     where
@@ -1678,7 +1711,10 @@ impl ReaderBuilder {
             buf_reader,
             schema,
             self.batch_size,
-            self.projection,
+            DecoderOptions {
+                projection: self.projection,
+                format_strings: self.format_strings,
+            },
         ))
     }
 }
@@ -1711,7 +1747,7 @@ mod tests {
             .unwrap();
         let batch = reader.next().unwrap().unwrap();
 
-        assert_eq!(4, batch.num_columns());
+        assert_eq!(5, batch.num_columns());
         assert_eq!(12, batch.num_rows());
 
         let schema = reader.schema();
@@ -1833,7 +1869,7 @@ mod tests {
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema.clone()),
             1024,
-            None,
+            Default::default(),
         );
         let reader_schema = reader.schema();
         assert_eq!(reader_schema, Arc::new(schema));
@@ -1870,6 +1906,41 @@ mod tests {
         assert_eq!(-3.5, bb.value(1));
     }
 
+    #[test]
+    fn test_json_format_strings_for_date() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
+        let e = schema.column_with_name("e").unwrap();
+        assert_eq!(&DataType::Date32, e.1.data_type());
+        let mut fmts = HashMap::new();
+        let date_format = "%Y-%m-%d".to_string();
+        fmts.insert("e".to_string(), date_format.clone());
+
+        let mut reader: Reader<File> = Reader::new(
+            File::open("test/data/basic.json").unwrap(),
+            schema.clone(),
+            1024,
+            DecoderOptions {
+                format_strings: Some(fmts),
+                ..Default::default()
+            },
+        );
+        let reader_schema = reader.schema();
+        assert_eq!(reader_schema, schema);
+        let batch = reader.next().unwrap().unwrap();
+
+        let ee = batch
+            .column(e.0)
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap();
+        let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
+        assert_eq!(dt, ee.value(0));
+        let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
+        assert_eq!(dt, ee.value(1));
+        assert!(!ee.is_valid(2));
+    }
+
     #[test]
     fn test_json_basic_schema_projection() {
         // We test implicit and explicit projection:
@@ -1885,7 +1956,10 @@ mod tests {
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema),
             1024,
-            Some(vec!["a".to_string(), "c".to_string()]),
+            DecoderOptions {
+                projection: Some(vec!["a".to_string(), "c".to_string()]),
+                ..Default::default()
+            },
         );
         let reader_schema = reader.schema();
         let expected_schema = Arc::new(Schema::new(vec![
@@ -2052,7 +2126,8 @@ mod tests {
         file.seek(SeekFrom::Start(0)).unwrap();
 
         let reader = BufReader::new(GzDecoder::new(&file));
-        let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
+        let mut reader =
+            Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
         let batch_gz = reader.next().unwrap().unwrap();
 
         for batch in vec![batch, batch_gz] {
@@ -3081,6 +3156,37 @@ mod tests {
         assert_eq!(5, aa.value(7));
     }
 
+    #[test]
+    fn test_time_from_string() {
+        parse_string_column::<Time64NanosecondType>(4);
+        parse_string_column::<Time64MicrosecondType>(4);
+        parse_string_column::<Time32MillisecondType>(4);
+        parse_string_column::<Time32SecondType>(4);
+    }
+
+    fn parse_string_column<T>(value: T::Native)
+    where
+        T: ArrowPrimitiveType,
+    {
+        let schema = Schema::new(vec![Field::new("d", T::DATA_TYPE, true)]);
+
+        let builder = ReaderBuilder::new()
+            .with_schema(Arc::new(schema))
+            .with_batch_size(64);
+        let mut reader: Reader<File> = builder
+            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
+            .unwrap();
+
+        let batch = reader.next().unwrap().unwrap();
+        let dd = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<PrimitiveArray<T>>()
+            .unwrap();
+        assert_eq!(value, dd.value(1));
+        assert!(!dd.is_valid(2));
+    }
+
     #[test]
     fn test_json_read_nested_list() {
         let schema = Schema::new(vec![Field::new(
@@ -3093,7 +3199,7 @@ mod tests {
             true,
         )]);
 
-        let decoder = Decoder::new(Arc::new(schema), 1024, None);
+        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
         let batch = decoder
             .next_batch(
                 &mut vec![
@@ -3128,7 +3234,7 @@ mod tests {
             true,
         )]);
 
-        let decoder = Decoder::new(Arc::new(schema), 1024, None);
+        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
         let batch = decoder
             .next_batch(
                 // NOTE: total struct element count needs to be greater than
@@ -3157,7 +3263,7 @@ mod tests {
     #[test]
     fn test_json_read_binary_structs() {
         let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
-        let decoder = Decoder::new(Arc::new(schema), 1024, None);
+        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
         let batch = decoder
             .next_batch(
                 &mut vec![
@@ -3200,7 +3306,7 @@ mod tests {
         let mut sum_a = 0;
         for batch in reader {
             let batch = batch.unwrap();
-            assert_eq!(4, batch.num_columns());
+            assert_eq!(5, batch.num_columns());
             sum_num_rows += batch.num_rows();
             num_batches += 1;
             let batch_schema = batch.schema();
diff --git a/arrow/src/util/reader_parser.rs b/arrow/src/util/reader_parser.rs
index 591a3aedf..6b6f24f82 100644
--- a/arrow/src/util/reader_parser.rs
+++ b/arrow/src/util/reader_parser.rs
@@ -60,27 +60,39 @@ impl Parser for Int8Type {}
 
 impl Parser for TimestampNanosecondType {
     fn parse(string: &str) -> Option<i64> {
-        match Self::DATA_TYPE {
-            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-                string_to_timestamp_nanos(string).ok()
-            }
-            _ => None,
-        }
+        string_to_timestamp_nanos(string).ok()
     }
 }
 
 impl Parser for TimestampMicrosecondType {
     fn parse(string: &str) -> Option<i64> {
-        match Self::DATA_TYPE {
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
-                let nanos = string_to_timestamp_nanos(string).ok();
-                nanos.map(|x| x / 1000)
-            }
-            _ => None,
-        }
+        let nanos = string_to_timestamp_nanos(string).ok();
+        nanos.map(|x| x / 1000)
+    }
+}
+
+impl Parser for TimestampMillisecondType {
+    fn parse(string: &str) -> Option<i64> {
+        let nanos = string_to_timestamp_nanos(string).ok();
+        nanos.map(|x| x / 1_000_000)
+    }
+}
+
+impl Parser for TimestampSecondType {
+    fn parse(string: &str) -> Option<i64> {
+        let nanos = string_to_timestamp_nanos(string).ok();
+        nanos.map(|x| x / 1_000_000_000)
     }
 }
 
+impl Parser for Time64NanosecondType {}
+
+impl Parser for Time64MicrosecondType {}
+
+impl Parser for Time32MillisecondType {}
+
+impl Parser for Time32SecondType {}
+
 /// Number of days between 0001-01-01 and 1970-01-01
 const EPOCH_DAYS_FROM_CE: i32 = 719_163;
 
diff --git a/arrow/test/data/basic.json b/arrow/test/data/basic.json
index dafd2dd2e..556c39c46 100644
--- a/arrow/test/data/basic.json
+++ b/arrow/test/data/basic.json
@@ -1,6 +1,6 @@
-{"a":1, "b":2.0, "c":false, "d":"4"}
-{"a":-10, "b":-3.5, "c":true, "d":"4"}
-{"a":2, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4", "e":"1970-1-2"}
+{"a":-10, "b":-3.5, "c":true, "d":"4", "e": "1969-12-31"}
+{"a":2, "b":0.6, "c":false, "d":"text", "e": "1970-01-02 11:11:11"}
 {"a":1, "b":2.0, "c":false, "d":"4"}
 {"a":7, "b":-3.5, "c":true, "d":"4"}
 {"a":1, "b":0.6, "c":false, "d":"text"}
@@ -9,4 +9,4 @@
 {"a":1, "b":0.6, "c":false, "d":"text"}
 {"a":1, "b":2.0, "c":false, "d":"4"}
 {"a":1, "b":-3.5, "c":true, "d":"4"}
-{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
\ No newline at end of file
+{"a":100000000000000, "b":0.6, "c":false, "d":"text"}