Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/12 13:39:09 UTC
[arrow-rs] branch master updated: Allow json reader/decoder to work with format_strings for each field (#1451)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 68038f595 Allow json reader/decoder to work with format_strings for each field (#1451)
68038f595 is described below
commit 68038f595b62202906d9a6235575b3a236c09546
Author: Sumit <su...@users.noreply.github.com>
AuthorDate: Tue Apr 12 15:39:05 2022 +0200
Allow json reader/decoder to work with format_strings for each field (#1451)
* implement parser for remaining types used by json decoder
* added format strings (hashmap) to json reader
The format_strings map's key is the column name; the value is used to
parse Date64/Date32 values from JSON when the read value is a string.
Also adds tests of the formatted parser for Date{32,64}Type in the json
reader.
* add DecoderOptions struct for holding the decoder's options, so that
later extensions to the decoder can be added to this struct without
breaking the API
* fix up some comments
* add tests for string parsing of Time{32,64} types in the json reader
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
arrow/src/json/reader.rs | 178 ++++++++++++++++++++++++++++++++--------
arrow/src/util/reader_parser.rs | 38 ++++++---
arrow/test/data/basic.json | 8 +-
3 files changed, 171 insertions(+), 53 deletions(-)
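
Before the diff itself, a minimal sketch of the new per-column
format-string API as exposed through ReaderBuilder. This is illustrative
only: the schema, the "%Y-%m-%d" format, and the test-data path are taken
from the tests added below, and the sketch assumes the arrow-rs API as of
this commit.

    use std::collections::HashMap;
    use std::fs::File;
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::json::ReaderBuilder;

    fn main() {
        // Column "e" arrives as a JSON string; the per-column format
        // string tells the decoder how to parse it into a Date32.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "e",
            DataType::Date32,
            true,
        )]));

        let mut format_strings = HashMap::new();
        format_strings.insert("e".to_string(), "%Y-%m-%d".to_string());

        let mut reader = ReaderBuilder::new()
            .with_schema(schema)
            .with_batch_size(1024)
            .with_format_strings(format_strings)
            .build(File::open("test/data/basic.json").unwrap())
            .unwrap();

        // Rows whose "e" value does not match the format come back null.
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(1, batch.num_columns());
    }
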
diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs
index 94b3ba060..ad953e49b 100644
--- a/arrow/src/json/reader.rs
+++ b/arrow/src/json/reader.rs
@@ -38,7 +38,13 @@
//!
//! let file = File::open("test/data/basic.json").unwrap();
//!
-//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
+//! let mut json = json::Reader::new(
+//! BufReader::new(file),
+//! Arc::new(schema),
+//! 1024,
+//! Default::default()
+//! );
+//!
//! let batch = json.next().unwrap().unwrap();
//! ```
@@ -55,6 +61,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::util::bit_util;
+use crate::util::reader_parser::Parser;
use crate::{array::*, buffer::Buffer};
#[derive(Debug, Clone)]
@@ -563,7 +570,7 @@ where
/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
/// let batch_size = 1024;
-/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
+/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
///
/// // seek back to start so that the original file is usable again
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -576,31 +583,35 @@ where
pub struct Decoder {
/// Explicit schema for the JSON file
schema: SchemaRef,
- /// Optional projection for which columns to load (case-sensitive names)
- projection: Option<Vec<String>>,
/// Batch size (number of records to load each time)
batch_size: usize,
+ /// Options for the JSON decoder
+ doptions: DecoderOptions,
+}
+
+#[derive(Default, Debug)]
+pub struct DecoderOptions {
+ /// Optional projection for which columns to load (case-sensitive names)
+ projection: Option<Vec<String>>,
+ /// Optional HashMap of column names to their format strings
+ format_strings: Option<HashMap<String, String>>,
}
impl Decoder {
/// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
/// trait.
- pub fn new(
- schema: SchemaRef,
- batch_size: usize,
- projection: Option<Vec<String>>,
- ) -> Self {
+ pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
Self {
schema,
- projection,
batch_size,
+ doptions,
}
}
/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
- match &self.projection {
+ match &self.doptions.projection {
Some(projection) => {
let fields = self.schema.fields();
let projected_fields: Vec<Field> = fields
@@ -645,7 +656,7 @@ impl Decoder {
}
let rows = &rows[..];
- let projection = self.projection.clone().unwrap_or_default();
+ let projection = self.doptions.projection.clone().unwrap_or_default();
let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -913,7 +924,7 @@ impl Decoder {
}
#[allow(clippy::unnecessary_wraps)]
- fn build_primitive_array<T: ArrowPrimitiveType>(
+ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
&self,
rows: &[Value],
col_name: &str,
@@ -922,20 +933,30 @@ impl Decoder {
T: ArrowNumericType,
T::Native: num::NumCast,
{
+ let format_string = self
+ .doptions
+ .format_strings
+ .as_ref()
+ .and_then(|fmts| fmts.get(col_name));
Ok(Arc::new(
rows.iter()
.map(|row| {
- row.get(&col_name)
- .and_then(|value| {
- if value.is_i64() {
- value.as_i64().map(num::cast::cast)
- } else if value.is_u64() {
- value.as_u64().map(num::cast::cast)
- } else {
- value.as_f64().map(num::cast::cast)
+ row.get(&col_name).and_then(|value| {
+ if value.is_i64() {
+ value.as_i64().and_then(num::cast::cast)
+ } else if value.is_u64() {
+ value.as_u64().and_then(num::cast::cast)
+ } else if value.is_string() {
+ match format_string {
+ Some(fmt) => {
+ T::parse_formatted(value.as_str().unwrap(), fmt)
+ }
+ None => T::parse(value.as_str().unwrap()),
}
- })
- .flatten()
+ } else {
+ value.as_f64().and_then(num::cast::cast)
+ }
+ })
})
.collect::<PrimitiveArray<T>>(),
))
@@ -1539,9 +1560,9 @@ impl<R: Read> Reader<R> {
reader: R,
schema: SchemaRef,
batch_size: usize,
- projection: Option<Vec<String>>,
+ doptions: DecoderOptions,
) -> Self {
- Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+ Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
}
/// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1551,11 +1572,11 @@ impl<R: Read> Reader<R> {
reader: BufReader<R>,
schema: SchemaRef,
batch_size: usize,
- projection: Option<Vec<String>>,
+ doptions: DecoderOptions,
) -> Self {
Self {
reader,
- decoder: Decoder::new(schema, batch_size, projection),
+ decoder: Decoder::new(schema, batch_size, doptions),
}
}
@@ -1591,6 +1612,8 @@ pub struct ReaderBuilder {
batch_size: usize,
/// Optional projection for which columns to load (case-sensitive names)
projection: Option<Vec<String>>,
+ /// Optional HashMap of column names to their format strings
+ format_strings: Option<HashMap<String, String>>,
}
impl Default for ReaderBuilder {
@@ -1600,6 +1623,7 @@ impl Default for ReaderBuilder {
max_records: None,
batch_size: 1024,
projection: None,
+ format_strings: None,
}
}
}
@@ -1658,6 +1682,15 @@ impl ReaderBuilder {
self
}
+ /// Set the decoder's format strings parameter
+ pub fn with_format_strings(
+ mut self,
+ format_strings: HashMap<String, String>,
+ ) -> Self {
+ self.format_strings = Some(format_strings);
+ self
+ }
+
/// Create a new `Reader` from the `ReaderBuilder`
pub fn build<R>(self, source: R) -> Result<Reader<R>>
where
@@ -1678,7 +1711,10 @@ impl ReaderBuilder {
buf_reader,
schema,
self.batch_size,
- self.projection,
+ DecoderOptions {
+ projection: self.projection,
+ format_strings: self.format_strings,
+ },
))
}
}
@@ -1711,7 +1747,7 @@ mod tests {
.unwrap();
let batch = reader.next().unwrap().unwrap();
- assert_eq!(4, batch.num_columns());
+ assert_eq!(5, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
@@ -1833,7 +1869,7 @@ mod tests {
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
1024,
- None,
+ Default::default(),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, Arc::new(schema));
@@ -1870,6 +1906,41 @@ mod tests {
assert_eq!(-3.5, bb.value(1));
}
+ #[test]
+ fn test_json_format_strings_for_date() {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
+ let e = schema.column_with_name("e").unwrap();
+ assert_eq!(&DataType::Date32, e.1.data_type());
+ let mut fmts = HashMap::new();
+ let date_format = "%Y-%m-%d".to_string();
+ fmts.insert("e".to_string(), date_format.clone());
+
+ let mut reader: Reader<File> = Reader::new(
+ File::open("test/data/basic.json").unwrap(),
+ schema.clone(),
+ 1024,
+ DecoderOptions {
+ format_strings: Some(fmts),
+ ..Default::default()
+ },
+ );
+ let reader_schema = reader.schema();
+ assert_eq!(reader_schema, schema);
+ let batch = reader.next().unwrap().unwrap();
+
+ let ee = batch
+ .column(e.0)
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .unwrap();
+ let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
+ assert_eq!(dt, ee.value(0));
+ let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
+ assert_eq!(dt, ee.value(1));
+ assert!(!ee.is_valid(2));
+ }
+
#[test]
fn test_json_basic_schema_projection() {
// We test implicit and explicit projection:
@@ -1885,7 +1956,10 @@ mod tests {
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
1024,
- Some(vec!["a".to_string(), "c".to_string()]),
+ DecoderOptions {
+ projection: Some(vec!["a".to_string(), "c".to_string()]),
+ ..Default::default()
+ },
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
@@ -2052,7 +2126,8 @@ mod tests {
file.seek(SeekFrom::Start(0)).unwrap();
let reader = BufReader::new(GzDecoder::new(&file));
- let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
+ let mut reader =
+ Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
let batch_gz = reader.next().unwrap().unwrap();
for batch in vec![batch, batch_gz] {
@@ -3081,6 +3156,37 @@ mod tests {
assert_eq!(5, aa.value(7));
}
+ #[test]
+ fn test_time_from_string() {
+ parse_string_column::<Time64NanosecondType>(4);
+ parse_string_column::<Time64MicrosecondType>(4);
+ parse_string_column::<Time32MillisecondType>(4);
+ parse_string_column::<Time32SecondType>(4);
+ }
+
+ fn parse_string_column<T>(value: T::Native)
+ where
+ T: ArrowPrimitiveType,
+ {
+ let schema = Schema::new(vec![Field::new("d", T::DATA_TYPE, true)]);
+
+ let builder = ReaderBuilder::new()
+ .with_schema(Arc::new(schema))
+ .with_batch_size(64);
+ let mut reader: Reader<File> = builder
+ .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
+ .unwrap();
+
+ let batch = reader.next().unwrap().unwrap();
+ let dd = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<T>>()
+ .unwrap();
+ assert_eq!(value, dd.value(1));
+ assert!(!dd.is_valid(2));
+ }
+
#[test]
fn test_json_read_nested_list() {
let schema = Schema::new(vec![Field::new(
@@ -3093,7 +3199,7 @@ mod tests {
true,
)]);
- let decoder = Decoder::new(Arc::new(schema), 1024, None);
+ let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
&mut vec![
@@ -3128,7 +3234,7 @@ mod tests {
true,
)]);
- let decoder = Decoder::new(Arc::new(schema), 1024, None);
+ let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
// NOTE: total struct element count needs to be greater than
@@ -3157,7 +3263,7 @@ mod tests {
#[test]
fn test_json_read_binary_structs() {
let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
- let decoder = Decoder::new(Arc::new(schema), 1024, None);
+ let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
&mut vec![
@@ -3200,7 +3306,7 @@ mod tests {
let mut sum_a = 0;
for batch in reader {
let batch = batch.unwrap();
- assert_eq!(4, batch.num_columns());
+ assert_eq!(5, batch.num_columns());
sum_num_rows += batch.num_rows();
num_batches += 1;
let batch_schema = batch.schema();
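
A note on the tests above: they construct DecoderOptions with a struct
literal plus ..Default::default(), which only compiles because the
struct's fields are private and the tests live in the same module as of
this commit; external callers go through ReaderBuilder::with_format_strings
instead. A test-style sketch of driving the Decoder directly with a
projection, modelled on those tests:

    #[test]
    fn decoder_options_projection_sketch() {
        use std::sync::Arc;

        use serde_json::json;

        use crate::datatypes::{DataType, Field, Schema};
        use crate::json::reader::{Decoder, DecoderOptions};

        // Two columns in the schema, but the projection keeps only "a".
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float64, true),
        ]));
        let options = DecoderOptions {
            projection: Some(vec!["a".to_string()]),
            ..Default::default()
        };
        let decoder = Decoder::new(schema, 1024, options);

        let batch = decoder
            .next_batch(&mut vec![Ok(json!({"a": 1, "b": 2.5}))].into_iter())
            .unwrap()
            .unwrap();
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
    }
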
diff --git a/arrow/src/util/reader_parser.rs b/arrow/src/util/reader_parser.rs
index 591a3aedf..6b6f24f82 100644
--- a/arrow/src/util/reader_parser.rs
+++ b/arrow/src/util/reader_parser.rs
@@ -60,27 +60,39 @@ impl Parser for Int8Type {}
impl Parser for TimestampNanosecondType {
fn parse(string: &str) -> Option<i64> {
- match Self::DATA_TYPE {
- DataType::Timestamp(TimeUnit::Nanosecond, None) => {
- string_to_timestamp_nanos(string).ok()
- }
- _ => None,
- }
+ string_to_timestamp_nanos(string).ok()
}
}
impl Parser for TimestampMicrosecondType {
fn parse(string: &str) -> Option<i64> {
- match Self::DATA_TYPE {
- DataType::Timestamp(TimeUnit::Microsecond, None) => {
- let nanos = string_to_timestamp_nanos(string).ok();
- nanos.map(|x| x / 1000)
- }
- _ => None,
- }
+ let nanos = string_to_timestamp_nanos(string).ok();
+ nanos.map(|x| x / 1000)
+ }
+}
+
+impl Parser for TimestampMillisecondType {
+ fn parse(string: &str) -> Option<i64> {
+ let nanos = string_to_timestamp_nanos(string).ok();
+ nanos.map(|x| x / 1_000_000)
+ }
+}
+
+impl Parser for TimestampSecondType {
+ fn parse(string: &str) -> Option<i64> {
+ let nanos = string_to_timestamp_nanos(string).ok();
+ nanos.map(|x| x / 1_000_000_000)
}
}
+impl Parser for Time64NanosecondType {}
+
+impl Parser for Time64MicrosecondType {}
+
+impl Parser for Time32MillisecondType {}
+
+impl Parser for Time32SecondType {}
+
/// Number of days between 0001-01-01 and 1970-01-01
const EPOCH_DAYS_FROM_CE: i32 = 719_163;
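
The new timestamp Parser impls all follow the same pattern: parse the
string to nanoseconds once via string_to_timestamp_nanos, then
integer-divide down to the target unit. A standalone sketch of that
arithmetic, assuming string_to_timestamp_nanos is reachable at
arrow::compute::kernels::cast_utils as in releases of this era:

    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;

    // nanos -> millis, mirroring the TimestampMillisecondType impl above
    fn parse_timestamp_millis(s: &str) -> Option<i64> {
        string_to_timestamp_nanos(s).ok().map(|nanos| nanos / 1_000_000)
    }

    fn main() {
        // One second past the epoch is 1_000 milliseconds.
        assert_eq!(
            parse_timestamp_millis("1970-01-01T00:00:01Z"),
            Some(1_000)
        );
    }
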
diff --git a/arrow/test/data/basic.json b/arrow/test/data/basic.json
index dafd2dd2e..556c39c46 100644
--- a/arrow/test/data/basic.json
+++ b/arrow/test/data/basic.json
@@ -1,6 +1,6 @@
-{"a":1, "b":2.0, "c":false, "d":"4"}
-{"a":-10, "b":-3.5, "c":true, "d":"4"}
-{"a":2, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4", "e":"1970-1-2"}
+{"a":-10, "b":-3.5, "c":true, "d":"4", "e": "1969-12-31"}
+{"a":2, "b":0.6, "c":false, "d":"text", "e": "1970-01-02 11:11:11"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":7, "b":-3.5, "c":true, "d":"4"}
{"a":1, "b":0.6, "c":false, "d":"text"}
@@ -9,4 +9,4 @@
{"a":1, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":1, "b":-3.5, "c":true, "d":"4"}
-{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
\ No newline at end of file
+{"a":100000000000000, "b":0.6, "c":false, "d":"text"}