You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/26 15:03:36 UTC
[arrow-rs] branch master updated: Add Raw JSON Reader (~2.5x faster) (#3479)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 0f1a92a5f Add Raw JSON Reader (~2.5x faster) (#3479)
0f1a92a5f is described below
commit 0f1a92a5f31916570d70b78562913cf877e8929c
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jan 26 15:03:30 2023 +0000
Add Raw JSON Reader (~2.5x faster) (#3479)
* Add Raw JSON Reader
* Custom tape decoder
* RAT
* Cleanup
* More columns in benchmark
* CI fixes
* Tweaks
* Add List support
* Add support for nested nulls
* Remove unnecessary dependency
* Add RawDecoder
* Clippy
* Fix List
* Fix buffering
* More tests
* Add Send bounds
* Fix variance
* Review feedback
* Add deprecation notices
* Build RawDecoder with builder
* Improve field estimate
* Format
* Handle unicode split over strings
* Improve detection of invalid UTF-8 sequences
---
arrow-json/Cargo.toml | 1 +
arrow-json/src/lib.rs | 6 +-
arrow-json/src/raw/boolean_array.rs | 43 ++
arrow-json/src/raw/list_array.rs | 116 +++++
arrow-json/src/raw/mod.rs | 570 ++++++++++++++++++++++++
arrow-json/src/raw/primitive_array.rs | 88 ++++
arrow-json/src/raw/string_array.rs | 67 +++
arrow-json/src/raw/struct_array.rs | 129 ++++++
arrow-json/src/raw/tape.rs | 801 ++++++++++++++++++++++++++++++++++
arrow-json/src/reader.rs | 19 +
arrow/Cargo.toml | 2 +-
arrow/benches/json_reader.rs | 101 ++++-
12 files changed, 1916 insertions(+), 27 deletions(-)
diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml
index 9b9095b27..c6aa9b486 100644
--- a/arrow-json/Cargo.toml
+++ b/arrow-json/Cargo.toml
@@ -48,6 +48,7 @@ indexmap = { version = "1.9", default-features = false, features = ["std"] }
num = { version = "0.4", default-features = false, features = ["std"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
+lexical-core = { version = "0.8", default-features = false }
[dev-dependencies]
tempfile = "3.3"
diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs
index 0f1c0064f..7e582c335 100644
--- a/arrow-json/src/lib.rs
+++ b/arrow-json/src/lib.rs
@@ -25,8 +25,10 @@
pub mod reader;
pub mod writer;
-pub use self::reader::Reader;
-pub use self::reader::ReaderBuilder;
+mod raw;
+
+pub use self::raw::{RawDecoder, RawReader, RawReaderBuilder};
+pub use self::reader::{Reader, ReaderBuilder};
pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer};
use half::f16;
use serde_json::{Number, Value};
diff --git a/arrow-json/src/raw/boolean_array.rs b/arrow-json/src/raw/boolean_array.rs
new file mode 100644
index 000000000..12917785e
--- /dev/null
+++ b/arrow-json/src/raw/boolean_array.rs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::BooleanBuilder;
+use arrow_array::Array;
+use arrow_data::ArrayData;
+use arrow_schema::ArrowError;
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{tape_error, ArrayDecoder};
+
+/// Decodes JSON `true`, `false` and `null` tape elements into a boolean array
+#[derive(Default)]
+pub struct BooleanArrayDecoder {}
+
+impl ArrayDecoder for BooleanArrayDecoder {
+    /// Produces one slot per entry of `pos`, erroring on any element that is
+    /// neither a boolean nor `null`
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let mut out = BooleanBuilder::with_capacity(pos.len());
+        for &idx in pos {
+            let value = match tape.get(idx) {
+                TapeElement::True => Some(true),
+                TapeElement::False => Some(false),
+                TapeElement::Null => None,
+                other => return Err(tape_error(other, "boolean")),
+            };
+            out.append_option(value);
+        }
+        Ok(out.finish().into_data())
+    }
+}
diff --git a/arrow-json/src/raw/list_array.rs b/arrow-json/src/raw/list_array.rs
new file mode 100644
index 000000000..9d96885f9
--- /dev/null
+++ b/arrow-json/src/raw/list_array.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{make_decoder, tape_error, ArrayDecoder};
+use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
+use arrow_array::OffsetSizeTrait;
+use arrow_data::{ArrayData, ArrayDataBuilder};
+use arrow_schema::{ArrowError, DataType};
+use std::marker::PhantomData;
+
+/// Decodes JSON arrays into a `List` or `LargeList` array
+///
+/// `O` is the offset type: `i32` for [`DataType::List`] and `i64` for
+/// [`DataType::LargeList`]
+pub struct ListArrayDecoder<O> {
+    data_type: DataType,
+    /// Decoder for the list elements
+    decoder: Box<dyn ArrayDecoder>,
+    phantom: PhantomData<O>,
+    /// Whether a JSON `null` may be decoded as a null list (and so whether a
+    /// validity buffer is built)
+    is_nullable: bool,
+}
+
+impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
+    /// Create a decoder for `data_type`, building a decoder for its element type
+    ///
+    /// # Panics
+    ///
+    /// Panics if `data_type` is not `List` (with `i32` offsets) or `LargeList`
+    /// (with `i64` offsets)
+    pub fn new(data_type: DataType, is_nullable: bool) -> Result<Self, ArrowError> {
+        let field = match &data_type {
+            DataType::List(f) if !O::IS_LARGE => f,
+            DataType::LargeList(f) if O::IS_LARGE => f,
+            d => unreachable!("ListArrayDecoder cannot decode {d}"),
+        };
+        let decoder = make_decoder(field.data_type().clone(), field.is_nullable())?;
+
+        Ok(Self {
+            data_type,
+            decoder,
+            phantom: Default::default(),
+            is_nullable,
+        })
+    }
+}
+
+impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> {
+    /// Decode one list per entry of `pos`
+    ///
+    /// Returns an error if a row is not an array (or `null` for a nullable
+    /// list), if the offsets overflow `O`, or if decoding the elements fails
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        // Tape positions of every element of every list, in row order
+        let mut child_pos = Vec::with_capacity(pos.len());
+        let mut offsets = BufferBuilder::<O>::new(pos.len() + 1);
+        offsets.append(O::from_usize(0).unwrap());
+
+        let mut null_count = 0;
+        // A validity buffer is only materialized for nullable lists
+        let mut nulls = self
+            .is_nullable
+            .then(|| BooleanBufferBuilder::new(pos.len()));
+
+        for p in pos {
+            // `end_idx` is the tape index of the matching end-of-list element;
+            // for a null row it is `*p + 1` so the element loop below is skipped
+            let end_idx = match (tape.get(*p), nulls.as_mut()) {
+                (TapeElement::StartList(end_idx), None) => end_idx,
+                (TapeElement::StartList(end_idx), Some(nulls)) => {
+                    nulls.append(true);
+                    end_idx
+                }
+                (TapeElement::Null, Some(nulls)) => {
+                    nulls.append(false);
+                    null_count += 1;
+                    *p + 1
+                }
+                (d, _) => return Err(tape_error(d, "[")),
+            };
+
+            let mut cur_idx = *p + 1;
+            while cur_idx < end_idx {
+                child_pos.push(cur_idx);
+
+                // Advance to the next element, jumping over nested lists/objects
+                cur_idx = match tape.get(cur_idx) {
+                    TapeElement::String(_)
+                    | TapeElement::Number(_)
+                    | TapeElement::True
+                    | TapeElement::False
+                    | TapeElement::Null => cur_idx + 1,
+                    TapeElement::StartList(end_idx) => end_idx + 1,
+                    TapeElement::StartObject(end_idx) => end_idx + 1,
+                    d => return Err(tape_error(d, "list value")),
+                }
+            }
+
+            let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
+                ArrowError::JsonError(format!(
+                    "offset overflow decoding {}",
+                    self.data_type
+                ))
+            })?;
+            offsets.append(offset)
+        }
+
+        // Propagate element decode errors (e.g. a type mismatch inside a list)
+        // to the caller instead of panicking
+        let child_data = self.decoder.decode(tape, &child_pos)?;
+
+        let data = ArrayDataBuilder::new(self.data_type.clone())
+            .len(pos.len())
+            .null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
+            .null_count(null_count)
+            .add_buffer(offsets.finish())
+            .child_data(vec![child_data]);
+
+        // Safety
+        // Validated lengths above: `offsets` has `pos.len() + 1` monotonically
+        // increasing entries ending at `child_pos.len()`, and the validity
+        // buffer (if any) has one bit per entry of `pos`
+        Ok(unsafe { data.build_unchecked() })
+    }
+}
diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs
new file mode 100644
index 000000000..9ffa7d213
--- /dev/null
+++ b/arrow-json/src/raw/mod.rs
@@ -0,0 +1,570 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A faster JSON reader that will eventually replace [`Reader`]
+//!
+//! [`Reader`]: crate::reader::Reader
+
+use crate::raw::boolean_array::BooleanArrayDecoder;
+use crate::raw::list_array::ListArrayDecoder;
+use crate::raw::primitive_array::PrimitiveArrayDecoder;
+use crate::raw::string_array::StringArrayDecoder;
+use crate::raw::struct_array::StructArrayDecoder;
+use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
+use arrow_array::types::*;
+use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
+use arrow_data::ArrayData;
+use arrow_schema::{ArrowError, DataType, SchemaRef};
+use std::io::BufRead;
+
+mod boolean_array;
+mod list_array;
+mod primitive_array;
+mod string_array;
+mod struct_array;
+mod tape;
+
+/// A builder for [`RawReader`] and [`RawDecoder`]
+pub struct RawReaderBuilder {
+    /// Maximum number of rows in each emitted [`RecordBatch`]
+    batch_size: usize,
+
+    /// Schema of the emitted batches; columns not present in it are ignored
+    schema: SchemaRef,
+}
+
+impl RawReaderBuilder {
+    /// Create a new [`RawReaderBuilder`] for the provided [`SchemaRef`], with a
+    /// default batch size of 1024 rows
+    ///
+    /// If the schema is not known up front it can be obtained with
+    /// [`infer_json_schema`]
+    ///
+    /// Any columns not present in `schema` will be ignored
+    ///
+    /// [`infer_json_schema`]: crate::reader::infer_json_schema
+    pub fn new(schema: SchemaRef) -> Self {
+        let batch_size = 1024;
+        Self { batch_size, schema }
+    }
+
+    /// Sets the maximum number of rows in each [`RecordBatch`]
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Create a [`RawReader`] that pulls bytes from the provided [`BufRead`]
+    pub fn build<R: BufRead>(self, reader: R) -> Result<RawReader<R>, ArrowError> {
+        let decoder = self.build_decoder()?;
+        Ok(RawReader { reader, decoder })
+    }
+
+    /// Create a push-based [`RawDecoder`]
+    pub fn build_decoder(self) -> Result<RawDecoder, ArrowError> {
+        let Self { batch_size, schema } = self;
+
+        // Each row is decoded as a non-nullable struct of the schema's fields
+        let root_type = DataType::Struct(schema.fields.clone());
+        let decoder = make_decoder(root_type, false)?;
+
+        // Field count including nested children, passed to the tape decoder
+        // (presumably as a sizing hint)
+        let field_count = schema.all_fields().len();
+        let tape_decoder = TapeDecoder::new(batch_size, field_count);
+
+        Ok(RawDecoder {
+            tape_decoder,
+            decoder,
+            batch_size,
+            schema,
+        })
+    }
+}
+
+/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
+///
+/// This is significantly faster than [`Reader`] and is intended to eventually
+/// replace it ([#3610](https://github.com/apache/arrow-rs/issues/3610))
+///
+/// Lines consisting solely of ASCII whitespace are ignored
+///
+/// [`Reader`]: crate::reader::Reader
+pub struct RawReader<R> {
+    reader: R,
+    decoder: RawDecoder,
+}
+
+impl<R> std::fmt::Debug for RawReader<R> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // `R` is not required to implement `Debug`, so only the decoder is shown
+        let mut s = f.debug_struct("RawReader");
+        s.field("decoder", &self.decoder);
+        s.finish()
+    }
+}
+
+impl<R: BufRead> RawReader<R> {
+    /// Reads the next [`RecordBatch`], returning `Ok(None)` at EOF
+    ///
+    /// Bytes are pushed into the decoder until the input is exhausted or the
+    /// decoder stops consuming part way through a buffer (it has parsed
+    /// `batch_size` rows), after which the buffered rows are flushed
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        loop {
+            let buf = self.reader.fill_buf()?;
+            let available = buf.len();
+            if available == 0 {
+                break;
+            }
+
+            let consumed = self.decoder.decode(buf)?;
+            self.reader.consume(consumed);
+
+            let batch_full = consumed != available;
+            if batch_full {
+                break;
+            }
+        }
+        self.decoder.flush()
+    }
+}
+
+impl<R: BufRead> Iterator for RawReader<R> {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        Result::transpose(self.read())
+    }
+}
+
+impl<R: BufRead> RecordBatchReader for RawReader<R> {
+    fn schema(&self) -> SchemaRef {
+        SchemaRef::clone(&self.decoder.schema)
+    }
+}
+
+/// A low-level, push-based interface for decoding JSON data from a byte stream
+///
+/// See [`RawReader`] for a higher-level interface that reads from a [`BufRead`]
+///
+/// Because bytes are pushed in by the caller, this integrates with sources that
+/// yield arbitrarily delimited byte ranges, such as a [`BufRead`], or a chunked
+/// byte stream received from object storage
+///
+/// ```
+/// # use std::io::BufRead;
+/// # use arrow_array::RecordBatch;
+/// # use arrow_json::{RawDecoder, RawReaderBuilder};
+/// # use arrow_schema::{ArrowError, SchemaRef};
+/// #
+/// fn read_from_json<R: BufRead>(
+///     mut reader: R,
+///     schema: SchemaRef,
+/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
+///     let mut decoder = RawReaderBuilder::new(schema).build_decoder()?;
+///     let mut next = move || {
+///         loop {
+///             // RawDecoder does not require `buf` to contain whole records
+///             let buf = reader.fill_buf()?;
+///             if buf.is_empty() {
+///                 break; // Input exhausted
+///             }
+///             let read = buf.len();
+///             let decoded = decoder.decode(buf)?;
+///
+///             // Consume the number of bytes read
+///             reader.consume(decoded);
+///             if decoded != read {
+///                 break; // A full batch has been buffered
+///             }
+///         }
+///         decoder.flush()
+///     };
+///     Ok(std::iter::from_fn(move || next().transpose()))
+/// }
+/// ```
+pub struct RawDecoder {
+    tape_decoder: TapeDecoder,
+    decoder: Box<dyn ArrayDecoder>,
+    batch_size: usize,
+    schema: SchemaRef,
+}
+
+impl std::fmt::Debug for RawDecoder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // `dyn ArrayDecoder` does not implement `Debug`, so only the
+        // configuration is shown
+        let mut s = f.debug_struct("RawDecoder");
+        s.field("schema", &self.schema);
+        s.field("batch_size", &self.batch_size);
+        s.finish()
+    }
+}
+
+impl RawDecoder {
+    /// Read JSON objects from `buf`, returning the number of bytes consumed
+    ///
+    /// Returns once `batch_size` objects have been parsed since the last call
+    /// to [`Self::flush`], or `buf` is exhausted. Any bytes not consumed should
+    /// be passed again to the next call to [`Self::decode`]
+    ///
+    /// `buf` need not contain a whole number of records, so arbitrary byte
+    /// streams, such as those yielded by [`BufRead`], can be fed in directly
+    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
+        self.tape_decoder.decode(buf)
+    }
+
+    /// Flushes the currently buffered rows to a [`RecordBatch`]
+    ///
+    /// Returns `Ok(None)` if no rows are buffered
+    ///
+    /// Note: returns an error if called part way through decoding a record
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        let tape = self.tape_decoder.finish()?;
+        let num_rows = tape.num_rows();
+        if num_rows == 0 {
+            return Ok(None);
+        }
+
+        // Collect the tape index of each row's `StartObject`. Index 0 is a null
+        // sentinel, so the first row starts at 1 and each subsequent row
+        // immediately follows the end of the previous object
+        let mut pos = Vec::new();
+        let mut cursor = 1;
+        for _ in 0..num_rows {
+            pos.push(cursor);
+            cursor = match tape.get(cursor) {
+                TapeElement::StartObject(end) => end + 1,
+                _ => unreachable!("corrupt tape"),
+            };
+        }
+
+        let decoded = self.decoder.decode(&tape, &pos)?;
+        // Buffered rows have been decoded, reset the tape for the next batch
+        self.tape_decoder.clear();
+
+        // Sanity check: rows decode to a non-nullable struct, one slot per row
+        assert!(matches!(decoded.data_type(), DataType::Struct(_)));
+        assert_eq!(decoded.null_count(), 0);
+        assert_eq!(decoded.len(), pos.len());
+
+        // Each child of the root struct becomes one column of the batch
+        let mut columns = Vec::with_capacity(decoded.child_data().len());
+        for child in decoded.child_data() {
+            columns.push(make_array(child.clone()));
+        }
+
+        RecordBatch::try_new(self.schema.clone(), columns).map(Some)
+    }
+}
+
+/// Decodes rows from a [`Tape`] into an [`ArrayData`] of a single data type
+///
+/// Implementations are constructed by [`make_decoder`]. The `Send` supertrait
+/// makes `Box<dyn ArrayDecoder>` `Send`
+trait ArrayDecoder: Send {
+ /// Decode elements from `tape` starting at the indexes contained in `pos`
+ ///
+ /// The decoders in this module produce one array slot per entry in `pos`
+ fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
+}
+
+/// Expands to `Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))`,
+/// the boxed decoder used by `make_decoder` for integer and float types
+macro_rules! primitive_decoder {
+ ($t:ty, $data_type:expr) => {
+ Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
+ };
+}
+
+/// Create an [`ArrayDecoder`] for `data_type`
+///
+/// `is_nullable` is forwarded only to the list and struct decoders, which build
+/// a validity buffer when it is `true`; the primitive, boolean and string
+/// decoders append nulls for JSON `null` regardless.
+///
+/// Returns [`ArrowError::JsonError`] for binary types, which have no JSON
+/// representation, and [`ArrowError::NotYetImplemented`] for any other type
+/// without a decoder
+fn make_decoder(
+ data_type: DataType,
+ is_nullable: bool,
+) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
+ // `downcast_integer!` routes every integer `DataType` to `primitive_decoder!`;
+ // the remaining arms are matched against `data_type` like a regular `match`
+ downcast_integer! {
+ data_type => (primitive_decoder, data_type),
+ DataType::Float32 => primitive_decoder!(Float32Type, data_type),
+ DataType::Float64 => primitive_decoder!(Float64Type, data_type),
+ DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
+ DataType::Utf8 => Ok(Box::<StringArrayDecoder::<i32>>::default()),
+ DataType::LargeUtf8 => Ok(Box::<StringArrayDecoder::<i64>>::default()),
+ DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, is_nullable)?)),
+ DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, is_nullable)?)),
+ DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, is_nullable)?)),
+ DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
+ Err(ArrowError::JsonError(format!("{} is not supported by JSON", data_type)))
+ }
+ d => Err(ArrowError::NotYetImplemented(format!("Support for {} in JSON reader", d)))
+ }
+}
+
+/// Build an [`ArrowError::JsonError`] reporting that tape element `d` was
+/// found where `expected` was required
+fn tape_error(d: TapeElement, expected: &str) -> ArrowError {
+ ArrowError::JsonError(format!("expected {expected} got {d}"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::reader::infer_json_schema;
+    use crate::ReaderBuilder;
+    use arrow_array::cast::{
+        as_boolean_array, as_largestring_array, as_list_array, as_primitive_array,
+        as_string_array, as_struct_array,
+    };
+    use arrow_array::types::Int32Type;
+    use arrow_array::Array;
+    use arrow_schema::{DataType, Field, Schema};
+    use std::fs::File;
+    use std::io::{BufReader, Cursor, Seek};
+    use std::sync::Arc;
+
+    /// Decode `buf` with `schema` using batch sizes 1, 3, 100 and `batch_size`.
+    ///
+    /// For each batch size, checks that every batch but the last is full, and
+    /// that reading through `BufReader`s of capacity 1, 3 and 5 yields the same
+    /// batches as reading the whole input at once
+    ///
+    /// Returns the batches produced with `batch_size`
+    fn do_read(buf: &str, batch_size: usize, schema: SchemaRef) -> Vec<RecordBatch> {
+        let mut unbuffered = vec![];
+
+        // Test with different batch sizes to test for boundary conditions
+        for batch_size in [1, 3, 100, batch_size] {
+            unbuffered = RawReaderBuilder::new(schema.clone())
+                .with_batch_size(batch_size)
+                .build(Cursor::new(buf.as_bytes()))
+                .unwrap()
+                .collect::<Result<Vec<_>, _>>()
+                .unwrap();
+
+            // Only the final batch may be short; `split_last` also copes with
+            // input that produces no batches at all
+            if let Some((_, full)) = unbuffered.split_last() {
+                for b in full {
+                    assert_eq!(b.num_rows(), batch_size)
+                }
+            }
+
+            // Test with different buffer sizes to test for boundary conditions
+            for b in [1, 3, 5] {
+                let buffered = RawReaderBuilder::new(schema.clone())
+                    .with_batch_size(batch_size)
+                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
+                    .unwrap()
+                    .collect::<Result<Vec<_>, _>>()
+                    .unwrap();
+                assert_eq!(unbuffered, buffered);
+            }
+        }
+
+        unbuffered
+    }
+
+    #[test]
+    fn test_basic() {
+        let buf = r#"
+        {"a": 1, "b": 2, "c": true}
+        {"a": 2E0, "b": 4, "c": false}
+
+        {"b": 6, "a": 2.0}
+        {"b": "5", "a": 2}
+        {"b": 4e0}
+        {"b": 7, "a": null}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Boolean, true),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        let col1 = as_primitive_array::<Int64Type>(batches[0].column(0));
+        assert_eq!(col1.null_count(), 2);
+        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
+        assert!(col1.is_null(4));
+        assert!(col1.is_null(5));
+
+        let col2 = as_primitive_array::<Int32Type>(batches[0].column(1));
+        assert_eq!(col2.null_count(), 0);
+        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);
+
+        let col3 = as_boolean_array(batches[0].column(2));
+        assert_eq!(col3.null_count(), 4);
+        assert!(col3.value(0));
+        assert!(!col3.is_null(0));
+        assert!(!col3.value(1));
+        assert!(!col3.is_null(1));
+    }
+
+    #[test]
+    fn test_string() {
+        let buf = r#"
+        {"a": "1", "b": "2"}
+        {"a": "hello", "b": "shoo"}
+        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
+
+        {"b": null}
+        {"b": "", "a": null}
+
+        "#;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::LargeUtf8, true),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        let col1 = as_string_array(batches[0].column(0));
+        assert_eq!(col1.null_count(), 2);
+        assert_eq!(col1.value(0), "1");
+        assert_eq!(col1.value(1), "hello");
+        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
+        assert!(col1.is_null(3));
+        assert!(col1.is_null(4));
+
+        let col2 = as_largestring_array(batches[0].column(1));
+        assert_eq!(col2.null_count(), 1);
+        assert_eq!(col2.value(0), "2");
+        assert_eq!(col2.value(1), "shoo");
+        assert_eq!(col2.value(2), "\t😁foo");
+        assert!(col2.is_null(3));
+        assert_eq!(col2.value(4), "");
+    }
+
+    #[test]
+    fn test_complex() {
+        let buf = r#"
+        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
+        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
+        {"list": null, "nested": {"a": null}}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "list",
+                DataType::List(Box::new(Field::new("element", DataType::Int32, false))),
+                true,
+            ),
+            Field::new(
+                "nested",
+                DataType::Struct(vec![
+                    Field::new("a", DataType::Int32, false),
+                    Field::new("b", DataType::Int32, false),
+                ]),
+                true,
+            ),
+            Field::new(
+                "nested_list",
+                DataType::Struct(vec![Field::new(
+                    "list2",
+                    DataType::List(Box::new(Field::new(
+                        "element",
+                        DataType::Struct(vec![Field::new("c", DataType::Int32, false)]),
+                        false,
+                    ))),
+                    true,
+                )]),
+                true,
+            ),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        // Index within bounds: the third row (index 2) has `"list": null`
+        let list = as_list_array(batches[0].column(0).as_ref());
+        assert_eq!(list.len(), 3);
+        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
+        assert_eq!(list.null_count(), 1);
+        assert!(list.is_null(2));
+        let list_values = as_primitive_array::<Int32Type>(list.values().as_ref());
+        assert_eq!(list_values.values(), &[5, 6]);
+
+        // The third row has `"a": null`
+        let nested = as_struct_array(batches[0].column(1).as_ref());
+        let a = as_primitive_array::<Int32Type>(nested.column(0).as_ref());
+        assert_eq!(a.null_count(), 1);
+        assert_eq!(a.values(), &[1, 7, 0]);
+        assert!(a.is_null(2));
+
+        let b = as_primitive_array::<Int32Type>(nested.column(1).as_ref());
+        assert_eq!(b.null_count(), 2);
+        assert_eq!(b.len(), 3);
+        assert_eq!(b.value(0), 2);
+        assert!(b.is_null(1));
+        assert!(b.is_null(2));
+
+        // The third row has no `nested_list`, so `list2` is null there
+        let nested_list = as_struct_array(batches[0].column(2).as_ref());
+        let list2 = as_list_array(nested_list.column(0).as_ref());
+        assert_eq!(list2.len(), 3);
+        assert_eq!(list2.null_count(), 1);
+        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
+        assert!(list2.is_null(2));
+
+        let list2_values = as_struct_array(list2.values().as_ref());
+
+        let c = as_primitive_array::<Int32Type>(list2_values.column(0));
+        assert_eq!(c.values(), &[3, 4]);
+    }
+
+    #[test]
+    fn test_projection() {
+        let buf = r#"
+        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
+        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "nested",
+                DataType::Struct(vec![Field::new("a", DataType::Int32, false)]),
+                true,
+            ),
+            Field::new(
+                "nested_list",
+                DataType::Struct(vec![Field::new(
+                    "list2",
+                    DataType::List(Box::new(Field::new(
+                        "element",
+                        DataType::Struct(vec![Field::new("d", DataType::Int32, false)]),
+                        false,
+                    ))),
+                    true,
+                )]),
+                true,
+            ),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        let nested = as_struct_array(batches[0].column(0).as_ref());
+        assert_eq!(nested.num_columns(), 1);
+        let a = as_primitive_array::<Int32Type>(nested.column(0).as_ref());
+        assert_eq!(a.null_count(), 0);
+        assert_eq!(a.values(), &[1, 7]);
+
+        let nested_list = as_struct_array(batches[0].column(1).as_ref());
+        assert_eq!(nested_list.num_columns(), 1);
+        assert_eq!(nested_list.null_count(), 0);
+
+        let list2 = as_list_array(nested_list.column(0).as_ref());
+        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
+        assert_eq!(list2.null_count(), 0);
+
+        let child = as_struct_array(list2.values().as_ref());
+        assert_eq!(child.num_columns(), 1);
+        assert_eq!(child.len(), 2);
+        assert_eq!(child.null_count(), 0);
+
+        let c = as_primitive_array::<Int32Type>(child.column(0).as_ref());
+        assert_eq!(c.values(), &[5, 0]);
+        assert_eq!(c.null_count(), 1);
+        assert!(c.is_null(1));
+    }
+
+    #[test]
+    fn integration_test() {
+        let files = [
+            "test/data/basic.json",
+            "test/data/basic_nulls.json",
+            "test/data/list_string_dict_nested_nulls.json",
+        ];
+
+        for file in files {
+            let mut f = BufReader::new(File::open(file).unwrap());
+            let schema = Arc::new(infer_json_schema(&mut f, None).unwrap());
+
+            f.rewind().unwrap();
+            let a = ReaderBuilder::new()
+                .with_schema(schema.clone())
+                .build(&mut f)
+                .unwrap();
+            let a_result = a.into_iter().collect::<Result<Vec<_>, _>>().unwrap();
+
+            f.rewind().unwrap();
+            let b = RawReaderBuilder::new(schema).build(f).unwrap();
+            let b_result = b.into_iter().collect::<Result<Vec<_>, _>>().unwrap();
+
+            assert_eq!(a_result, b_result);
+        }
+    }
+}
diff --git a/arrow-json/src/raw/primitive_array.rs b/arrow-json/src/raw/primitive_array.rs
new file mode 100644
index 000000000..72ce30203
--- /dev/null
+++ b/arrow-json/src/raw/primitive_array.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use num::NumCast;
+use std::marker::PhantomData;
+
+use arrow_array::builder::PrimitiveBuilder;
+use arrow_array::{Array, ArrowPrimitiveType};
+use arrow_cast::parse::Parser;
+use arrow_data::ArrayData;
+use arrow_schema::{ArrowError, DataType};
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{tape_error, ArrayDecoder};
+
+/// Decodes JSON numbers, and numbers encoded as JSON strings, into a
+/// `PrimitiveArray` of `P`
+pub struct PrimitiveArrayDecoder<P: ArrowPrimitiveType> {
+    /// Data type of the produced array, applied via `with_data_type`
+    data_type: DataType,
+    // Invariant and Send
+    phantom: PhantomData<fn(P) -> P>,
+}
+
+impl<P: ArrowPrimitiveType> PrimitiveArrayDecoder<P> {
+    /// Create a decoder producing arrays of `data_type`
+    pub fn new(data_type: DataType) -> Self {
+        Self {
+            data_type,
+            phantom: Default::default(),
+        }
+    }
+}
+
+impl<P> ArrayDecoder for PrimitiveArrayDecoder<P>
+where
+    P: ArrowPrimitiveType + Parser,
+    P::Native: NumCast,
+{
+    /// Decode one value per entry of `pos`
+    ///
+    /// Returns an error if an element is not a number, string or `null`, or if
+    /// its text cannot be represented as `P`
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let mut builder = PrimitiveBuilder::<P>::with_capacity(pos.len())
+            .with_data_type(self.data_type.clone());
+
+        for p in pos {
+            match tape.get(*p) {
+                TapeElement::Null => builder.append_null(),
+                TapeElement::String(idx) => {
+                    // Numbers quoted as JSON strings, e.g. `"5"`
+                    let s = tape.get_string(idx);
+                    let value = P::parse(s).ok_or_else(|| {
+                        ArrowError::JsonError(format!(
+                            "failed to parse \"{s}\" as {}",
+                            self.data_type
+                        ))
+                    })?;
+
+                    builder.append_value(value)
+                }
+                TapeElement::Number(idx) => {
+                    let s = tape.get_string(idx);
+                    // Parse directly as the native type first: routing every
+                    // number through `f64` silently loses precision for
+                    // integers with magnitude above 2^53. Fall back to `f64`
+                    // for forms the native parser rejects, such as `2.0` or
+                    // `4e0` for an integer column
+                    let value = P::parse(s)
+                        .or_else(|| {
+                            lexical_core::parse::<f64>(s.as_bytes())
+                                .ok()
+                                .and_then(NumCast::from)
+                        })
+                        .ok_or_else(|| {
+                            ArrowError::JsonError(format!(
+                                "failed to parse {s} as {}",
+                                self.data_type
+                            ))
+                        })?;
+
+                    builder.append_value(value)
+                }
+                d => return Err(tape_error(d, "primitive")),
+            }
+        }
+
+        Ok(builder.finish().into_data())
+    }
+}
diff --git a/arrow-json/src/raw/string_array.rs b/arrow-json/src/raw/string_array.rs
new file mode 100644
index 000000000..31a7a99be
--- /dev/null
+++ b/arrow-json/src/raw/string_array.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::GenericStringBuilder;
+use arrow_array::{Array, GenericStringArray, OffsetSizeTrait};
+use arrow_data::ArrayData;
+use arrow_schema::ArrowError;
+use std::marker::PhantomData;
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{tape_error, ArrayDecoder};
+
+/// Decodes JSON strings into a `GenericStringArray<O>`: `Utf8` for `i32`
+/// offsets and `LargeUtf8` for `i64` offsets
+#[derive(Default)]
+pub struct StringArrayDecoder<O: OffsetSizeTrait> {
+    phantom: PhantomData<O>,
+}
+
+impl<O: OffsetSizeTrait> ArrayDecoder for StringArrayDecoder<O> {
+    /// Decode one string (or null) per entry of `pos`
+    ///
+    /// Returns an error if an element is not a string or `null`, or if the
+    /// total string length does not fit in `O`
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        // First pass: validate element kinds and total the string bytes, so the
+        // value buffer is allocated once and offset overflow is caught up front
+        let mut total_bytes = 0;
+        for &idx in pos {
+            match tape.get(idx) {
+                TapeElement::Null => {}
+                TapeElement::String(str_idx) => {
+                    total_bytes += tape.get_string(str_idx).len();
+                }
+                other => return Err(tape_error(other, "string")),
+            }
+        }
+
+        let fits_offsets = O::from_usize(total_bytes).is_some();
+        if !fits_offsets {
+            let message = format!(
+                "offset overflow decoding {}",
+                GenericStringArray::<O>::DATA_TYPE
+            );
+            return Err(ArrowError::JsonError(message));
+        }
+
+        // Second pass: every element is now known to be a string or null
+        let mut builder =
+            GenericStringBuilder::<O>::with_capacity(pos.len(), total_bytes);
+        for &idx in pos {
+            match tape.get(idx) {
+                TapeElement::Null => builder.append_null(),
+                TapeElement::String(str_idx) => {
+                    builder.append_value(tape.get_string(str_idx));
+                }
+                _ => unreachable!(),
+            }
+        }
+
+        Ok(builder.finish().into_data())
+    }
+}
diff --git a/arrow-json/src/raw/struct_array.rs b/arrow-json/src/raw/struct_array.rs
new file mode 100644
index 000000000..3b7895f37
--- /dev/null
+++ b/arrow-json/src/raw/struct_array.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{make_decoder, tape_error, ArrayDecoder};
+use arrow_array::builder::BooleanBufferBuilder;
+use arrow_data::{ArrayData, ArrayDataBuilder};
+use arrow_schema::{ArrowError, DataType, Field};
+
+/// Decodes JSON objects into a struct array, delegating each field's values to
+/// a child [`ArrayDecoder`]
+pub struct StructArrayDecoder {
+    data_type: DataType,
+    decoders: Vec<Box<dyn ArrayDecoder>>,
+    is_nullable: bool,
+}
+
+impl StructArrayDecoder {
+    /// Creates a decoder for `data_type`, with one child decoder per field in
+    /// declaration order
+    ///
+    /// Returns the first error encountered creating a child decoder.
+    /// Panics (via `struct_fields`) if `data_type` is not a [`DataType::Struct`]
+    pub fn new(data_type: DataType, is_nullable: bool) -> Result<Self, ArrowError> {
+        let fields = struct_fields(&data_type);
+        let mut decoders = Vec::with_capacity(fields.len());
+        for field in fields {
+            decoders.push(make_decoder(field.data_type().clone(), field.is_nullable())?);
+        }
+
+        Ok(Self {
+            data_type,
+            decoders,
+            is_nullable,
+        })
+    }
+}
+
+impl ArrayDecoder for StructArrayDecoder {
+ /// Decodes the JSON objects at tape positions `pos` into a struct array
+ ///
+ /// Each position must reference a [`TapeElement::StartObject`], or a
+ /// [`TapeElement::Null`] when this decoder is nullable; anything else is an error.
+ /// Keys that match no field are ignored, a repeated key keeps its last value, and
+ /// fields absent from an object (or all fields of a null row) are decoded from
+ /// tape position 0, which is always a [`TapeElement::Null`]
+ fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+ let fields = struct_fields(&self.data_type);
+ // For each field, the tape position of that field's value in each row
+ // (0, the leading null element, when the row does not set it)
+ let mut child_pos: Vec<_> =
+ (0..fields.len()).map(|_| vec![0; pos.len()]).collect();
+
+ // Validity bitmap, only tracked when this struct is nullable
+ let mut null_count = 0;
+ let mut nulls = self
+ .is_nullable
+ .then(|| BooleanBufferBuilder::new(pos.len()));
+
+ for (row, p) in pos.iter().enumerate() {
+ // Find the end of this row's object, recording its validity
+ let end_idx = match (tape.get(*p), nulls.as_mut()) {
+ (TapeElement::StartObject(end_idx), None) => end_idx,
+ (TapeElement::StartObject(end_idx), Some(nulls)) => {
+ nulls.append(true);
+ end_idx
+ }
+ (TapeElement::Null, Some(nulls)) => {
+ nulls.append(false);
+ null_count += 1;
+ continue;
+ }
+ // Anything else, including null when not nullable, is an error
+ (d, _) => return Err(tape_error(d, "{")),
+ };
+
+ // Walk the key/value pairs between the object's start and end elements
+ let mut cur_idx = *p + 1;
+ while cur_idx < end_idx {
+ // Read field name
+ let field_name = match tape.get(cur_idx) {
+ TapeElement::String(s) => tape.get_string(s),
+ d => return Err(tape_error(d, "field name")),
+ };
+
+ // Update child pos if match found
+ if let Some(field_idx) =
+ fields.iter().position(|x| x.name() == field_name)
+ {
+ child_pos[field_idx][row] = cur_idx + 1;
+ }
+
+ // Advance to next field: a scalar value is a single element, so the next
+ // key follows it directly, whereas a nested list or object is skipped by
+ // jumping past its end element
+ cur_idx = match tape.get(cur_idx + 1) {
+ TapeElement::String(_)
+ | TapeElement::Number(_)
+ | TapeElement::True
+ | TapeElement::False
+ | TapeElement::Null => cur_idx + 2,
+ TapeElement::StartList(end_idx) => end_idx + 1,
+ TapeElement::StartObject(end_idx) => end_idx + 1,
+ d => return Err(tape_error(d, "field value")),
+ }
+ }
+ }
+
+ // Decode each field from the positions gathered above
+ let child_data = self
+ .decoders
+ .iter_mut()
+ .zip(child_pos)
+ .map(|(d, pos)| d.decode(tape, &pos))
+ .collect::<Result<Vec<_>, ArrowError>>()?;
+
+ // Sanity check
+ child_data
+ .iter()
+ .for_each(|x| assert_eq!(x.len(), pos.len()));
+
+ let data = ArrayDataBuilder::new(self.data_type.clone())
+ .len(pos.len())
+ .null_count(null_count)
+ .null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
+ .child_data(child_data);
+
+ // Safety
+ // Validated lengths above: every child has `pos.len()` rows, and the null
+ // buffer (when present) received exactly one bit per row
+ Ok(unsafe { data.build_unchecked() })
+ }
+}
+
+/// Returns the fields of a [`DataType::Struct`]
+///
+/// Panics if `data_type` is any other type
+fn struct_fields(data_type: &DataType) -> &[Field] {
+    if let DataType::Struct(fields) = data_type {
+        fields
+    } else {
+        unreachable!()
+    }
+}
diff --git a/arrow-json/src/raw/tape.rs b/arrow-json/src/raw/tape.rs
new file mode 100644
index 000000000..6ca4e2d3f
--- /dev/null
+++ b/arrow-json/src/raw/tape.rs
@@ -0,0 +1,801 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::ArrowError;
+use std::fmt::{Display, Formatter};
+
+/// We decode JSON to a flattened tape representation,
+/// allowing for efficient traversal of the JSON data
+///
+/// This approach is inspired by [simdjson]
+///
+/// Uses `u32` for offsets to ensure `TapeElement` is 64-bits. A future
+/// iteration may increase this to a custom `u56` type.
+///
+/// [simdjson]: https://github.com/simdjson/simdjson/blob/master/doc/tape.md
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum TapeElement {
+ /// The start of an object, i.e. `{`
+ ///
+ /// Contains the index within the tape of the corresponding [`Self::EndObject`].
+ /// While the object is still being decoded this holds `u32::MAX`
+ StartObject(u32),
+ /// The end of an object, i.e. `}`
+ ///
+ /// Contains the index within the tape of the corresponding [`Self::StartObject`]
+ EndObject(u32),
+ /// The start of a list , i.e. `[`
+ ///
+ /// Contains the index within the tape of the corresponding [`Self::EndList`].
+ /// While the list is still being decoded this holds `u32::MAX`
+ StartList(u32),
+ /// The end of a list , i.e. `]`
+ ///
+ /// Contains the index within the tape of the corresponding [`Self::StartList`]
+ EndList(u32),
+ /// A string value
+ ///
+ /// Contains the index of the string, with escape sequences decoded, in the
+ /// [`Tape`] string data, as passed to [`Tape::get_string`]
+ String(u32),
+ /// A numeric value
+ ///
+ /// Contains the index of the number's text, as written in the input, in the
+ /// [`Tape`] string data, as passed to [`Tape::get_string`]
+ Number(u32),
+ /// A true literal
+ True,
+ /// A false literal
+ False,
+ /// A null literal
+ Null,
+}
+
+impl Display for TapeElement {
+    /// Writes the JSON token for structural elements, or the kind of value
+    /// (`string`, `number`, `true`, `false`, `null`) for everything else
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let text = match self {
+            Self::StartObject(_) => "{",
+            Self::EndObject(_) => "}",
+            Self::StartList(_) => "[",
+            Self::EndList(_) => "]",
+            Self::String(_) => "string",
+            Self::Number(_) => "number",
+            Self::True => "true",
+            Self::False => "false",
+            Self::Null => "null",
+        };
+        f.write_str(text)
+    }
+}
+
+/// A decoded JSON tape
+///
+/// String and numeric data is stored alongside an array of [`TapeElement`]
+///
+/// The first element is always [`TapeElement::Null`]
+///
+/// This approach to decoding JSON is inspired by [simdjson]
+///
+/// [simdjson]: https://github.com/simdjson/simdjson/blob/master/doc/tape.md
+#[derive(Debug)]
+pub struct Tape<'a> {
+ /// The flattened sequence of decoded elements
+ elements: &'a [TapeElement],
+ /// Concatenated string and number data, validated as UTF-8
+ strings: &'a str,
+ /// Byte offsets into `strings`: string `i` spans
+ /// `string_offsets[i]..string_offsets[i + 1]`
+ string_offsets: &'a [usize],
+ /// Number of rows (complete top-level objects) in the tape
+ num_rows: usize,
+}
+
+impl<'a> Tape<'a> {
+    /// Returns the string with index `idx`, as stored in a
+    /// [`TapeElement::String`] or [`TapeElement::Number`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if `idx` does not identify a string in this tape
+    #[inline]
+    pub fn get_string(&self, idx: u32) -> &'a str {
+        let i = idx as usize;
+        let (end, start) = (self.string_offsets[i + 1], self.string_offsets[i]);
+        // SAFETY:
+        // `TapeDecoder::finish` only builds a `Tape` once `strings` is valid UTF-8,
+        // every offset lies on a char boundary and the last offset equals the data
+        // length; offsets never decrease, so `start..end` is a valid `str` range
+        unsafe { self.strings.get_unchecked(start..end) }
+    }
+
+    /// Returns the tape element at `idx`
+    ///
+    /// # Panics
+    ///
+    /// Panics if `idx` is out of bounds
+    pub fn get(&self, idx: u32) -> TapeElement {
+        let i = idx as usize;
+        self.elements[i]
+    }
+
+    /// Returns the number of rows
+    pub fn num_rows(&self) -> usize {
+        self.num_rows
+    }
+}
+
+/// States based on <https://www.json.org/json-en.html>
+#[derive(Debug, Copy, Clone)]
+enum DecoderState {
+ /// Decoding an object
+ ///
+ /// Contains index of start [`TapeElement::StartObject`]
+ Object(u32),
+ /// Decoding a list
+ ///
+ /// Contains index of start [`TapeElement::StartList`]
+ List(u32),
+ /// Decoding the contents of a string, after its opening `"`
+ String,
+ /// Expecting a value, e.g. after an object key's `:` or within a list
+ Value,
+ /// Decoding a number, which ends at the first non-numeric byte
+ Number,
+ /// Expecting the `:` that separates an object key from its value
+ Colon,
+ /// Decoding the character that follows a `\` within a string
+ Escape,
+ /// A unicode escape sequence,
+ ///
+ /// Consists of `(first code unit, low surrogate, bytes decoded)`. The first
+ /// field accumulates the four hex digits after `\u`; if they are not a
+ /// complete character they are treated as a high surrogate, and a second
+ /// `\uXXXX` escape is read into the low surrogate field. The count tracks
+ /// progress through these (up to ten bytes after the initial `\u`)
+ Unicode(u16, u16, u8),
+ /// A boolean or null literal
+ ///
+ /// Consists of `(literal, number of bytes of the literal matched so far)`
+ Literal(Literal, u8),
+}
+
+impl DecoderState {
+    /// Returns a short name for this state, used to report a record that was
+    /// truncated whilst in this state
+    fn as_str(&self) -> &'static str {
+        match *self {
+            Self::Object(_) => "object",
+            Self::List(_) => "list",
+            Self::String => "string",
+            Self::Value => "value",
+            Self::Number => "number",
+            Self::Colon => "colon",
+            Self::Escape => "escape",
+            Self::Unicode(..) => "unicode literal",
+            Self::Literal(literal, _) => literal.as_str(),
+        }
+    }
+}
+
+/// A JSON literal: `null`, `true` or `false`
+#[derive(Debug, Copy, Clone)]
+enum Literal {
+ Null,
+ True,
+ False,
+}
+
+impl Literal {
+    /// Returns the [`TapeElement`] recorded once this literal is fully matched
+    fn element(&self) -> TapeElement {
+        match self {
+            Self::Null => TapeElement::Null,
+            Self::True => TapeElement::True,
+            Self::False => TapeElement::False,
+        }
+    }
+
+    /// Returns the JSON spelling of this literal
+    fn as_str(&self) -> &'static str {
+        match self {
+            Self::Null => "null",
+            Self::True => "true",
+            Self::False => "false",
+        }
+    }
+
+    /// Returns the JSON spelling of this literal as bytes, for matching input
+    fn bytes(&self) -> &'static [u8] {
+        match self {
+            Self::Null => b"null",
+            Self::True => b"true",
+            Self::False => b"false",
+        }
+    }
+}
+
+/// Evaluates to the next element in the iterator or breaks the current loop
+///
+/// Note: the `break` exits the innermost loop enclosing the expansion site, so
+/// when used inside a nested `loop` only that inner loop is exited
+macro_rules! next {
+ ($next:ident) => {
+ match $next.next() {
+ Some(b) => b,
+ None => break,
+ }
+ };
+}
+
+/// Implements a state machine for decoding JSON to a tape
+pub struct TapeDecoder {
+ /// The tape elements decoded so far, always starting with a [`TapeElement::Null`]
+ elements: Vec<TapeElement>,
+
+ /// Number of complete top-level objects (rows) decoded since the last clear
+ num_rows: usize,
+
+ /// Number of rows to read per batch
+ batch_size: usize,
+
+ /// A buffer of parsed string data and number text
+ ///
+ /// Note: if part way through a record, i.e. `stack` is not empty,
+ /// this may contain truncated UTF-8 data
+ bytes: Vec<u8>,
+
+ /// Offsets into `bytes`: string `i` is `bytes[offsets[i]..offsets[i + 1]]`
+ offsets: Vec<usize>,
+
+ /// A stack of [`DecoderState`]
+ stack: Vec<DecoderState>,
+}
+
+impl TapeDecoder {
+    /// Create a new [`TapeDecoder`] with the provided batch size
+    /// and an estimated number of fields in each row
+    ///
+    /// The field estimate is only used to size the initial buffers
+    pub fn new(batch_size: usize, num_fields: usize) -> Self {
+        // Roughly one key and one value per field, plus the row's `{` and `}`
+        let tokens_per_row = 2 + num_fields * 2;
+        let strings_per_row = num_fields * 2;
+
+        let mut elements = Vec::with_capacity(batch_size * tokens_per_row);
+        // The tape always starts with a null element
+        elements.push(TapeElement::Null);
+
+        let mut offsets = Vec::with_capacity(batch_size * strings_per_row + 1);
+        // Offsets are cumulative, so the first string starts at 0
+        offsets.push(0);
+
+        Self {
+            elements,
+            num_rows: 0,
+            batch_size,
+            bytes: Vec::with_capacity(strings_per_row * 8),
+            offsets,
+            stack: Vec::with_capacity(10),
+        }
+    }
+
+ /// Decodes JSON objects from `buf`, appending them to this tape
+ ///
+ /// Returns the number of bytes of `buf` consumed. Returns `Ok(0)` without reading
+ /// anything if `batch_size` rows have already been decoded, and stops part way
+ /// through `buf` once that limit is reached. A record may be split across calls:
+ /// any partial state is kept in `stack` and resumed on the next call
+ ///
+ /// Note: commas are skipped rather than validated, so for example
+ /// `{"a": 1 "b": 2}` and `[1,,2]` are accepted. NOTE(review): confirm this
+ /// leniency is intended
+ ///
+ /// # Errors
+ ///
+ /// Returns an error on any unexpected byte, including a top-level value that
+ /// is not an object
+ pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
+ if self.num_rows >= self.batch_size {
+ return Ok(0);
+ }
+
+ let mut iter = BufIter::new(buf);
+
+ while !iter.is_empty() {
+ match self.stack.last_mut() {
+ // Start of row
+ None => {
+ // Skip over leading whitespace
+ iter.skip_whitespace();
+ match next!(iter) {
+ b'{' => {
+ let idx = self.elements.len() as u32;
+ self.stack.push(DecoderState::Object(idx));
+ self.elements.push(TapeElement::StartObject(u32::MAX));
+ }
+ b => return Err(err(b, "trimming leading whitespace")),
+ }
+ }
+ // Decoding an object
+ Some(DecoderState::Object(start_idx)) => {
+ // Separating commas and whitespace are skipped without validation
+ iter.advance_until(|b| !json_whitespace(b) && b != b',');
+ match next!(iter) {
+ // A key: decode the string, then expect a colon, then a value
+ // (pushed in reverse so the string is decoded first)
+ b'"' => {
+ self.stack.push(DecoderState::Value);
+ self.stack.push(DecoderState::Colon);
+ self.stack.push(DecoderState::String);
+ }
+ b'}' => {
+ let start_idx = *start_idx;
+ let end_idx = self.elements.len() as u32;
+ self.elements[start_idx as usize] =
+ TapeElement::StartObject(end_idx);
+ self.elements.push(TapeElement::EndObject(start_idx));
+ self.stack.pop();
+ // Only closing a top-level object completes a row
+ self.num_rows += self.stack.is_empty() as usize;
+ if self.num_rows >= self.batch_size {
+ break;
+ }
+ }
+ b => return Err(err(b, "parsing object")),
+ }
+ }
+ // Decoding a list
+ Some(DecoderState::List(start_idx)) => {
+ iter.advance_until(|b| !json_whitespace(b) && b != b',');
+ // Peek rather than consume, so a value's first byte is left for
+ // the `Value` state
+ match iter.peek() {
+ Some(b']') => {
+ iter.next();
+ let start_idx = *start_idx;
+ let end_idx = self.elements.len() as u32;
+ self.elements[start_idx as usize] =
+ TapeElement::StartList(end_idx);
+ self.elements.push(TapeElement::EndList(start_idx));
+ self.stack.pop();
+ }
+ Some(_) => self.stack.push(DecoderState::Value),
+ None => break,
+ }
+ }
+ // Decoding a string
+ Some(DecoderState::String) => {
+ // Copy bytes up to the next escape or closing quote
+ let s = iter.advance_until(|b| matches!(b, b'\\' | b'"'));
+ self.bytes.extend_from_slice(s);
+
+ match next!(iter) {
+ b'\\' => self.stack.push(DecoderState::Escape),
+ b'"' => {
+ // Record the completed string and its end offset
+ let idx = self.offsets.len() - 1;
+ self.elements.push(TapeElement::String(idx as _));
+ self.offsets.push(self.bytes.len());
+ self.stack.pop();
+ }
+ b => unreachable!("{}", b),
+ }
+ }
+ Some(state @ DecoderState::Value) => {
+ iter.skip_whitespace();
+ // Replace this `Value` state with the state for the kind of value found
+ *state = match next!(iter) {
+ b'"' => DecoderState::String,
+ // The first byte of a number is part of its text
+ b @ b'-' | b @ b'0'..=b'9' => {
+ self.bytes.push(b);
+ DecoderState::Number
+ }
+ b'n' => DecoderState::Literal(Literal::Null, 1),
+ b'f' => DecoderState::Literal(Literal::False, 1),
+ b't' => DecoderState::Literal(Literal::True, 1),
+ b'[' => {
+ let idx = self.elements.len() as u32;
+ self.elements.push(TapeElement::StartList(u32::MAX));
+ DecoderState::List(idx)
+ }
+ b'{' => {
+ let idx = self.elements.len() as u32;
+ self.elements.push(TapeElement::StartObject(u32::MAX));
+ DecoderState::Object(idx)
+ }
+ b => return Err(err(b, "parsing value")),
+ };
+ }
+ Some(DecoderState::Number) => {
+ // Only the byte class is checked here, not the number grammar. A number
+ // ends at the first non-numeric byte, so if `buf` runs out first this
+ // state is kept until more input arrives
+ let s = iter.advance_until(|b| {
+ !matches!(b, b'0'..=b'9' | b'-' | b'+' | b'.' | b'e' | b'E')
+ });
+ self.bytes.extend_from_slice(s);
+
+ if !iter.is_empty() {
+ self.stack.pop();
+ let idx = self.offsets.len() - 1;
+ self.elements.push(TapeElement::Number(idx as _));
+ self.offsets.push(self.bytes.len());
+ }
+ }
+ Some(DecoderState::Colon) => {
+ iter.skip_whitespace();
+ match next!(iter) {
+ b':' => self.stack.pop(),
+ b => return Err(err(b, "parsing colon")),
+ };
+ }
+ Some(DecoderState::Literal(literal, idx)) => {
+ // Match as much of the remaining literal as `buf` contains; `idx`
+ // tracks progress across calls
+ let bytes = literal.bytes();
+ let expected = bytes.iter().skip(*idx as usize).copied();
+ for (expected, b) in expected.zip(&mut iter) {
+ match b == expected {
+ true => *idx += 1,
+ false => return Err(err(b, "parsing literal")),
+ }
+ }
+ if *idx == bytes.len() as u8 {
+ let element = literal.element();
+ self.stack.pop();
+ self.elements.push(element);
+ }
+ }
+ Some(DecoderState::Escape) => {
+ // `\u` escapes switch to the `Unicode` state; all others decode to a
+ // single byte
+ let v = match next!(iter) {
+ b'u' => {
+ self.stack.pop();
+ self.stack.push(DecoderState::Unicode(0, 0, 0));
+ continue;
+ }
+ b'"' => b'"',
+ b'\\' => b'\\',
+ b'/' => b'/',
+ b'b' => 8, // BS
+ b'f' => 12, // FF
+ b'n' => b'\n',
+ b'r' => b'\r',
+ b't' => b'\t',
+ b => return Err(err(b, "parsing escape sequence")),
+ };
+
+ self.stack.pop();
+ self.bytes.push(v);
+ }
+ // Parse a unicode escape sequence
+ // `idx` 0..=3: hex digits of the first code unit; 4: `\`; 5: `u`;
+ // 6..=9: hex digits of the low surrogate; 10: combine the pair.
+ // `next!` breaks out of this inner `loop`; `iter` is then empty so the
+ // outer loop also ends
+ Some(DecoderState::Unicode(high, low, idx)) => loop {
+ match *idx {
+ 0..=3 => *high = *high << 4 | parse_hex(next!(iter))? as u16,
+ 4 => {
+ // Not a surrogate, so it is a complete character on its own
+ if let Some(c) = char::from_u32(*high as u32) {
+ write_char(c, &mut self.bytes);
+ self.stack.pop();
+ break;
+ }
+
+ match next!(iter) {
+ b'\\' => {}
+ b => return Err(err(b, "parsing surrogate pair escape")),
+ }
+ }
+ 5 => match next!(iter) {
+ b'u' => {}
+ b => return Err(err(b, "parsing surrogate pair unicode")),
+ },
+ 6..=9 => *low = *low << 4 | parse_hex(next!(iter))? as u16,
+ _ => {
+ let c = char_from_surrogate_pair(*low, *high)?;
+ write_char(c, &mut self.bytes);
+ self.stack.pop();
+ break;
+ }
+ }
+ *idx += 1;
+ },
+ }
+ }
+
+ // Number of bytes consumed from `buf`
+ Ok(buf.len() - iter.len())
+ }
+
+    /// Finishes the current [`Tape`]
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if decoding stopped part way through a record, if the
+    /// tape has too many strings or elements to be indexed by the `u32` values
+    /// stored in [`TapeElement`], or if the decoded string data is not valid
+    /// UTF-8 or a string boundary splits a UTF-8 sequence
+    pub fn finish(&self) -> Result<Tape<'_>, ArrowError> {
+        if let Some(b) = self.stack.last() {
+            return Err(ArrowError::JsonError(format!(
+                "Truncated record whilst reading {}",
+                b.as_str()
+            )));
+        }
+
+        // `TapeElement::String` and `TapeElement::Number` hold indexes into `offsets`
+        if self.offsets.len() >= u32::MAX as usize {
+            return Err(ArrowError::JsonError(format!("Encountered more than {} bytes of string data, consider using a smaller batch size", u32::MAX)));
+        }
+
+        // Start and end elements hold indexes into `elements`, which `decode`
+        // computes with an `as u32` cast, so a longer tape would be corrupt.
+        // (Previously this re-checked `offsets`, leaving `elements` unchecked.)
+        if self.elements.len() >= u32::MAX as usize {
+            return Err(ArrowError::JsonError(format!("Encountered more than {} JSON elements, consider using a smaller batch size", u32::MAX)));
+        }
+
+        // Sanity check
+        assert_eq!(
+            self.offsets.last().copied().unwrap_or_default(),
+            self.bytes.len()
+        );
+
+        let strings = std::str::from_utf8(&self.bytes).map_err(|_| {
+            ArrowError::JsonError("Encountered non-UTF-8 data".to_string())
+        })?;
+
+        // Every string must start and end on a char boundary, otherwise a
+        // multi-byte sequence was split between two strings and
+        // `Tape::get_string` could not safely slice `strings`
+        for offset in self.offsets.iter().copied() {
+            if !strings.is_char_boundary(offset) {
+                return Err(ArrowError::JsonError(
+                    "Encountered truncated UTF-8 sequence".to_string(),
+                ));
+            }
+        }
+
+        Ok(Tape {
+            strings,
+            elements: &self.elements,
+            string_offsets: &self.offsets,
+            num_rows: self.num_rows,
+        })
+    }
+
+    /// Clears this [`TapeDecoder`] in preparation to read the next batch
+    ///
+    /// # Panics
+    ///
+    /// Panics if called part way through decoding a record
+    pub fn clear(&mut self) {
+        assert!(self.stack.is_empty());
+
+        self.num_rows = 0;
+        self.bytes.clear();
+        // Restore the state produced by `new`: the tape starts with a null
+        // element and the string offsets start at zero
+        self.elements.truncate(0);
+        self.elements.push(TapeElement::Null);
+        self.offsets.truncate(0);
+        self.offsets.push(0);
+    }
+}
+
+/// A thin wrapper around a byte slice iterator with helpers for scanning input
+struct BufIter<'a>(std::slice::Iter<'a, u8>);
+
+impl<'a> BufIter<'a> {
+    /// Creates an iterator over the bytes of `buf`
+    fn new(buf: &'a [u8]) -> Self {
+        Self(buf.iter())
+    }
+
+    /// Returns the bytes not yet consumed
+    fn as_slice(&self) -> &'a [u8] {
+        self.0.as_slice()
+    }
+
+    /// Returns `true` once every byte has been consumed
+    fn is_empty(&self) -> bool {
+        self.as_slice().is_empty()
+    }
+
+    /// Returns the next byte without consuming it
+    fn peek(&self) -> Option<u8> {
+        self.as_slice().first().copied()
+    }
+
+    /// Consumes up to `skip` bytes, stopping early if the input runs out
+    fn advance(&mut self, skip: usize) {
+        if let Some(last) = skip.checked_sub(1) {
+            self.0.nth(last);
+        }
+    }
+
+    /// Consumes bytes up to, but not including, the first byte for which `f`
+    /// returns `true`, and returns the consumed bytes
+    ///
+    /// If no byte matches, all remaining bytes are consumed and returned
+    fn advance_until<F: FnMut(u8) -> bool>(&mut self, f: F) -> &[u8] {
+        let remaining = self.as_slice();
+        let consumed = remaining
+            .iter()
+            .copied()
+            .position(f)
+            .unwrap_or(remaining.len());
+        self.advance(consumed);
+        &remaining[..consumed]
+    }
+
+    /// Consumes any leading JSON whitespace
+    fn skip_whitespace(&mut self) {
+        self.advance_until(|b| !json_whitespace(b));
+    }
+}
+
+impl<'a> Iterator for BufIter<'a> {
+    type Item = u8;
+
+    fn next(&mut self) -> Option<u8> {
+        self.0.next().copied()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.0.size_hint()
+    }
+}
+
+/// The wrapped slice iterator reports an exact length, so this does too
+impl<'a> ExactSizeIterator for BufIter<'a> {}
+
+/// Returns an error for a given byte `b` and context `ctx`
+fn err(b: u8, ctx: &str) -> ArrowError {
+ ArrowError::JsonError(format!(
+ "Encountered unexpected '{}' whilst {ctx}",
+ b as char
+ ))
+}
+
+/// Creates a character from a UTF-16 surrogate pair
+///
+/// `high` must be a high (leading) surrogate in `0xD800..=0xDBFF` and `low` a
+/// low (trailing) surrogate in `0xDC00..=0xDFFF`. Any other combination, such as
+/// the escapes `\ud83d\u0041`, is rejected with an error. Previously the
+/// subtractions below underflowed (panicking in debug builds).
+fn char_from_surrogate_pair(low: u16, high: u16) -> Result<char, ArrowError> {
+    if !(0xD800..=0xDBFF).contains(&high) || !(0xDC00..=0xDFFF).contains(&low) {
+        return Err(ArrowError::JsonError(format!(
+            "Invalid UTF-16 surrogate pair \\u{high:04x}\\u{low:04x}"
+        )));
+    }
+
+    // Each surrogate carries 10 bits of the code point, which is offset by 0x10000
+    let n = (((high - 0xD800) as u32) << 10 | (low - 0xDC00) as u32) + 0x1_0000;
+    char::from_u32(n).ok_or_else(|| {
+        ArrowError::JsonError(format!("Invalid UTF-16 surrogate pair {}", n))
+    })
+}
+
+/// Appends the UTF-8 encoding of `c` to `out`
+fn write_char(c: char, out: &mut Vec<u8>) {
+    let mut scratch = [0_u8; 4];
+    let encoded: &str = c.encode_utf8(&mut scratch);
+    out.extend(encoded.bytes());
+}
+
+/// Returns `true` if `b` is one of the four whitespace bytes JSON permits
+/// between tokens: space, line feed, carriage return or horizontal tab
+#[inline]
+fn json_whitespace(b: u8) -> bool {
+    b" \n\r\t".contains(&b)
+}
+
+/// Parses a single ASCII hexadecimal digit, in either case, to its value
+fn parse_hex(b: u8) -> Result<u8, ArrowError> {
+    match b {
+        b'0'..=b'9' => Ok(b - b'0'),
+        b'a'..=b'f' => Ok(b - b'a' + 10),
+        b'A'..=b'F' => Ok(b - b'A' + 10),
+        _ => Err(err(b, "unicode escape")),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ /// Checks that `DecoderState` and `TapeElement` each occupy 8 bytes
+ #[test]
+ fn test_sizes() {
+ assert_eq!(std::mem::size_of::<DecoderState>(), 8);
+ assert_eq!(std::mem::size_of::<TapeElement>(), 8);
+ }
+
+ /// Decodes rows covering every element kind, including nested objects and
+ /// lists, and checks the exact tape, string data and offsets produced
+ #[test]
+ fn test_basic() {
+ let a = r#"
+ {"hello": "world", "foo": 2, "bar": 45}
+
+ {"foo": "bar"}
+
+ {"fiz": null}
+
+ {"a": true, "b": false, "c": null}
+
+ {"a": "", "": "a"}
+
+ {"a": "b", "object": {"nested": "hello", "foo": 23}, "b": {}, "c": {"foo": null }}
+
+ {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2, 3]} }
+ "#;
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(a.as_bytes()).unwrap();
+
+ let finished = decoder.finish().unwrap();
+ assert_eq!(
+ finished.elements,
+ &[
+ TapeElement::Null,
+ TapeElement::StartObject(8), // {"hello": "world", "foo": 2, "bar": 45}
+ TapeElement::String(0), // "hello"
+ TapeElement::String(1), // "world"
+ TapeElement::String(2), // "foo"
+ TapeElement::Number(3), // 2
+ TapeElement::String(4), // "bar"
+ TapeElement::Number(5), // 45
+ TapeElement::EndObject(1),
+ TapeElement::StartObject(12), // {"foo": "bar"}
+ TapeElement::String(6), // "foo"
+ TapeElement::String(7), // "bar"
+ TapeElement::EndObject(9),
+ TapeElement::StartObject(16), // {"fiz": null}
+ TapeElement::String(8), // "fiz"
+ TapeElement::Null, // null
+ TapeElement::EndObject(13),
+ TapeElement::StartObject(24), // {"a": true, "b": false, "c": null}
+ TapeElement::String(9), // "a"
+ TapeElement::True, // true
+ TapeElement::String(10), // "b"
+ TapeElement::False, // false
+ TapeElement::String(11), // "c"
+ TapeElement::Null, // null
+ TapeElement::EndObject(17),
+ TapeElement::StartObject(30), // {"a": "", "": "a"}
+ TapeElement::String(12), // "a"
+ TapeElement::String(13), // ""
+ TapeElement::String(14), // ""
+ TapeElement::String(15), // "a"
+ TapeElement::EndObject(25),
+ TapeElement::StartObject(49), // {"a": "b", "object": {"nested": "hello", "foo": 23}, "b": {}, "c": {"foo": null }}
+ TapeElement::String(16), // "a"
+ TapeElement::String(17), // "b"
+ TapeElement::String(18), // "object"
+ TapeElement::StartObject(40), // {"nested": "hello", "foo": 23}
+ TapeElement::String(19), // "nested"
+ TapeElement::String(20), // "hello"
+ TapeElement::String(21), // "foo"
+ TapeElement::Number(22), // 23
+ TapeElement::EndObject(35),
+ TapeElement::String(23), // "b"
+ TapeElement::StartObject(43), // {}
+ TapeElement::EndObject(42),
+ TapeElement::String(24), // "c"
+ TapeElement::StartObject(48), // {"foo": null }
+ TapeElement::String(25), // "foo"
+ TapeElement::Null, // null
+ TapeElement::EndObject(45),
+ TapeElement::EndObject(31),
+ TapeElement::StartObject(75), // {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2, 3]} }
+ TapeElement::String(26), // "a"
+ TapeElement::StartList(59), // ["", "foo", ["bar", "c"]]
+ TapeElement::String(27), // ""
+ TapeElement::String(28), // "foo"
+ TapeElement::StartList(58), // ["bar", "c"]
+ TapeElement::String(29), // "bar"
+ TapeElement::String(30), // "c"
+ TapeElement::EndList(55),
+ TapeElement::EndList(52),
+ TapeElement::String(31), // "b"
+ TapeElement::StartObject(65), // {"1": []}
+ TapeElement::String(32), // "1"
+ TapeElement::StartList(64), // []
+ TapeElement::EndList(63),
+ TapeElement::EndObject(61),
+ TapeElement::String(33), // "c"
+ TapeElement::StartObject(74), // {"2": [1, 2, 3]}
+ TapeElement::String(34), // "2"
+ TapeElement::StartList(73), // [1, 2, 3]
+ TapeElement::Number(35), // 1
+ TapeElement::Number(36), // 2
+ TapeElement::Number(37), // 3
+ TapeElement::EndList(69),
+ TapeElement::EndObject(67),
+ TapeElement::EndObject(50)
+ ]
+ );
+
+ assert_eq!(
+ finished.strings,
+ "helloworldfoo2bar45foobarfizabcaaabobjectnestedhellofoo23bcfooafoobarcb1c2123"
+ );
+ assert_eq!(
+ &finished.string_offsets,
+ &[
+ 0, 5, 10, 13, 14, 17, 19, 22, 25, 28, 29, 30, 31, 32, 32, 32, 33, 34, 35,
+ 41, 47, 52, 55, 57, 58, 59, 62, 63, 63, 66, 69, 70, 71, 72, 73, 74, 75,
+ 76, 77
+ ]
+ )
+ }
+
+ /// Checks the error messages produced for malformed, truncated and
+ /// non-UTF-8 input
+ #[test]
+ fn test_invalid() {
+ // Test invalid
+ let mut decoder = TapeDecoder::new(16, 2);
+ let err = decoder.decode(b"hello").unwrap_err().to_string();
+ assert_eq!(
+ err,
+ "Json error: Encountered unexpected 'h' whilst trimming leading whitespace"
+ );
+
+ let mut decoder = TapeDecoder::new(16, 2);
+ let err = decoder.decode(b"{\"hello\": }").unwrap_err().to_string();
+ assert_eq!(
+ err,
+ "Json error: Encountered unexpected '}' whilst parsing value"
+ );
+
+ let mut decoder = TapeDecoder::new(16, 2);
+ let err = decoder
+ .decode(b"{\"hello\": [ false, tru ]}")
+ .unwrap_err()
+ .to_string();
+ assert_eq!(
+ err,
+ "Json error: Encountered unexpected ' ' whilst parsing literal"
+ );
+
+ let mut decoder = TapeDecoder::new(16, 2);
+ let err = decoder
+ .decode(b"{\"hello\": \"\\ud8\"}")
+ .unwrap_err()
+ .to_string();
+ assert_eq!(
+ err,
+ "Json error: Encountered unexpected '\"' whilst unicode escape"
+ );
+
+ // Missing surrogate pair
+ let mut decoder = TapeDecoder::new(16, 2);
+ let err = decoder
+ .decode(b"{\"hello\": \"\\ud83d\"}")
+ .unwrap_err()
+ .to_string();
+ assert_eq!(
+ err,
+ "Json error: Encountered unexpected '\"' whilst parsing surrogate pair escape"
+ );
+
+ // Test truncation
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(b"{\"he").unwrap();
+ let err = decoder.finish().unwrap_err().to_string();
+ assert_eq!(err, "Json error: Truncated record whilst reading string");
+
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(b"{\"hello\" : ").unwrap();
+ let err = decoder.finish().unwrap_err().to_string();
+ assert_eq!(err, "Json error: Truncated record whilst reading value");
+
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(b"{\"hello\" : [").unwrap();
+ let err = decoder.finish().unwrap_err().to_string();
+ assert_eq!(err, "Json error: Truncated record whilst reading list");
+
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(b"{\"hello\" : tru").unwrap();
+ let err = decoder.finish().unwrap_err().to_string();
+ assert_eq!(err, "Json error: Truncated record whilst reading true");
+
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(b"{\"hello\" : nu").unwrap();
+ let err = decoder.finish().unwrap_err().to_string();
+ assert_eq!(err, "Json error: Truncated record whilst reading null");
+
+ // Test invalid UTF-8
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(b"{\"hello\" : \"world\xFF\"}").unwrap();
+ let err = decoder.finish().unwrap_err().to_string();
+ assert_eq!(err, "Json error: Encountered non-UTF-8 data");
+
+ // A multi-byte sequence split between two strings is valid UTF-8 overall,
+ // but must be rejected because a string boundary falls inside it
+ let mut decoder = TapeDecoder::new(16, 2);
+ decoder.decode(b"{\"\xe2\" : \"\x96\xa1\"}").unwrap();
+ let err = decoder.finish().unwrap_err().to_string();
+ assert_eq!(err, "Json error: Encountered truncated UTF-8 sequence");
+ }
+}
diff --git a/arrow-json/src/reader.rs b/arrow-json/src/reader.rs
index 64a1b5319..c2647ebfc 100644
--- a/arrow-json/src/reader.rs
+++ b/arrow-json/src/reader.rs
@@ -563,6 +563,9 @@ where
/// converts them to [`RecordBatch`]es. To decode JSON formatted files,
/// see [`Reader`].
///
+/// Note: Consider instead using [`RawDecoder`] which is faster and will
+/// eventually replace this implementation as part of [#3610]
+///
/// # Examples
/// ```
/// use arrow_json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema};
@@ -584,6 +587,9 @@ where
/// assert_eq!(4, batch.num_rows());
/// assert_eq!(4, batch.num_columns());
/// ```
+///
+/// [`RawDecoder`]: crate::raw::RawDecoder
+/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
#[derive(Debug)]
pub struct Decoder {
/// Explicit schema for the JSON file
@@ -1607,6 +1613,12 @@ fn flatten_json_string_values(values: &[Value]) -> Vec<Option<String>> {
.collect::<Vec<Option<_>>>()
}
/// JSON file reader
+///
+/// Note: Consider instead using [`RawReader`] which is faster and will
+/// eventually replace this implementation as part of [#3610]
+///
+/// [`RawReader`]: crate::raw::RawReader
+/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
#[derive(Debug)]
pub struct Reader<R: Read> {
reader: BufReader<R>,
@@ -1652,6 +1664,13 @@ impl<R: Read> Reader<R> {
}
/// JSON file reader builder
+///
+/// Note: Consider instead using [`RawReaderBuilder`] which is faster and will
+/// eventually replace this implementation as part of [#3610]
+///
+/// [`RawReaderBuilder`]: crate::raw::RawReaderBuilder
+/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
+///
#[derive(Debug, Default)]
pub struct ReaderBuilder {
/// Optional schema for the JSON file
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index ee926ee52..bb67bfc40 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -207,7 +207,7 @@ required-features = ["test_utils", "csv"]
[[bench]]
name = "json_reader"
harness = false
-required-features = ["json"]
+required-features = ["test_utils", "json"]
[[bench]]
name = "equal"
diff --git a/arrow/benches/json_reader.rs b/arrow/benches/json_reader.rs
index 7bc3f4179..b5d8a5367 100644
--- a/arrow/benches/json_reader.rs
+++ b/arrow/benches/json_reader.rs
@@ -18,18 +18,50 @@
use criterion::*;
use arrow::datatypes::*;
-use arrow_json::ReaderBuilder;
+use arrow::util::bench_util::{
+ create_primitive_array, create_string_array, create_string_array_with_len,
+};
+use arrow_array::RecordBatch;
+use arrow_json::RawReaderBuilder;
+use arrow_json::{LineDelimitedWriter, ReaderBuilder};
use std::io::Cursor;
use std::sync::Arc;
-fn json_primitive_to_record_batch() {
+/// Benchmarks decoding `json` against `schema` with both the existing
+/// [`ReaderBuilder`] reader ("basic") and the [`RawReaderBuilder`] reader ("raw"),
+/// each using a batch size of 64
+fn do_bench(c: &mut Criterion, name: &str, json: &str, schema: SchemaRef) {
+    const BATCH_SIZE: usize = 64;
+
+    c.bench_function(&format!("{name} (basic)"), |b| {
+        b.iter(|| {
+            let cursor = Cursor::new(black_box(json));
+            let mut reader = ReaderBuilder::new()
+                .with_schema(schema.clone())
+                .with_batch_size(BATCH_SIZE)
+                .build(cursor)
+                .unwrap();
+            // `next` returns a `Result<Option<_>>`, so drain it manually
+            while let Some(batch) = reader.next().transpose() {
+                batch.unwrap();
+            }
+        })
+    });
+
+    c.bench_function(&format!("{name} (raw)"), |b| {
+        b.iter(|| {
+            let cursor = Cursor::new(black_box(json));
+            let reader = RawReaderBuilder::new(schema.clone())
+                .with_batch_size(BATCH_SIZE)
+                .build(cursor)
+                .unwrap();
+            for batch in reader {
+                batch.unwrap();
+            }
+        })
+    });
+}
+
+fn small_bench_primitive(c: &mut Criterion) {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Float64, true),
Field::new("c3", DataType::UInt32, true),
Field::new("c4", DataType::Boolean, true),
]));
- let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
+
let json_content = r#"
{"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
{"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
@@ -42,15 +74,45 @@ fn json_primitive_to_record_batch() {
{"c2": -35, "c3": 100.0, "c4": true}
{"c1": "fifteen", "c2": null, "c4": true}
"#;
- let cursor = Cursor::new(json_content);
- let mut reader = builder.build(cursor).unwrap();
- #[allow(clippy::unit_arg)]
- criterion::black_box({
- reader.next().unwrap();
- });
+
+ do_bench(c, "small_bench_primitive", json_content, schema)
}
-fn json_list_primitive_to_record_batch() {
+/// Benchmarks 4096 rows of data produced by the `bench_util` helpers, written
+/// out as newline-delimited JSON with [`LineDelimitedWriter`]
+fn large_bench_primitive(c: &mut Criterion) {
+    const ROWS: usize = 4096;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, true),
+        Field::new("c2", DataType::Int32, true),
+        Field::new("c3", DataType::UInt32, true),
+        Field::new("c4", DataType::Utf8, true),
+        Field::new("c5", DataType::Utf8, true),
+        Field::new("c6", DataType::Float32, true),
+    ]));
+
+    let batch = RecordBatch::try_from_iter([
+        ("c1", Arc::new(create_string_array::<i32>(ROWS, 0.)) as _),
+        ("c2", Arc::new(create_primitive_array::<Int32Type>(ROWS, 0.)) as _),
+        ("c3", Arc::new(create_primitive_array::<UInt32Type>(ROWS, 0.)) as _),
+        ("c4", Arc::new(create_string_array_with_len::<i32>(ROWS, 0.2, 10)) as _),
+        ("c5", Arc::new(create_string_array_with_len::<i32>(ROWS, 0.2, 20)) as _),
+        ("c6", Arc::new(create_primitive_array::<Float32Type>(ROWS, 0.2)) as _),
+    ])
+    .unwrap();
+
+    // Serialize the batch to newline-delimited JSON to use as benchmark input
+    let mut out = Vec::with_capacity(1024);
+    LineDelimitedWriter::new(&mut out).write(batch).unwrap();
+
+    let json = std::str::from_utf8(&out).unwrap();
+    do_bench(c, "large_bench_primitive", json, schema)
+}
+
+fn small_bench_list(c: &mut Criterion) {
let schema = Arc::new(Schema::new(vec![
Field::new(
"c1",
@@ -73,8 +135,7 @@ fn json_list_primitive_to_record_batch() {
true,
),
]));
- let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
- let json_content = r#"
+ let json = r#"
{"c1": ["eleven"], "c2": [6.2222222225, -3.2, null], "c3": [5.0, 6], "c4": [false, true]}
{"c1": ["twelve"], "c2": [-55555555555555.2, 12500000.0], "c3": [3, 4, 5]}
{"c1": null, "c2": [3], "c3": [125, 127, 129], "c4": [null, false, true]}
@@ -88,21 +149,13 @@ fn json_list_primitive_to_record_batch() {
{"c1": ["fifteen"], "c2": [null, 2.1, 1.5, -3], "c4": [true, false, null]}
{"c1": ["fifteen"], "c2": [], "c4": [true, false, null]}
"#;
- let cursor = Cursor::new(json_content);
- let mut reader = builder.build(cursor).unwrap();
- #[allow(clippy::unit_arg)]
- criterion::black_box({
- reader.next().unwrap();
- });
+ do_bench(c, "small_bench_list", json, schema)
}
+/// Registers the JSON reader benchmarks, each comparing the basic and raw readers
fn criterion_benchmark(c: &mut Criterion) {
- c.bench_function("json_primitive_to_record_batch", |b| {
- b.iter(json_primitive_to_record_batch)
- });
- c.bench_function("json_list_primitive_to_record_batch", |b| {
- b.iter(json_list_primitive_to_record_batch)
- });
+ small_bench_primitive(c);
+ large_bench_primitive(c);
+ small_bench_list(c);
}
criterion_group!(benches, criterion_benchmark);