You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/13 12:51:20 UTC
[arrow-rs] branch master updated: Consolidate JSON reader options (#1539)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 228ee36ee Consolidate JSON reader options (#1539)
228ee36ee is described below
commit 228ee36eeec26867a951c6a381e3e129765e21cf
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Apr 13 08:51:15 2022 -0400
Consolidate JSON reader options (#1539)
---
arrow/src/json/reader.rs | 153 ++++++++++++++++++++++-------------------------
1 file changed, 73 insertions(+), 80 deletions(-)
diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs
index ad953e49b..9334e1839 100644
--- a/arrow/src/json/reader.rs
+++ b/arrow/src/json/reader.rs
@@ -41,8 +41,7 @@
//! let mut json = json::Reader::new(
//! BufReader::new(file),
//! Arc::new(schema),
-//! 1024,
-//! Default::default()
+//! json::reader::DecoderOptions::new(),
//! );
//!
//! let batch = json.next().unwrap().unwrap();
@@ -561,7 +560,7 @@ where
///
/// # Examples
/// ```
-/// use arrow::json::reader::{Decoder, ValueIter, infer_json_schema};
+/// use arrow::json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema};
/// use std::fs::File;
/// use std::io::{BufReader, Seek, SeekFrom};
/// use std::sync::Arc;
@@ -569,8 +568,9 @@ where
/// let mut reader =
/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
-/// let batch_size = 1024;
-/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
+/// let options = DecoderOptions::new()
+/// .with_batch_size(1024);
+/// let decoder = Decoder::new(Arc::new(inferred_schema), options);
///
/// // seek back to start so that the original file is usable again
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -583,35 +583,68 @@ where
pub struct Decoder {
/// Explicit schema for the JSON file
schema: SchemaRef,
- /// Batch size (number of records to load each time)
- batch_size: usize,
/// This is a collection of options for json decoder
- doptions: DecoderOptions,
+ options: DecoderOptions,
}
-#[derive(Default, Debug)]
+#[derive(Debug)]
pub struct DecoderOptions {
+ /// Batch size (number of records to load each time), defaults to 1024 records
+ batch_size: usize,
/// Optional projection for which columns to load (case-sensitive names)
projection: Option<Vec<String>>,
/// optional HashMap of column names to its format string
format_strings: Option<HashMap<String, String>>,
}
+impl Default for DecoderOptions {
+ fn default() -> Self {
+ Self {
+ batch_size: 1024,
+ projection: None,
+ format_strings: None,
+ }
+ }
+}
+
+impl DecoderOptions {
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Set the batch size (number of records to load at one time)
+ pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+ self.batch_size = batch_size;
+ self
+ }
+
+ /// Set the reader's column projection
+ pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+ self.projection = Some(projection);
+ self
+ }
+
+ /// Set the decoder's format Strings param
+ pub fn with_format_strings(
+ mut self,
+ format_strings: HashMap<String, String>,
+ ) -> Self {
+ self.format_strings = Some(format_strings);
+ self
+ }
+}
+
impl Decoder {
/// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
/// trait.
- pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
- Self {
- schema,
- batch_size,
- doptions,
- }
+ pub fn new(schema: SchemaRef, options: DecoderOptions) -> Self {
+ Self { schema, options }
}
/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
- match &self.doptions.projection {
+ match &self.options.projection {
Some(projection) => {
let fields = self.schema.fields();
let projected_fields: Vec<Field> = fields
@@ -636,9 +669,10 @@ impl Decoder {
where
I: Iterator<Item = Result<Value>>,
{
- let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
+ let batch_size = self.options.batch_size;
+ let mut rows: Vec<Value> = Vec::with_capacity(batch_size);
- for value in value_iter.by_ref().take(self.batch_size) {
+ for value in value_iter.by_ref().take(batch_size) {
let v = value?;
match v {
Value::Object(_) => rows.push(v),
@@ -656,7 +690,7 @@ impl Decoder {
}
let rows = &rows[..];
- let projection = self.doptions.projection.clone().unwrap_or_default();
+ let projection = self.options.projection.clone().unwrap_or_default();
let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -934,7 +968,7 @@ impl Decoder {
T::Native: num::NumCast,
{
let format_string = self
- .doptions
+ .options
.format_strings
.as_ref()
.and_then(|fmts| fmts.get(col_name));
@@ -1556,13 +1590,8 @@ impl<R: Read> Reader<R> {
///
/// If reading a `File`, you can customise the Reader, such as to enable schema
/// inference, use `ReaderBuilder`.
- pub fn new(
- reader: R,
- schema: SchemaRef,
- batch_size: usize,
- doptions: DecoderOptions,
- ) -> Self {
- Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
+ pub fn new(reader: R, schema: SchemaRef, options: DecoderOptions) -> Self {
+ Self::from_buf_reader(BufReader::new(reader), schema, options)
}
/// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1571,12 +1600,11 @@ impl<R: Read> Reader<R> {
pub fn from_buf_reader(
reader: BufReader<R>,
schema: SchemaRef,
- batch_size: usize,
- doptions: DecoderOptions,
+ options: DecoderOptions,
) -> Self {
Self {
reader,
- decoder: Decoder::new(schema, batch_size, doptions),
+ decoder: Decoder::new(schema, options),
}
}
@@ -1595,7 +1623,7 @@ impl<R: Read> Reader<R> {
}
/// JSON file reader builder
-#[derive(Debug)]
+#[derive(Debug, Default)]
pub struct ReaderBuilder {
/// Optional schema for the JSON file
///
@@ -1606,26 +1634,8 @@ pub struct ReaderBuilder {
///
/// If a number is not provided, all the records are read.
max_records: Option<usize>,
- /// Batch size (number of records to load each time)
- ///
- /// The default batch size when using the `ReaderBuilder` is 1024 records
- batch_size: usize,
- /// Optional projection for which columns to load (zero-based column indices)
- projection: Option<Vec<String>>,
- /// optional HashMap of column names to format strings
- format_strings: Option<HashMap<String, String>>,
-}
-
-impl Default for ReaderBuilder {
- fn default() -> Self {
- Self {
- schema: None,
- max_records: None,
- batch_size: 1024,
- projection: None,
- format_strings: None,
- }
- }
+ /// Options for json decoder
+ options: DecoderOptions,
}
impl ReaderBuilder {
@@ -1672,13 +1682,13 @@ impl ReaderBuilder {
/// Set the batch size (number of records to load at one time)
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
- self.batch_size = batch_size;
+ self.options = self.options.with_batch_size(batch_size);
self
}
/// Set the reader's column projection
pub fn with_projection(mut self, projection: Vec<String>) -> Self {
- self.projection = Some(projection);
+ self.options = self.options.with_projection(projection);
self
}
@@ -1687,7 +1697,7 @@ impl ReaderBuilder {
mut self,
format_strings: HashMap<String, String>,
) -> Self {
- self.format_strings = Some(format_strings);
+ self.options = self.options.with_format_strings(format_strings);
self
}
@@ -1707,15 +1717,7 @@ impl ReaderBuilder {
)?),
};
- Ok(Reader::from_buf_reader(
- buf_reader,
- schema,
- self.batch_size,
- DecoderOptions {
- projection: self.projection,
- format_strings: self.format_strings,
- },
- ))
+ Ok(Reader::from_buf_reader(buf_reader, schema, self.options))
}
}
@@ -1868,8 +1870,7 @@ mod tests {
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
- 1024,
- Default::default(),
+ DecoderOptions::new(),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, Arc::new(schema));
@@ -1919,11 +1920,7 @@ mod tests {
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
schema.clone(),
- 1024,
- DecoderOptions {
- format_strings: Some(fmts),
- ..Default::default()
- },
+ DecoderOptions::new().with_format_strings(fmts),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, schema);
@@ -1955,11 +1952,7 @@ mod tests {
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
- 1024,
- DecoderOptions {
- projection: Some(vec!["a".to_string(), "c".to_string()]),
- ..Default::default()
- },
+ DecoderOptions::new().with_projection(vec!["a".to_string(), "c".to_string()]),
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
@@ -2126,8 +2119,8 @@ mod tests {
file.seek(SeekFrom::Start(0)).unwrap();
let reader = BufReader::new(GzDecoder::new(&file));
- let mut reader =
- Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
+ let options = DecoderOptions::new().with_batch_size(64);
+ let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), options);
let batch_gz = reader.next().unwrap().unwrap();
for batch in vec![batch, batch_gz] {
@@ -3199,7 +3192,7 @@ mod tests {
true,
)]);
- let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+ let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
let batch = decoder
.next_batch(
&mut vec![
@@ -3234,7 +3227,7 @@ mod tests {
true,
)]);
- let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+ let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
let batch = decoder
.next_batch(
// NOTE: total struct element count needs to be greater than
@@ -3263,7 +3256,7 @@ mod tests {
#[test]
fn test_json_read_binary_structs() {
let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
- let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+ let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
let batch = decoder
.next_batch(
&mut vec![