You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/26 15:03:36 UTC

[arrow-rs] branch master updated: Add Raw JSON Reader (~2.5x faster) (#3479)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f1a92a5f Add Raw JSON Reader (~2.5x faster) (#3479)
0f1a92a5f is described below

commit 0f1a92a5f31916570d70b78562913cf877e8929c
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jan 26 15:03:30 2023 +0000

    Add Raw JSON Reader (~2.5x faster) (#3479)
    
    * Add Raw JSON Reader
    
    * Custom tape decoder
    
    * RAT
    
    * Cleanup
    
    * More columns in benchmark
    
    * CI fixes
    
    * Tweaks
    
    * Add List support
    
    * Add support for nested nulls
    
    * Remove unnecessary dependency
    
    * Add RawDecoder
    
    * Clippy
    
    * Fix List
    
    * Fix buffering
    
    * More tests
    
    * Add Send bounds
    
    * Fix variance
    
    * Review feedback
    
    * Add deprecation notices
    
    * Build RawDecoder with builder
    
    * Improve field estimate
    
    * Format
    
    * Handle unicode split over strings
    
    * Improve detection of invalid UTF-8 sequences
---
 arrow-json/Cargo.toml                 |   1 +
 arrow-json/src/lib.rs                 |   6 +-
 arrow-json/src/raw/boolean_array.rs   |  43 ++
 arrow-json/src/raw/list_array.rs      | 116 +++++
 arrow-json/src/raw/mod.rs             | 570 ++++++++++++++++++++++++
 arrow-json/src/raw/primitive_array.rs |  88 ++++
 arrow-json/src/raw/string_array.rs    |  67 +++
 arrow-json/src/raw/struct_array.rs    | 129 ++++++
 arrow-json/src/raw/tape.rs            | 801 ++++++++++++++++++++++++++++++++++
 arrow-json/src/reader.rs              |  19 +
 arrow/Cargo.toml                      |   2 +-
 arrow/benches/json_reader.rs          | 101 ++++-
 12 files changed, 1916 insertions(+), 27 deletions(-)

diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml
index 9b9095b27..c6aa9b486 100644
--- a/arrow-json/Cargo.toml
+++ b/arrow-json/Cargo.toml
@@ -48,6 +48,7 @@ indexmap = { version = "1.9", default-features = false, features = ["std"] }
 num = { version = "0.4", default-features = false, features = ["std"] }
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
+lexical-core = { version = "0.8", default-features = false }
 
 [dev-dependencies]
 tempfile = "3.3"
diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs
index 0f1c0064f..7e582c335 100644
--- a/arrow-json/src/lib.rs
+++ b/arrow-json/src/lib.rs
@@ -25,8 +25,10 @@
 pub mod reader;
 pub mod writer;
 
-pub use self::reader::Reader;
-pub use self::reader::ReaderBuilder;
+mod raw;
+
+pub use self::raw::{RawDecoder, RawReader, RawReaderBuilder};
+pub use self::reader::{Reader, ReaderBuilder};
 pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer};
 use half::f16;
 use serde_json::{Number, Value};
diff --git a/arrow-json/src/raw/boolean_array.rs b/arrow-json/src/raw/boolean_array.rs
new file mode 100644
index 000000000..12917785e
--- /dev/null
+++ b/arrow-json/src/raw/boolean_array.rs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::BooleanBuilder;
+use arrow_array::Array;
+use arrow_data::ArrayData;
+use arrow_schema::ArrowError;
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{tape_error, ArrayDecoder};
+
+#[derive(Default)]
+pub struct BooleanArrayDecoder {}
+
+impl ArrayDecoder for BooleanArrayDecoder {
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let mut values = BooleanBuilder::with_capacity(pos.len());
+        for &idx in pos {
+            match tape.get(idx) {
+                TapeElement::True => values.append_value(true),
+                TapeElement::False => values.append_value(false),
+                TapeElement::Null => values.append_null(),
+                other => return Err(tape_error(other, "boolean")),
+            }
+        }
+        // One slot was appended per requested tape position
+        Ok(values.finish().into_data())
+    }
+}
diff --git a/arrow-json/src/raw/list_array.rs b/arrow-json/src/raw/list_array.rs
new file mode 100644
index 000000000..9d96885f9
--- /dev/null
+++ b/arrow-json/src/raw/list_array.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{make_decoder, tape_error, ArrayDecoder};
+use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
+use arrow_array::OffsetSizeTrait;
+use arrow_data::{ArrayData, ArrayDataBuilder};
+use arrow_schema::{ArrowError, DataType};
+use std::marker::PhantomData;
+
+pub struct ListArrayDecoder<O> {
+    data_type: DataType,
+    decoder: Box<dyn ArrayDecoder>,
+    phantom: PhantomData<O>,
+    is_nullable: bool,
+}
+
+impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
+    pub fn new(data_type: DataType, is_nullable: bool) -> Result<Self, ArrowError> {
+        let field = match &data_type {
+            DataType::List(f) if !O::IS_LARGE => f,
+            DataType::LargeList(f) if O::IS_LARGE => f,
+            _ => unreachable!(),
+        };
+        let decoder = make_decoder(field.data_type().clone(), field.is_nullable())?;
+
+        Ok(Self {
+            data_type,
+            decoder,
+            phantom: Default::default(),
+            is_nullable,
+        })
+    }
+}
+
+impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> {
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let mut child_pos = Vec::with_capacity(pos.len());
+        let mut offsets = BufferBuilder::<O>::new(pos.len() + 1);
+        offsets.append(O::from_usize(0).unwrap());
+
+        let mut null_count = 0;
+        let mut nulls = self
+            .is_nullable
+            .then(|| BooleanBufferBuilder::new(pos.len()));
+
+        for p in pos {
+            let end_idx = match (tape.get(*p), nulls.as_mut()) {
+                (TapeElement::StartList(end_idx), None) => end_idx,
+                (TapeElement::StartList(end_idx), Some(nulls)) => {
+                    nulls.append(true);
+                    end_idx
+                }
+                (TapeElement::Null, Some(nulls)) => {
+                    nulls.append(false);
+                    null_count += 1;
+                    *p + 1
+                }
+                (d, _) => return Err(tape_error(d, "[")),
+            };
+
+            let mut cur_idx = *p + 1;
+            while cur_idx < end_idx {
+                child_pos.push(cur_idx);
+
+                // Step over this element, skipping the whole span of a nested list/object
+                cur_idx = match tape.get(cur_idx) {
+                    TapeElement::String(_)
+                    | TapeElement::Number(_)
+                    | TapeElement::True
+                    | TapeElement::False
+                    | TapeElement::Null => cur_idx + 1,
+                    TapeElement::StartList(end_idx) => end_idx + 1,
+                    TapeElement::StartObject(end_idx) => end_idx + 1,
+                    d => return Err(tape_error(d, "list value")),
+                }
+            }
+
+            let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
+                ArrowError::JsonError(format!(
+                    "offset overflow decoding {}",
+                    self.data_type
+                ))
+            })?;
+            offsets.append(offset)
+        }
+
+        let child_data = self.decoder.decode(tape, &child_pos)?;
+
+        let data = ArrayDataBuilder::new(self.data_type.clone())
+            .len(pos.len())
+            .null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
+            .null_count(null_count)
+            .add_buffer(offsets.finish())
+            .child_data(vec![child_data]);
+
+        // SAFETY: a leading 0 plus one offset per entry of `pos` was appended above,
+        // and every offset is a length of `child_pos`, which only ever grows
+        Ok(unsafe { data.build_unchecked() })
+    }
+}
diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs
new file mode 100644
index 000000000..9ffa7d213
--- /dev/null
+++ b/arrow-json/src/raw/mod.rs
@@ -0,0 +1,570 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A faster JSON reader that will eventually replace [`Reader`]
+//!
+//! [`Reader`]: crate::reader::Reader
+
+use crate::raw::boolean_array::BooleanArrayDecoder;
+use crate::raw::list_array::ListArrayDecoder;
+use crate::raw::primitive_array::PrimitiveArrayDecoder;
+use crate::raw::string_array::StringArrayDecoder;
+use crate::raw::struct_array::StructArrayDecoder;
+use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
+use arrow_array::types::*;
+use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
+use arrow_data::ArrayData;
+use arrow_schema::{ArrowError, DataType, SchemaRef};
+use std::io::BufRead;
+
+mod boolean_array;
+mod list_array;
+mod primitive_array;
+mod string_array;
+mod struct_array;
+mod tape;
+
+/// A builder for [`RawReader`] and [`RawDecoder`]
+pub struct RawReaderBuilder {
+    batch_size: usize,
+
+    schema: SchemaRef,
+}
+
+impl RawReaderBuilder {
+    /// Create a new [`RawReaderBuilder`] with the provided [`SchemaRef`]
+    ///
+    /// This could be obtained using [`infer_json_schema`] if not known
+    ///
+    /// Any columns not present in `schema` will be ignored
+    ///
+    /// [`infer_json_schema`]: crate::reader::infer_json_schema
+    pub fn new(schema: SchemaRef) -> Self {
+        Self {
+            batch_size: 1024,
+            schema,
+        }
+    }
+
+    /// Sets the batch size in rows to read
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Create a [`RawReader`] with the provided [`BufRead`]
+    pub fn build<R: BufRead>(self, reader: R) -> Result<RawReader<R>, ArrowError> {
+        Ok(RawReader {
+            reader,
+            decoder: self.build_decoder()?,
+        })
+    }
+
+    /// Create a [`RawDecoder`]
+    pub fn build_decoder(self) -> Result<RawDecoder, ArrowError> {
+        let decoder = make_decoder(DataType::Struct(self.schema.fields.clone()), false)?;
+        let num_fields = self.schema.all_fields().len();
+
+        Ok(RawDecoder {
+            decoder,
+            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
+            batch_size: self.batch_size,
+            schema: self.schema,
+        })
+    }
+}
+
+/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
+///
+/// This is significantly faster than [`Reader`] and eventually intended
+/// to replace it ([#3610](https://github.com/apache/arrow-rs/issues/3610))
+///
+/// Lines consisting solely of ASCII whitespace are ignored
+///
+/// [`Reader`]: crate::reader::Reader
+pub struct RawReader<R> {
+    reader: R,
+    decoder: RawDecoder,
+}
+
+impl<R> std::fmt::Debug for RawReader<R> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RawReader")
+            .field("decoder", &self.decoder)
+            .finish()
+    }
+}
+
+impl<R: BufRead> RawReader<R> {
+    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        loop {
+            let buf = self.reader.fill_buf()?;
+            if buf.is_empty() {
+                break;
+            }
+            let read = buf.len();
+
+            let decoded = self.decoder.decode(buf)?;
+            self.reader.consume(decoded);
+            if decoded != read {
+                break;
+            }
+        }
+        self.decoder.flush()
+    }
+}
+
+impl<R: BufRead> Iterator for RawReader<R> {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.read().transpose()
+    }
+}
+
+impl<R: BufRead> RecordBatchReader for RawReader<R> {
+    fn schema(&self) -> SchemaRef {
+        self.decoder.schema.clone()
+    }
+}
+
+/// A low-level interface for reading JSON data from a byte stream
+///
+/// See [`RawReader`] for a higher-level interface for interfacing with [`BufRead`]
+///
+/// The push-based interface facilitates integration with sources that yield arbitrarily
+/// delimited byte ranges, such as [`BufRead`], or a chunked byte stream received from
+/// object storage
+///
+/// ```
+/// # use std::io::BufRead;
+/// # use arrow_array::RecordBatch;
+/// # use arrow_json::{RawDecoder, RawReaderBuilder};
+/// # use arrow_schema::{ArrowError, SchemaRef};
+/// #
+/// fn read_from_json<R: BufRead>(
+///     mut reader: R,
+///     schema: SchemaRef,
+/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
+///     let mut decoder = RawReaderBuilder::new(schema).build_decoder()?;
+///     let mut next = move || {
+///         loop {
+///             // RawDecoder does not require buf to contain whole records
+///             let buf = reader.fill_buf()?;
+///             if buf.is_empty() {
+///                 break; // Input exhausted
+///             }
+///             let read = buf.len();
+///             let decoded = decoder.decode(buf)?;
+///
+///             // Consume the number of bytes read
+///             reader.consume(decoded);
+///             if decoded != read {
+///                 break; // Read batch size
+///             }
+///         }
+///         decoder.flush()
+///     };
+///     Ok(std::iter::from_fn(move || next().transpose()))
+/// }
+/// ```
+pub struct RawDecoder {
+    tape_decoder: TapeDecoder,
+    decoder: Box<dyn ArrayDecoder>,
+    batch_size: usize,
+    schema: SchemaRef,
+}
+
+impl std::fmt::Debug for RawDecoder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RawDecoder")
+            .field("schema", &self.schema)
+            .field("batch_size", &self.batch_size)
+            .finish()
+    }
+}
+
+impl RawDecoder {
+    /// Read JSON objects from `buf`, returning the number of bytes read
+    ///
+    /// This method returns once `batch_size` objects have been parsed since the
+    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
+    /// should be included in the next call to [`Self::decode`]
+    ///
+    /// There is no requirement that `buf` contains a whole number of records, facilitating
+    /// integration with arbitrary byte streams, such as that yielded by [`BufRead`]
+    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
+        self.tape_decoder.decode(buf)
+    }
+
+    /// Flushes the currently buffered data to a [`RecordBatch`]
+    ///
+    /// Returns `Ok(None)` if no buffered data
+    ///
+    /// Note: if called part way through decoding a record, this will return an error
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        let tape = self.tape_decoder.finish()?;
+
+        if tape.num_rows() == 0 {
+            return Ok(None);
+        }
+
+        // Tape index 0 is a null sentinel, so the first row's object starts at index 1
+        let mut next_object = 1;
+        let pos: Vec<_> = (0..tape.num_rows())
+            .map(|_| {
+                let end = match tape.get(next_object) {
+                    TapeElement::StartObject(end) => end,
+                    _ => unreachable!("corrupt tape"),
+                };
+                std::mem::replace(&mut next_object, end + 1)
+            })
+            .collect();
+
+        let decoded = self.decoder.decode(&tape, &pos)?;
+        self.tape_decoder.clear();
+
+        // Sanity check
+        assert!(matches!(decoded.data_type(), DataType::Struct(_)));
+        assert_eq!(decoded.null_count(), 0);
+        assert_eq!(decoded.len(), pos.len());
+
+        // Each child of the decoded struct becomes one column of the batch
+        let columns = decoded
+            .child_data()
+            .iter()
+            .map(|x| make_array(x.clone()))
+            .collect();
+
+        let batch = RecordBatch::try_new(self.schema.clone(), columns)?;
+        Ok(Some(batch))
+    }
+}
+
+trait ArrayDecoder: Send {
+    /// Decode elements from `tape` starting at the indexes contained in `pos`
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
+}
+
+macro_rules! primitive_decoder {
+    ($t:ty, $data_type:expr) => {
+        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
+    };
+}
+
+fn make_decoder(
+    data_type: DataType,
+    is_nullable: bool,
+) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
+    downcast_integer! {
+        data_type => (primitive_decoder, data_type),
+        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
+        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
+        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
+        DataType::Utf8 => Ok(Box::<StringArrayDecoder::<i32>>::default()),
+        DataType::LargeUtf8 => Ok(Box::<StringArrayDecoder::<i64>>::default()),
+        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, is_nullable)?)),
+        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, is_nullable)?)),
+        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, is_nullable)?)),
+        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
+            Err(ArrowError::JsonError(format!("{} is not supported by JSON", data_type)))
+        }
+        d => Err(ArrowError::NotYetImplemented(format!("Support for {} in JSON reader", d)))
+    }
+}
+
+fn tape_error(d: TapeElement, expected: &str) -> ArrowError {
+    ArrowError::JsonError(format!("expected {expected} got {d}"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::reader::infer_json_schema;
+    use crate::ReaderBuilder;
+    use arrow_array::cast::{
+        as_boolean_array, as_largestring_array, as_list_array, as_primitive_array,
+        as_string_array, as_struct_array,
+    };
+    use arrow_array::types::Int32Type;
+    use arrow_array::Array;
+    use arrow_schema::{DataType, Field, Schema};
+    use std::fs::File;
+    use std::io::{BufReader, Cursor, Seek};
+    use std::sync::Arc;
+
+    fn do_read(buf: &str, batch_size: usize, schema: SchemaRef) -> Vec<RecordBatch> {
+        let mut unbuffered = vec![];
+
+        // Test with different batch sizes to test for boundary conditions
+        for batch_size in [1, 3, 100, batch_size] {
+            unbuffered = RawReaderBuilder::new(schema.clone())
+                .with_batch_size(batch_size)
+                .build(Cursor::new(buf.as_bytes()))
+                .unwrap()
+                .collect::<Result<Vec<_>, _>>()
+                .unwrap();
+            // All but the last batch must be full; saturate so no batches doesn't underflow
+            for b in unbuffered.iter().take(unbuffered.len().saturating_sub(1)) {
+                assert_eq!(b.num_rows(), batch_size)
+            }
+
+            // Test with different buffer sizes to test for boundary conditions
+            for b in [1, 3, 5] {
+                let buffered = RawReaderBuilder::new(schema.clone())
+                    .with_batch_size(batch_size)
+                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
+                    .unwrap()
+                    .collect::<Result<Vec<_>, _>>()
+                    .unwrap();
+                assert_eq!(unbuffered, buffered);
+            }
+        }
+
+        unbuffered
+    }
+
+    #[test]
+    fn test_basic() {
+        let buf = r#"
+        {"a": 1, "b": 2, "c": true}
+        {"a": 2E0, "b": 4, "c": false}
+
+        {"b": 6, "a": 2.0}
+        {"b": "5", "a": 2}
+        {"b": 4e0}
+        {"b": 7, "a": null}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Boolean, true),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        let col1 = as_primitive_array::<Int64Type>(batches[0].column(0));
+        assert_eq!(col1.null_count(), 2);
+        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
+        assert!(col1.is_null(4));
+        assert!(col1.is_null(5));
+
+        let col2 = as_primitive_array::<Int32Type>(batches[0].column(1));
+        assert_eq!(col2.null_count(), 0);
+        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);
+
+        let col3 = as_boolean_array(batches[0].column(2));
+        assert_eq!(col3.null_count(), 4);
+        assert!(col3.value(0));
+        assert!(!col3.is_null(0));
+        assert!(!col3.value(1));
+        assert!(!col3.is_null(1));
+    }
+
+    #[test]
+    fn test_string() {
+        let buf = r#"
+        {"a": "1", "b": "2"}
+        {"a": "hello", "b": "shoo"}
+        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
+        
+        {"b": null}
+        {"b": "", "a": null}
+
+        "#;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::LargeUtf8, true),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        let col1 = as_string_array(batches[0].column(0));
+        assert_eq!(col1.null_count(), 2);
+        assert_eq!(col1.value(0), "1");
+        assert_eq!(col1.value(1), "hello");
+        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
+        assert!(col1.is_null(3));
+        assert!(col1.is_null(4));
+
+        let col2 = as_largestring_array(batches[0].column(1));
+        assert_eq!(col2.null_count(), 1);
+        assert_eq!(col2.value(0), "2");
+        assert_eq!(col2.value(1), "shoo");
+        assert_eq!(col2.value(2), "\t😁foo");
+        assert!(col2.is_null(3));
+        assert_eq!(col2.value(4), "");
+    }
+
+    #[test]
+    fn test_complex() {
+        let buf = r#"
+           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
+           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
+           {"list": null, "nested": {"a": null}}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "list",
+                DataType::List(Box::new(Field::new("element", DataType::Int32, false))),
+                true,
+            ),
+            Field::new(
+                "nested",
+                DataType::Struct(vec![
+                    Field::new("a", DataType::Int32, false),
+                    Field::new("b", DataType::Int32, false),
+                ]),
+                true,
+            ),
+            Field::new(
+                "nested_list",
+                DataType::Struct(vec![Field::new(
+                    "list2",
+                    DataType::List(Box::new(Field::new(
+                        "element",
+                        DataType::Struct(vec![Field::new("c", DataType::Int32, false)]),
+                        false,
+                    ))),
+                    true,
+                )]),
+                true,
+            ),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        let list = as_list_array(batches[0].column(0).as_ref());
+        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
+        assert_eq!(list.null_count(), 1);
+        assert!(list.is_null(2));
+        let list_values = as_primitive_array::<Int32Type>(list.values().as_ref());
+        assert_eq!(list_values.values(), &[5, 6]);
+
+        let nested = as_struct_array(batches[0].column(1).as_ref());
+        let a = as_primitive_array::<Int32Type>(nested.column(0).as_ref());
+        assert_eq!(a.null_count(), 1);
+        assert_eq!(a.values(), &[1, 7, 0]);
+        assert!(a.is_null(2));
+
+        let b = as_primitive_array::<Int32Type>(nested.column(1).as_ref());
+        assert_eq!(b.null_count(), 2);
+        assert_eq!(b.len(), 3);
+        assert_eq!(b.value(0), 2);
+        assert!(b.is_null(1));
+        assert!(b.is_null(2));
+
+        let nested_list = as_struct_array(batches[0].column(2).as_ref());
+        let list2 = as_list_array(nested_list.column(0).as_ref());
+        assert_eq!(list2.null_count(), 1);
+        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
+        assert!(list2.is_null(2));
+
+        let list2_values = as_struct_array(list2.values().as_ref());
+
+        let c = as_primitive_array::<Int32Type>(list2_values.column(0));
+        assert_eq!(c.values(), &[3, 4]);
+    }
+
+    #[test]
+    fn test_projection() {
+        let buf = r#"
+           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
+           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "nested",
+                DataType::Struct(vec![Field::new("a", DataType::Int32, false)]),
+                true,
+            ),
+            Field::new(
+                "nested_list",
+                DataType::Struct(vec![Field::new(
+                    "list2",
+                    DataType::List(Box::new(Field::new(
+                        "element",
+                        DataType::Struct(vec![Field::new("d", DataType::Int32, false)]),
+                        false,
+                    ))),
+                    true,
+                )]),
+                true,
+            ),
+        ]));
+
+        let batches = do_read(buf, 1024, schema);
+        assert_eq!(batches.len(), 1);
+
+        let nested = as_struct_array(batches[0].column(0).as_ref());
+        assert_eq!(nested.num_columns(), 1);
+        let a = as_primitive_array::<Int32Type>(nested.column(0).as_ref());
+        assert_eq!(a.null_count(), 0);
+        assert_eq!(a.values(), &[1, 7]);
+
+        let nested_list = as_struct_array(batches[0].column(1).as_ref());
+        assert_eq!(nested_list.num_columns(), 1);
+        assert_eq!(nested_list.null_count(), 0);
+
+        let list2 = as_list_array(nested_list.column(0).as_ref());
+        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
+        assert_eq!(list2.null_count(), 0);
+
+        let child = as_struct_array(list2.values().as_ref());
+        assert_eq!(child.num_columns(), 1);
+        assert_eq!(child.len(), 2);
+        assert_eq!(child.null_count(), 0);
+
+        let c = as_primitive_array::<Int32Type>(child.column(0).as_ref());
+        assert_eq!(c.values(), &[5, 0]);
+        assert_eq!(c.null_count(), 1);
+        assert!(c.is_null(1));
+    }
+
+    #[test]
+    fn integration_test() {
+        let files = [
+            "test/data/basic.json",
+            "test/data/basic_nulls.json",
+            "test/data/list_string_dict_nested_nulls.json",
+        ];
+
+        for file in files {
+            let mut f = BufReader::new(File::open(file).unwrap());
+            let schema = Arc::new(infer_json_schema(&mut f, None).unwrap());
+
+            f.rewind().unwrap();
+            let a = ReaderBuilder::new()
+                .with_schema(schema.clone())
+                .build(&mut f)
+                .unwrap();
+            let a_result = a.into_iter().collect::<Result<Vec<_>, _>>().unwrap();
+
+            f.rewind().unwrap();
+            let b = RawReaderBuilder::new(schema).build(f).unwrap();
+            let b_result = b.into_iter().collect::<Result<Vec<_>, _>>().unwrap();
+
+            assert_eq!(a_result, b_result);
+        }
+    }
+}
diff --git a/arrow-json/src/raw/primitive_array.rs b/arrow-json/src/raw/primitive_array.rs
new file mode 100644
index 000000000..72ce30203
--- /dev/null
+++ b/arrow-json/src/raw/primitive_array.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use num::NumCast;
+use std::marker::PhantomData;
+
+use arrow_array::builder::PrimitiveBuilder;
+use arrow_array::{Array, ArrowPrimitiveType};
+use arrow_cast::parse::Parser;
+use arrow_data::ArrayData;
+use arrow_schema::{ArrowError, DataType};
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{tape_error, ArrayDecoder};
+
+pub struct PrimitiveArrayDecoder<P: ArrowPrimitiveType> {
+    data_type: DataType,
+    // Invariant and Send
+    phantom: PhantomData<fn(P) -> P>,
+}
+
+impl<P: ArrowPrimitiveType> PrimitiveArrayDecoder<P> {
+    pub fn new(data_type: DataType) -> Self {
+        Self {
+            data_type,
+            phantom: Default::default(),
+        }
+    }
+}
+
+impl<P> ArrayDecoder for PrimitiveArrayDecoder<P>
+where
+    P: ArrowPrimitiveType + Parser,
+    P::Native: NumCast,
+{
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let mut builder = PrimitiveBuilder::<P>::with_capacity(pos.len())
+            .with_data_type(self.data_type.clone());
+
+        for p in pos {
+            match tape.get(*p) {
+                TapeElement::Null => builder.append_null(),
+                TapeElement::String(idx) => {
+                    let s = tape.get_string(idx);
+                    let value = P::parse(s).ok_or_else(|| {
+                        ArrowError::JsonError(format!(
+                            "failed to parse \"{s}\" as {}",
+                            self.data_type
+                        ))
+                    })?;
+
+                    builder.append_value(value)
+                }
+                TapeElement::Number(idx) => {
+                    let s = tape.get_string(idx);
+                    let via_f64 = || lexical_core::parse::<f64>(s.as_bytes()).ok();
+                    let value = P::parse(s) // native parse first: f64 is lossy above 2^53
+                        .or_else(|| via_f64().and_then(NumCast::from))
+                        .ok_or_else(|| {
+                            ArrowError::JsonError(format!(
+                                "failed to parse {s} as {}",
+                                self.data_type
+                            ))
+                        })?;
+
+                    builder.append_value(value)
+                }
+                d => return Err(tape_error(d, "primitive")),
+            }
+        }
+
+        Ok(builder.finish().into_data())
+    }
+}
diff --git a/arrow-json/src/raw/string_array.rs b/arrow-json/src/raw/string_array.rs
new file mode 100644
index 000000000..31a7a99be
--- /dev/null
+++ b/arrow-json/src/raw/string_array.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::GenericStringBuilder;
+use arrow_array::{Array, GenericStringArray, OffsetSizeTrait};
+use arrow_data::ArrayData;
+use arrow_schema::ArrowError;
+use std::marker::PhantomData;
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{tape_error, ArrayDecoder};
+
+/// Decoder state for string arrays using the offset type `O`
+///
+/// Holds no data of its own; `O` only selects the offset width of the
+/// arrays that are produced.
+#[derive(Default)]
+pub struct StringArrayDecoder<O: OffsetSizeTrait> {
+    /// Marker tying the decoder to the offset type `O`
+    phantom: PhantomData<O>,
+}
+
+impl<O: OffsetSizeTrait> ArrayDecoder for StringArrayDecoder<O> {
+    /// Decodes the tape elements at the positions in `pos` into a string array
+    ///
+    /// Runs two passes: the first validates every element and totals the
+    /// string bytes so the builder can be sized exactly, the second copies
+    /// the values. Elements other than strings and `null` are an error, as is
+    /// a byte total that does not fit the offset type `O`.
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        // Pass 1: validate element kinds and measure the value bytes
+        let mut total_bytes = 0;
+        for &p in pos {
+            total_bytes += match tape.get(p) {
+                TapeElement::String(idx) => tape.get_string(idx).len(),
+                TapeElement::Null => 0,
+                other => return Err(tape_error(other, "string")),
+            };
+        }
+
+        // The final offset equals the byte total, so it must be representable
+        if O::from_usize(total_bytes).is_none() {
+            let data_type = GenericStringArray::<O>::DATA_TYPE;
+            return Err(ArrowError::JsonError(format!(
+                "offset overflow decoding {data_type}"
+            )));
+        }
+
+        let mut builder =
+            GenericStringBuilder::<O>::with_capacity(pos.len(), total_bytes);
+
+        // Pass 2: copy the values, pass 1 already rejected everything else
+        for &p in pos {
+            match tape.get(p) {
+                TapeElement::Null => builder.append_null(),
+                TapeElement::String(idx) => builder.append_value(tape.get_string(idx)),
+                _ => unreachable!(),
+            }
+        }
+
+        Ok(builder.finish().into_data())
+    }
+}
diff --git a/arrow-json/src/raw/struct_array.rs b/arrow-json/src/raw/struct_array.rs
new file mode 100644
index 000000000..3b7895f37
--- /dev/null
+++ b/arrow-json/src/raw/struct_array.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::tape::{Tape, TapeElement};
+use crate::raw::{make_decoder, tape_error, ArrayDecoder};
+use arrow_array::builder::BooleanBufferBuilder;
+use arrow_data::{ArrayData, ArrayDataBuilder};
+use arrow_schema::{ArrowError, DataType, Field};
+
+/// Decoder state for JSON objects that are decoded to struct array data
+pub struct StructArrayDecoder {
+    /// The struct type to produce, read via `struct_fields`, which expects
+    /// a `DataType::Struct`
+    data_type: DataType,
+    /// One child decoder per struct field, in field order
+    decoders: Vec<Box<dyn ArrayDecoder>>,
+    /// Whether a `null` may appear in place of an object
+    is_nullable: bool,
+}
+
+impl StructArrayDecoder {
+    /// Builds a decoder for `data_type`, creating one child decoder per field
+    ///
+    /// Fails with the first error reported while creating a child decoder.
+    pub fn new(data_type: DataType, is_nullable: bool) -> Result<Self, ArrowError> {
+        let mut decoders = Vec::new();
+        for field in struct_fields(&data_type) {
+            let child = make_decoder(field.data_type().clone(), field.is_nullable())?;
+            decoders.push(child);
+        }
+
+        Ok(Self {
+            data_type,
+            decoders,
+            is_nullable,
+        })
+    }
+}
+
+impl ArrayDecoder for StructArrayDecoder {
+    /// Decodes the objects at the tape positions `pos` into struct array data
+    ///
+    /// For every row the members of the object are scanned, and the tape
+    /// position of each value whose name matches a struct field is recorded.
+    /// Each child decoder is then run over the positions recorded for its field.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a position is not the start of an object (or `null`
+    /// when this decoder is nullable), if an object member does not have the
+    /// expected shape, or if a child decoder fails.
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let fields = struct_fields(&self.data_type);
+        // One position per (field, row), defaulting to tape index 0.
+        // NOTE(review): the `Tape` docs state that the first element is always
+        // `Null`, so fields missing from a row presumably decode as null;
+        // confirm against the child decoders
+        let mut child_pos: Vec<_> =
+            (0..fields.len()).map(|_| vec![0; pos.len()]).collect();
+
+        let mut null_count = 0;
+        // A validity bitmap is only built when the struct itself is nullable
+        let mut nulls = self
+            .is_nullable
+            .then(|| BooleanBufferBuilder::new(pos.len()));
+
+        for (row, p) in pos.iter().enumerate() {
+            let end_idx = match (tape.get(*p), nulls.as_mut()) {
+                (TapeElement::StartObject(end_idx), None) => end_idx,
+                (TapeElement::StartObject(end_idx), Some(nulls)) => {
+                    nulls.append(true);
+                    end_idx
+                }
+                // A null row keeps the default child positions
+                (TapeElement::Null, Some(nulls)) => {
+                    nulls.append(false);
+                    null_count += 1;
+                    continue;
+                }
+                // Includes a `null` when the struct is not nullable
+                (d, _) => return Err(tape_error(d, "{")),
+            };
+
+            // Walk the members between the object's start and end elements
+            let mut cur_idx = *p + 1;
+            while cur_idx < end_idx {
+                // Read field name
+                let field_name = match tape.get(cur_idx) {
+                    TapeElement::String(s) => tape.get_string(s),
+                    d => return Err(tape_error(d, "field name")),
+                };
+
+                // Update child pos if match found
+                // Names without a matching field are ignored, and a repeated
+                // name overwrites the position recorded earlier
+                if let Some(field_idx) =
+                    fields.iter().position(|x| x.name() == field_name)
+                {
+                    child_pos[field_idx][row] = cur_idx + 1;
+                }
+
+                // Advance to next field
+                // Scalars occupy one element, nested values are skipped by
+                // jumping past their end element
+                cur_idx = match tape.get(cur_idx + 1) {
+                    TapeElement::String(_)
+                    | TapeElement::Number(_)
+                    | TapeElement::True
+                    | TapeElement::False
+                    | TapeElement::Null => cur_idx + 2,
+                    TapeElement::StartList(end_idx) => end_idx + 1,
+                    TapeElement::StartObject(end_idx) => end_idx + 1,
+                    d => return Err(tape_error(d, "field value")),
+                }
+            }
+        }
+
+        let child_data = self
+            .decoders
+            .iter_mut()
+            .zip(child_pos)
+            .map(|(d, pos)| d.decode(tape, &pos))
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Sanity check
+        child_data
+            .iter()
+            .for_each(|x| assert_eq!(x.len(), pos.len()));
+
+        let data = ArrayDataBuilder::new(self.data_type.clone())
+            .len(pos.len())
+            .null_count(null_count)
+            .null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
+            .child_data(child_data);
+
+        // SAFETY:
+        // Validated lengths above
+        // NOTE(review): only the child lengths are asserted here; the child
+        // data types are assumed to match `data_type` because the decoders
+        // were created from its fields
+        Ok(unsafe { data.build_unchecked() })
+    }
+}
+
+/// Returns the fields of a struct `data_type`
+///
+/// Must only be called with [`DataType::Struct`]; any other type is
+/// treated as unreachable.
+fn struct_fields(data_type: &DataType) -> &[Field] {
+    if let DataType::Struct(fields) = data_type {
+        fields
+    } else {
+        unreachable!()
+    }
+}
diff --git a/arrow-json/src/raw/tape.rs b/arrow-json/src/raw/tape.rs
new file mode 100644
index 000000000..6ca4e2d3f
--- /dev/null
+++ b/arrow-json/src/raw/tape.rs
@@ -0,0 +1,801 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::ArrowError;
+use std::fmt::{Display, Formatter};
+
+/// We decode JSON to a flattened tape representation,
+/// allowing for efficient traversal of the JSON data
+///
+/// This approach is inspired by [simdjson]
+///
+/// Uses `u32` for offsets to ensure `TapeElement` is 64-bits. A future
+/// iteration may increase this to a custom `u56` type.
+///
+/// [simdjson]: https://github.com/simdjson/simdjson/blob/master/doc/tape.md
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum TapeElement {
+    /// The start of an object, i.e. `{`
+    ///
+    /// Contains the offset of the corresponding [`Self::EndObject`]
+    StartObject(u32),
+    /// The end of an object, i.e. `}`
+    ///
+    /// Contains the offset of the corresponding [`Self::StartObject`]
+    EndObject(u32),
+    /// The start of a list, i.e. `[`
+    ///
+    /// Contains the offset of the corresponding [`Self::EndList`]
+    StartList(u32),
+    /// The end of a list, i.e. `]`
+    ///
+    /// Contains the offset of the corresponding [`Self::StartList`]
+    EndList(u32),
+    /// A string value
+    ///
+    /// Contains the index of the string within the [`Tape`] string data,
+    /// as accepted by [`Tape::get_string`]
+    String(u32),
+    /// A numeric value
+    ///
+    /// Contains the index of the number's text within the [`Tape`] string
+    /// data, as accepted by [`Tape::get_string`]
+    Number(u32),
+    /// A true literal
+    True,
+    /// A false literal
+    False,
+    /// A null literal
+    Null,
+}
+
+impl Display for TapeElement {
+    /// Writes a short label for the element kind, payloads are not printed
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let label = match self {
+            Self::StartObject(_) => "{",
+            Self::EndObject(_) => "}",
+            Self::StartList(_) => "[",
+            Self::EndList(_) => "]",
+            Self::String(_) => "string",
+            Self::Number(_) => "number",
+            Self::True => "true",
+            Self::False => "false",
+            Self::Null => "null",
+        };
+        f.write_str(label)
+    }
+}
+
+/// A decoded JSON tape
+///
+/// String and numeric data is stored alongside an array of [`TapeElement`]
+///
+/// The first element is always [`TapeElement::Null`]
+///
+/// This approach to decoding JSON is inspired by [simdjson]
+///
+/// [simdjson]: https://github.com/simdjson/simdjson/blob/master/doc/tape.md
+#[derive(Debug)]
+pub struct Tape<'a> {
+    /// The flattened JSON elements
+    elements: &'a [TapeElement],
+    /// The text of all strings and numbers, concatenated
+    strings: &'a str,
+    /// Byte offsets into `strings`; entries `i` and `i + 1` delimit value `i`
+    string_offsets: &'a [usize],
+    /// The number of rows, as reported by [`Tape::num_rows`]
+    num_rows: usize,
+}
+
+impl<'a> Tape<'a> {
+    /// Returns the string for the given string index
+    ///
+    /// Panics if `idx` has no entry in the offset table.
+    #[inline]
+    pub fn get_string(&self, idx: u32) -> &'a str {
+        let i = idx as usize;
+        let end = self.string_offsets[i + 1];
+        let start = self.string_offsets[i];
+        let range = start..end;
+        // SAFETY:
+        // Verified offsets
+        unsafe { self.strings.get_unchecked(range) }
+    }
+
+    /// Returns the tape element at `idx`
+    ///
+    /// Panics if `idx` is out of bounds.
+    pub fn get(&self, idx: u32) -> TapeElement {
+        let i = idx as usize;
+        self.elements[i]
+    }
+
+    /// Returns the number of rows
+    pub fn num_rows(&self) -> usize {
+        let Self { num_rows, .. } = self;
+        *num_rows
+    }
+}
+
+/// States based on <https://www.json.org/json-en.html>
+#[derive(Debug, Copy, Clone)]
+enum DecoderState {
+    /// Decoding an object
+    ///
+    /// Contains index of start [`TapeElement::StartObject`]
+    Object(u32),
+    /// Decoding a list
+    ///
+    /// Contains index of start [`TapeElement::StartList`]
+    List(u32),
+    /// Decoding the content of a string, after the opening quote
+    String,
+    /// Expecting the start of any JSON value
+    Value,
+    /// Decoding the remainder of a number, after its first character
+    Number,
+    /// Expecting the `:` between a field name and its value
+    Colon,
+    /// Decoding the character following a `\` within a string
+    Escape,
+    /// A unicode escape sequence,
+    ///
+    /// Consists of a `(high surrogate, low surrogate, decoded length)`
+    ///
+    /// The first value accumulates the first four hex digits, which is the
+    /// complete code point if no surrogate pair is needed
+    Unicode(u16, u16, u8),
+    /// A boolean or null literal
+    ///
+    /// Consists of `(literal, decoded length)`
+    Literal(Literal, u8),
+}
+
+impl DecoderState {
+    /// A human readable name for the state, used in error messages
+    fn as_str(&self) -> &'static str {
+        match *self {
+            Self::Object(_) => "object",
+            Self::List(_) => "list",
+            Self::String => "string",
+            Self::Value => "value",
+            Self::Number => "number",
+            Self::Colon => "colon",
+            Self::Escape => "escape",
+            Self::Unicode(..) => "unicode literal",
+            Self::Literal(literal, _) => literal.as_str(),
+        }
+    }
+}
+
+/// The JSON keyword literals
+#[derive(Debug, Copy, Clone)]
+enum Literal {
+    /// The `null` literal
+    Null,
+    /// The `true` literal
+    True,
+    /// The `false` literal
+    False,
+}
+
+impl Literal {
+    /// The tape element that represents this literal
+    fn element(&self) -> TapeElement {
+        match *self {
+            Self::Null => TapeElement::Null,
+            Self::True => TapeElement::True,
+            Self::False => TapeElement::False,
+        }
+    }
+
+    /// The literal as it is spelled in JSON
+    fn as_str(&self) -> &'static str {
+        match *self {
+            Self::Null => "null",
+            Self::True => "true",
+            Self::False => "false",
+        }
+    }
+
+    /// The bytes of the JSON spelling of this literal
+    fn bytes(&self) -> &'static [u8] {
+        let text = self.as_str();
+        text.as_bytes()
+    }
+}
+
+/// Yields the next item of the given iterator, or `break`s out of the
+/// enclosing loop once the iterator is exhausted
+macro_rules! next {
+    ($next:ident) => {
+        if let Some(b) = $next.next() {
+            b
+        } else {
+            break
+        }
+    };
+}
+
+/// Implements a state machine for decoding JSON to a tape
+pub struct TapeDecoder {
+    /// The tape elements decoded so far, starting with a [`TapeElement::Null`]
+    elements: Vec<TapeElement>,
+
+    /// Number of complete rows decoded into the current batch
+    num_rows: usize,
+
+    /// Number of rows to read per batch
+    batch_size: usize,
+
+    /// A buffer of parsed string data
+    ///
+    /// Note: if part way through a record, i.e. `stack` is not empty,
+    /// this may contain truncated UTF-8 data
+    bytes: Vec<u8>,
+
+    /// Offsets into `bytes`
+    ///
+    /// Starts with `0`, the end of every completed string or number is appended
+    offsets: Vec<usize>,
+
+    /// A stack of [`DecoderState`]
+    stack: Vec<DecoderState>,
+}
+
+impl TapeDecoder {
+    /// Create a new [`TapeDecoder`] with the provided batch size
+    /// and an estimated number of fields in each row
+    ///
+    /// `num_fields` is only used to size the initial allocations.
+    pub fn new(batch_size: usize, num_fields: usize) -> Self {
+        // Each field is estimated as two tape entries, plus the row's two braces
+        let tokens_per_row = 2 + num_fields * 2;
+
+        // The offset table always starts with the offset of the first string
+        let mut offsets = Vec::with_capacity(batch_size * (num_fields * 2) + 1);
+        offsets.push(0);
+
+        // The tape always starts with a `Null` element
+        let mut elements = Vec::with_capacity(batch_size * tokens_per_row);
+        elements.push(TapeElement::Null);
+
+        let bytes = Vec::with_capacity(num_fields * 2 * 8);
+        let stack = Vec::with_capacity(10);
+
+        Self {
+            elements,
+            num_rows: 0,
+            batch_size,
+            bytes,
+            offsets,
+            stack,
+        }
+    }
+
+    /// Decodes JSON from `buf` onto the tape, returning the number of bytes consumed
+    ///
+    /// Decoding is resumable: the parse state is kept in `self.stack` between
+    /// calls, so a record may be split across several buffers at any byte.
+    ///
+    /// Decoding stops when `buf` is exhausted or once `batch_size` rows have
+    /// been decoded. If the batch is already full on entry nothing is read
+    /// and `Ok(0)` is returned.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when a byte is encountered that is not valid for the
+    /// current state.
+    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
+        if self.num_rows >= self.batch_size {
+            return Ok(0);
+        }
+
+        let mut iter = BufIter::new(buf);
+
+        while !iter.is_empty() {
+            match self.stack.last_mut() {
+                // Start of row
+                None => {
+                    // Skip over leading whitespace
+                    iter.skip_whitespace();
+                    match next!(iter) {
+                        b'{' => {
+                            let idx = self.elements.len() as u32;
+                            self.stack.push(DecoderState::Object(idx));
+                            // Placeholder end offset, patched when the `}` is seen
+                            self.elements.push(TapeElement::StartObject(u32::MAX));
+                        }
+                        b => return Err(err(b, "trimming leading whitespace")),
+                    }
+                }
+                // Decoding an object
+                Some(DecoderState::Object(start_idx)) => {
+                    // Commas are skipped in the same way as whitespace
+                    iter.advance_until(|b| !json_whitespace(b) && b != b',');
+                    match next!(iter) {
+                        b'"' => {
+                            // Pushed in reverse order of processing:
+                            // field name string, then colon, then value
+                            self.stack.push(DecoderState::Value);
+                            self.stack.push(DecoderState::Colon);
+                            self.stack.push(DecoderState::String);
+                        }
+                        b'}' => {
+                            let start_idx = *start_idx;
+                            let end_idx = self.elements.len() as u32;
+                            self.elements[start_idx as usize] =
+                                TapeElement::StartObject(end_idx);
+                            self.elements.push(TapeElement::EndObject(start_idx));
+                            self.stack.pop();
+                            // Only the end of a top-level object completes a row
+                            self.num_rows += self.stack.is_empty() as usize;
+                            if self.num_rows >= self.batch_size {
+                                break;
+                            }
+                        }
+                        b => return Err(err(b, "parsing object")),
+                    }
+                }
+                // Decoding a list
+                Some(DecoderState::List(start_idx)) => {
+                    iter.advance_until(|b| !json_whitespace(b) && b != b',');
+                    // Peek so that the first byte of an item is left for `Value`
+                    match iter.peek() {
+                        Some(b']') => {
+                            iter.next();
+                            let start_idx = *start_idx;
+                            let end_idx = self.elements.len() as u32;
+                            self.elements[start_idx as usize] =
+                                TapeElement::StartList(end_idx);
+                            self.elements.push(TapeElement::EndList(start_idx));
+                            self.stack.pop();
+                        }
+                        Some(_) => self.stack.push(DecoderState::Value),
+                        None => break,
+                    }
+                }
+                // Decoding a string
+                Some(DecoderState::String) => {
+                    // Copy everything up to the next escape or closing quote
+                    let s = iter.advance_until(|b| matches!(b, b'\\' | b'"'));
+                    self.bytes.extend_from_slice(s);
+
+                    match next!(iter) {
+                        b'\\' => self.stack.push(DecoderState::Escape),
+                        b'"' => {
+                            let idx = self.offsets.len() - 1;
+                            self.elements.push(TapeElement::String(idx as _));
+                            self.offsets.push(self.bytes.len());
+                            self.stack.pop();
+                        }
+                        b => unreachable!("{}", b),
+                    }
+                }
+                // Decoding the start of a value, the state is replaced in place
+                // by the state for the kind of value found
+                Some(state @ DecoderState::Value) => {
+                    iter.skip_whitespace();
+                    *state = match next!(iter) {
+                        b'"' => DecoderState::String,
+                        b @ b'-' | b @ b'0'..=b'9' => {
+                            self.bytes.push(b);
+                            DecoderState::Number
+                        }
+                        // The first byte of the literal has already been matched
+                        b'n' => DecoderState::Literal(Literal::Null, 1),
+                        b'f' => DecoderState::Literal(Literal::False, 1),
+                        b't' => DecoderState::Literal(Literal::True, 1),
+                        b'[' => {
+                            let idx = self.elements.len() as u32;
+                            self.elements.push(TapeElement::StartList(u32::MAX));
+                            DecoderState::List(idx)
+                        }
+                        b'{' => {
+                            let idx = self.elements.len() as u32;
+                            self.elements.push(TapeElement::StartObject(u32::MAX));
+                            DecoderState::Object(idx)
+                        }
+                        b => return Err(err(b, "parsing value")),
+                    };
+                }
+                // Decoding the remainder of a number, its text is stored
+                // unparsed alongside the string data
+                Some(DecoderState::Number) => {
+                    let s = iter.advance_until(|b| {
+                        !matches!(b, b'0'..=b'9' | b'-' | b'+' | b'.' | b'e' | b'E')
+                    });
+                    self.bytes.extend_from_slice(s);
+
+                    // The number is only complete once a following byte has
+                    // been seen, otherwise it may continue in the next buffer
+                    if !iter.is_empty() {
+                        self.stack.pop();
+                        let idx = self.offsets.len() - 1;
+                        self.elements.push(TapeElement::Number(idx as _));
+                        self.offsets.push(self.bytes.len());
+                    }
+                }
+                Some(DecoderState::Colon) => {
+                    iter.skip_whitespace();
+                    match next!(iter) {
+                        b':' => self.stack.pop(),
+                        b => return Err(err(b, "parsing colon")),
+                    };
+                }
+                // Decoding a literal, `idx` is the number of bytes matched so
+                // far which allows resuming part way through
+                Some(DecoderState::Literal(literal, idx)) => {
+                    let bytes = literal.bytes();
+                    let expected = bytes.iter().skip(*idx as usize).copied();
+                    for (expected, b) in expected.zip(&mut iter) {
+                        match b == expected {
+                            true => *idx += 1,
+                            false => return Err(err(b, "parsing literal")),
+                        }
+                    }
+                    if *idx == bytes.len() as u8 {
+                        let element = literal.element();
+                        self.stack.pop();
+                        self.elements.push(element);
+                    }
+                }
+                // Decoding the character that follows a `\`
+                Some(DecoderState::Escape) => {
+                    let v = match next!(iter) {
+                        b'u' => {
+                            self.stack.pop();
+                            self.stack.push(DecoderState::Unicode(0, 0, 0));
+                            continue;
+                        }
+                        b'"' => b'"',
+                        b'\\' => b'\\',
+                        b'/' => b'/',
+                        b'b' => 8,  // BS
+                        b'f' => 12, // FF
+                        b'n' => b'\n',
+                        b'r' => b'\r',
+                        b't' => b'\t',
+                        b => return Err(err(b, "parsing escape sequence")),
+                    };
+
+                    self.stack.pop();
+                    self.bytes.push(v);
+                }
+                // Parse a unicode escape sequence
+                // `idx` counts the characters consumed after the initial `\u`
+                Some(DecoderState::Unicode(high, low, idx)) => loop {
+                    match *idx {
+                        // The first four hex digits
+                        0..=3 => *high = *high << 4 | parse_hex(next!(iter))? as u16,
+                        4 => {
+                            // Done if the digits form a character on their own
+                            if let Some(c) = char::from_u32(*high as u32) {
+                                write_char(c, &mut self.bytes);
+                                self.stack.pop();
+                                break;
+                            }
+
+                            // Otherwise a second `\uXXXX` escape must follow
+                            match next!(iter) {
+                                b'\\' => {}
+                                b => return Err(err(b, "parsing surrogate pair escape")),
+                            }
+                        }
+                        5 => match next!(iter) {
+                            b'u' => {}
+                            b => return Err(err(b, "parsing surrogate pair unicode")),
+                        },
+                        // The four hex digits of the second escape
+                        6..=9 => *low = *low << 4 | parse_hex(next!(iter))? as u16,
+                        _ => {
+                            let c = char_from_surrogate_pair(*low, *high)?;
+                            write_char(c, &mut self.bytes);
+                            self.stack.pop();
+                            break;
+                        }
+                    }
+                    *idx += 1;
+                },
+            }
+        }
+
+        // The number of bytes of `buf` that were consumed
+        Ok(buf.len() - iter.len())
+    }
+
+    /// Finishes the current [`Tape`]
+    ///
+    /// The returned [`Tape`] borrows from this decoder.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if
+    /// - a record has only been partially decoded, i.e. `stack` is not empty
+    /// - the number of string offsets, or the number of tape elements,
+    ///   reaches `u32::MAX` and so cannot be indexed by a [`TapeElement`]
+    /// - the buffered string data is not valid UTF-8, or an offset does not
+    ///   fall on a character boundary
+    pub fn finish(&self) -> Result<Tape<'_>, ArrowError> {
+        if let Some(b) = self.stack.last() {
+            return Err(ArrowError::JsonError(format!(
+                "Truncated record whilst reading {}",
+                b.as_str()
+            )));
+        }
+
+        // Strings and numbers are referenced by a `u32` index into `offsets`
+        if self.offsets.len() >= u32::MAX as usize {
+            return Err(ArrowError::JsonError(format!("Encountered more than {} bytes of string data, consider using a smaller batch size", u32::MAX)));
+        }
+
+        // Objects and lists reference their counterpart by a `u32` index into
+        // `elements`, so it is the element count that must be checked here
+        if self.elements.len() >= u32::MAX as usize {
+            return Err(ArrowError::JsonError(format!("Encountered more than {} JSON elements, consider using a smaller batch size", u32::MAX)));
+        }
+
+        // Sanity check
+        assert_eq!(
+            self.offsets.last().copied().unwrap_or_default(),
+            self.bytes.len()
+        );
+
+        let strings = std::str::from_utf8(&self.bytes).map_err(|_| {
+            ArrowError::JsonError("Encountered non-UTF-8 data".to_string())
+        })?;
+
+        // The buffer as a whole being valid UTF-8 does not rule out a
+        // multi-byte sequence straddling two values, and `Tape::get_string`
+        // slices at these offsets without further checks
+        for offset in self.offsets.iter().copied() {
+            if !strings.is_char_boundary(offset) {
+                return Err(ArrowError::JsonError(
+                    "Encountered truncated UTF-8 sequence".to_string(),
+                ));
+            }
+        }
+
+        Ok(Tape {
+            strings,
+            elements: &self.elements,
+            string_offsets: &self.offsets,
+            num_rows: self.num_rows,
+        })
+    }
+
+    /// Clears this [`TapeDecoder`] in preparation to read the next batch
+    ///
+    /// Panics if a record is only partially decoded.
+    pub fn clear(&mut self) {
+        assert!(self.stack.is_empty());
+
+        // Reset the tape to its initial single `Null` element
+        self.elements.clear();
+        self.elements.push(TapeElement::Null);
+
+        // Reset the string data to empty, keeping the leading zero offset
+        self.bytes.clear();
+        self.offsets.clear();
+        self.offsets.push(0);
+
+        self.num_rows = 0;
+    }
+}
+
+/// A byte iterator over a slice, with helpers for peeking and bulk skipping
+struct BufIter<'a>(std::slice::Iter<'a, u8>);
+
+impl<'a> BufIter<'a> {
+    /// Creates an iterator over the bytes of `buf`
+    fn new(buf: &'a [u8]) -> Self {
+        Self(buf.iter())
+    }
+
+    /// The bytes that have not been consumed yet
+    fn as_slice(&self) -> &'a [u8] {
+        self.0.as_slice()
+    }
+
+    /// Returns true if no bytes remain
+    fn is_empty(&self) -> bool {
+        self.as_slice().is_empty()
+    }
+
+    /// Returns the next byte without consuming it
+    fn peek(&self) -> Option<u8> {
+        self.as_slice().first().copied()
+    }
+
+    /// Consumes `skip` bytes, or all remaining bytes if there are fewer
+    fn advance(&mut self, skip: usize) {
+        let remaining = self.as_slice();
+        let skip = skip.min(remaining.len());
+        self.0 = remaining[skip..].iter();
+    }
+
+    /// Consumes bytes up to, but not including, the first byte for which `f`
+    /// returns true, and returns the consumed bytes
+    ///
+    /// Consumes everything if `f` never returns true.
+    fn advance_until<F: FnMut(u8) -> bool>(&mut self, f: F) -> &[u8] {
+        let remaining = self.as_slice();
+        let split = remaining
+            .iter()
+            .copied()
+            .position(f)
+            .unwrap_or(remaining.len());
+        self.advance(split);
+        &remaining[..split]
+    }
+
+    /// Consumes any leading JSON whitespace
+    fn skip_whitespace(&mut self) {
+        self.advance_until(|b| !json_whitespace(b));
+    }
+}
+
+impl<'a> Iterator for BufIter<'a> {
+    type Item = u8;
+
+    /// Consumes and returns the next byte, if any
+    fn next(&mut self) -> Option<Self::Item> {
+        let byte = self.0.next()?;
+        Some(*byte)
+    }
+
+    /// The number of remaining bytes is known exactly
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let remaining = self.0.len();
+        (remaining, Some(remaining))
+    }
+}
+
+// Provides `len`, which is exact as `size_hint` reports the remaining bytes
+impl<'a> ExactSizeIterator for BufIter<'a> {}
+
+/// Returns an error for a given byte `b` and context `ctx`
+fn err(b: u8, ctx: &str) -> ArrowError {
+    ArrowError::JsonError(format!(
+        "Encountered unexpected '{}' whilst {ctx}",
+        b as char
+    ))
+}
+
+/// Creates a character from an UTF-16 surrogate pair
+///
+/// `high` must be a high surrogate (`0xD800..=0xDBFF`) and `low` a low
+/// surrogate (`0xDC00..=0xDFFF`).
+///
+/// # Errors
+///
+/// Returns an error if either value is outside of its surrogate range, e.g.
+/// for an unpaired surrogate followed by an ordinary `\uXXXX` escape.
+fn char_from_surrogate_pair(low: u16, high: u16) -> Result<char, ArrowError> {
+    // Validate before subtracting: both subtractions below would otherwise
+    // underflow, or combine into an unrelated code point
+    let valid =
+        (0xD800..=0xDBFF).contains(&high) && (0xDC00..=0xDFFF).contains(&low);
+    if !valid {
+        return Err(ArrowError::JsonError(format!(
+            "Invalid UTF-16 surrogate pair. High: {high:#06X}, Low: {low:#06X}"
+        )));
+    }
+
+    let n = (((high - 0xD800) as u32) << 10 | (low - 0xDC00) as u32) + 0x1_0000;
+    char::from_u32(n).ok_or_else(|| {
+        ArrowError::JsonError(format!("Invalid UTF-16 surrogate pair {}", n))
+    })
+}
+
+/// Appends the UTF-8 encoding of `c` to `out`
+fn write_char(c: char, out: &mut Vec<u8>) {
+    // A `char` encodes to at most four bytes of UTF-8
+    let mut scratch = [0_u8; 4];
+    let encoded = c.encode_utf8(&mut scratch);
+    out.extend_from_slice(encoded.as_bytes());
+}
+
+/// Returns true if `b` is one of the JSON whitespace characters:
+/// space, line feed, carriage return or horizontal tab
+#[inline]
+fn json_whitespace(b: u8) -> bool {
+    b == b' ' || b == b'\n' || b == b'\r' || b == b'\t'
+}
+
+/// Converts an ASCII hex digit, of either case, to its numeric value
+///
+/// Returns an error if `b` is not a hex digit.
+fn parse_hex(b: u8) -> Result<u8, ArrowError> {
+    match char::from(b).to_digit(16) {
+        Some(digit) => Ok(digit as u8),
+        None => Err(err(b, "unicode escape")),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_sizes() {
+        assert_eq!(std::mem::size_of::<DecoderState>(), 8);
+        assert_eq!(std::mem::size_of::<TapeElement>(), 8);
+    }
+
+    #[test]
+    fn test_basic() {
+        let a = r#"
+        {"hello": "world", "foo": 2, "bar": 45}
+
+        {"foo": "bar"}
+
+        {"fiz": null}
+
+        {"a": true, "b": false, "c": null}
+
+        {"a": "", "": "a"}
+
+        {"a": "b", "object": {"nested": "hello", "foo": 23}, "b": {}, "c": {"foo": null }}
+
+        {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2, 3]} }
+        "#;
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(a.as_bytes()).unwrap();
+
+        let finished = decoder.finish().unwrap();
+        assert_eq!(
+            finished.elements,
+            &[
+                TapeElement::Null,
+                TapeElement::StartObject(8), // {"hello": "world", "foo": 2, "bar": 45}
+                TapeElement::String(0),      // "hello"
+                TapeElement::String(1),      // "world"
+                TapeElement::String(2),      // "foo"
+                TapeElement::Number(3),      // 2
+                TapeElement::String(4),      // "bar"
+                TapeElement::Number(5),      // 45
+                TapeElement::EndObject(1),
+                TapeElement::StartObject(12), // {"foo": "bar"}
+                TapeElement::String(6),       // "foo"
+                TapeElement::String(7),       // "bar"
+                TapeElement::EndObject(9),
+                TapeElement::StartObject(16), // {"fiz": null}
+                TapeElement::String(8),       // "fiz
+                TapeElement::Null,            // null
+                TapeElement::EndObject(13),
+                TapeElement::StartObject(24), // {"a": true, "b": false, "c": null}
+                TapeElement::String(9),       // "a"
+                TapeElement::True,            // true
+                TapeElement::String(10),      // "b"
+                TapeElement::False,           // false
+                TapeElement::String(11),      // "c"
+                TapeElement::Null,            // null
+                TapeElement::EndObject(17),
+                TapeElement::StartObject(30), // {"a": "", "": "a"}
+                TapeElement::String(12),      // "a"
+                TapeElement::String(13),      // ""
+                TapeElement::String(14),      // ""
+                TapeElement::String(15),      // "a"
+                TapeElement::EndObject(25),
+                TapeElement::StartObject(49), // {"a": "b", "object": {"nested": "hello", "foo": 23}, "b": {}, "c": {"foo": null }}
+                TapeElement::String(16),      // "a"
+                TapeElement::String(17),      // "b"
+                TapeElement::String(18),      // "object"
+                TapeElement::StartObject(40), // {"nested": "hello", "foo": 23}
+                TapeElement::String(19),      // "nested"
+                TapeElement::String(20),      // "hello"
+                TapeElement::String(21),      // "foo"
+                TapeElement::Number(22),      // 23
+                TapeElement::EndObject(35),
+                TapeElement::String(23),      // "b"
+                TapeElement::StartObject(43), // {}
+                TapeElement::EndObject(42),
+                TapeElement::String(24),      // "c"
+                TapeElement::StartObject(48), // {"foo": null }
+                TapeElement::String(25),      // "foo"
+                TapeElement::Null,            // null
+                TapeElement::EndObject(45),
+                TapeElement::EndObject(31),
+                TapeElement::StartObject(75), // {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2, 3]} }
+                TapeElement::String(26),      // "a"
+                TapeElement::StartList(59),   // ["", "foo", ["bar", "c"]]
+                TapeElement::String(27),      // ""
+                TapeElement::String(28),      // "foo"
+                TapeElement::StartList(58),   // ["bar", "c"]
+                TapeElement::String(29),      // "bar"
+                TapeElement::String(30),      // "c"
+                TapeElement::EndList(55),
+                TapeElement::EndList(52),
+                TapeElement::String(31),      // "b"
+                TapeElement::StartObject(65), // {"1": []}
+                TapeElement::String(32),      // "1"
+                TapeElement::StartList(64),   // []
+                TapeElement::EndList(63),
+                TapeElement::EndObject(61),
+                TapeElement::String(33),      // "c"
+                TapeElement::StartObject(74), // {"2": [1, 2, 3]}
+                TapeElement::String(34),      // "2"
+                TapeElement::StartList(73),   // [1, 2, 3]
+                TapeElement::Number(35),      // 1
+                TapeElement::Number(36),      // 2
+                TapeElement::Number(37),      // 3
+                TapeElement::EndList(69),
+                TapeElement::EndObject(67),
+                TapeElement::EndObject(50)
+            ]
+        );
+
+        assert_eq!(
+            finished.strings,
+            "helloworldfoo2bar45foobarfizabcaaabobjectnestedhellofoo23bcfooafoobarcb1c2123"
+        );
+        assert_eq!(
+            &finished.string_offsets,
+            &[
+                0, 5, 10, 13, 14, 17, 19, 22, 25, 28, 29, 30, 31, 32, 32, 32, 33, 34, 35,
+                41, 47, 52, 55, 57, 58, 59, 62, 63, 63, 66, 69, 70, 71, 72, 73, 74, 75,
+                76, 77
+            ]
+        )
+    }
+
+    #[test]
+    fn test_invalid() {
+        // Malformed syntax: decode() itself returns the error
+        let mut decoder = TapeDecoder::new(16, 2); // fresh decoder for every case
+        let err = decoder.decode(b"hello").unwrap_err().to_string();
+        assert_eq!(
+            err,
+            "Json error: Encountered unexpected 'h' whilst trimming leading whitespace"
+        );
+
+        let mut decoder = TapeDecoder::new(16, 2); // key with no value
+        let err = decoder.decode(b"{\"hello\": }").unwrap_err().to_string();
+        assert_eq!(
+            err,
+            "Json error: Encountered unexpected '}' whilst parsing value"
+        );
+
+        let mut decoder = TapeDecoder::new(16, 2); // incomplete literal `tru`
+        let err = decoder
+            .decode(b"{\"hello\": [ false, tru ]}")
+            .unwrap_err()
+            .to_string();
+        assert_eq!(
+            err,
+            "Json error: Encountered unexpected ' ' whilst parsing literal"
+        );
+
+        let mut decoder = TapeDecoder::new(16, 2); // \u escape cut short after two hex digits
+        let err = decoder
+            .decode(b"{\"hello\": \"\\ud8\"}")
+            .unwrap_err()
+            .to_string();
+        assert_eq!(
+            err,
+            "Json error: Encountered unexpected '\"' whilst unicode escape"
+        );
+
+        // High surrogate \ud83d with no low surrogate following it
+        let mut decoder = TapeDecoder::new(16, 2);
+        let err = decoder
+            .decode(b"{\"hello\": \"\\ud83d\"}")
+            .unwrap_err()
+            .to_string();
+        assert_eq!(
+            err,
+            "Json error: Encountered unexpected '\"' whilst parsing surrogate pair escape"
+        );
+
+        // Truncated input: decode() succeeds, finish() reports what was being read
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(b"{\"he").unwrap();
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Json error: Truncated record whilst reading string");
+
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(b"{\"hello\" : ").unwrap();
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Json error: Truncated record whilst reading value");
+
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(b"{\"hello\" : [").unwrap();
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Json error: Truncated record whilst reading list");
+
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(b"{\"hello\" : tru").unwrap();
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Json error: Truncated record whilst reading true");
+
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(b"{\"hello\" : nu").unwrap();
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Json error: Truncated record whilst reading null");
+
+        // Invalid UTF-8 in string data: decode() accepts it, finish() reports the error
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(b"{\"hello\" : \"world\xFF\"}").unwrap();
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Json error: Encountered non-UTF-8 data");
+
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(b"{\"\xe2\" : \"\x96\xa1\"}").unwrap(); // E2 96 A1 split over two strings
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Json error: Encountered truncated UTF-8 sequence");
+    }
+}
diff --git a/arrow-json/src/reader.rs b/arrow-json/src/reader.rs
index 64a1b5319..c2647ebfc 100644
--- a/arrow-json/src/reader.rs
+++ b/arrow-json/src/reader.rs
@@ -563,6 +563,9 @@ where
 /// converts them to [`RecordBatch`]es. To decode JSON formatted files,
 /// see [`Reader`].
 ///
+/// Note: Consider instead using [`RawDecoder`] which is faster and will
+/// eventually replace this implementation as part of [#3610]
+///
 /// # Examples
 /// ```
 /// use arrow_json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema};
@@ -584,6 +587,9 @@ where
 /// assert_eq!(4, batch.num_rows());
 /// assert_eq!(4, batch.num_columns());
 /// ```
+///
+/// [`RawDecoder`]: crate::raw::RawDecoder
+/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
 #[derive(Debug)]
 pub struct Decoder {
     /// Explicit schema for the JSON file
@@ -1607,6 +1613,12 @@ fn flatten_json_string_values(values: &[Value]) -> Vec<Option<String>> {
         .collect::<Vec<Option<_>>>()
 }
 /// JSON file reader
+///
+/// Note: Consider instead using [`RawReader`] which is faster and will
+/// eventually replace this implementation as part of [#3610]
+///
+/// [`RawReader`]: crate::raw::RawReader
+/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
 #[derive(Debug)]
 pub struct Reader<R: Read> {
     reader: BufReader<R>,
@@ -1652,6 +1664,13 @@ impl<R: Read> Reader<R> {
 }
 
 /// JSON file reader builder
+///
+/// Note: Consider instead using [`RawReaderBuilder`] which is faster and will
+/// eventually replace this implementation as part of [#3610]
+///
+/// [`RawReaderBuilder`]: crate::raw::RawReaderBuilder
+/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
+///
 #[derive(Debug, Default)]
 pub struct ReaderBuilder {
     /// Optional schema for the JSON file
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index ee926ee52..bb67bfc40 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -207,7 +207,7 @@ required-features = ["test_utils", "csv"]
 [[bench]]
 name = "json_reader"
 harness = false
-required-features = ["json"]
+required-features = ["test_utils", "json"]
 
 [[bench]]
 name = "equal"
diff --git a/arrow/benches/json_reader.rs b/arrow/benches/json_reader.rs
index 7bc3f4179..b5d8a5367 100644
--- a/arrow/benches/json_reader.rs
+++ b/arrow/benches/json_reader.rs
@@ -18,18 +18,50 @@
 use criterion::*;
 
 use arrow::datatypes::*;
-use arrow_json::ReaderBuilder;
+use arrow::util::bench_util::{
+    create_primitive_array, create_string_array, create_string_array_with_len,
+};
+use arrow_array::RecordBatch;
+use arrow_json::RawReaderBuilder;
+use arrow_json::{LineDelimitedWriter, ReaderBuilder};
 use std::io::Cursor;
 use std::sync::Arc;
 
-fn json_primitive_to_record_batch() {
+fn do_bench(c: &mut Criterion, name: &str, json: &str, schema: SchemaRef) {
+    c.bench_function(&format!("{name} (basic)"), |b| {
+        b.iter(|| {
+            let cursor = Cursor::new(black_box(json)); // keep the input opaque to the optimizer
+            let builder = ReaderBuilder::new()
+                .with_schema(schema.clone())
+                .with_batch_size(64); // raw benchmark below uses the same batch size
+
+            let mut reader = builder.build(cursor).unwrap();
+            while let Some(next) = reader.next().transpose() {
+                next.unwrap(); // read to exhaustion, panicking on any error
+            }
+        })
+    });
+
+    c.bench_function(&format!("{name} (raw)"), |b| {
+        b.iter(|| {
+            let cursor = Cursor::new(black_box(json));
+            let builder = RawReaderBuilder::new(schema.clone()).with_batch_size(64);
+            let reader = builder.build(cursor).unwrap();
+            for next in reader {
+                next.unwrap(); // read to exhaustion, panicking on any error
+            }
+        })
+    });
+}
+
+fn small_bench_primitive(c: &mut Criterion) {
     let schema = Arc::new(Schema::new(vec![
         Field::new("c1", DataType::Utf8, true),
         Field::new("c2", DataType::Float64, true),
         Field::new("c3", DataType::UInt32, true),
         Field::new("c4", DataType::Boolean, true),
     ]));
-    let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
+
     let json_content = r#"
         {"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
         {"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
@@ -42,15 +74,45 @@ fn json_primitive_to_record_batch() {
         {"c2": -35, "c3": 100.0, "c4": true}
         {"c1": "fifteen", "c2": null, "c4": true}
         "#;
-    let cursor = Cursor::new(json_content);
-    let mut reader = builder.build(cursor).unwrap();
-    #[allow(clippy::unit_arg)]
-    criterion::black_box({
-        reader.next().unwrap();
-    });
+
+    do_bench(c, "small_bench_primitive", json_content, schema)
 }
 
-fn json_list_primitive_to_record_batch() {
+fn large_bench_primitive(c: &mut Criterion) {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, true),
+        Field::new("c2", DataType::Int32, true),
+        Field::new("c3", DataType::UInt32, true),
+        Field::new("c4", DataType::Utf8, true),
+        Field::new("c5", DataType::Utf8, true),
+        Field::new("c6", DataType::Float32, true),
+    ]));
+
+    let c1 = Arc::new(create_string_array::<i32>(4096, 0.)); // presumably (len, null_density)
+    let c2 = Arc::new(create_primitive_array::<Int32Type>(4096, 0.));
+    let c3 = Arc::new(create_primitive_array::<UInt32Type>(4096, 0.));
+    let c4 = Arc::new(create_string_array_with_len::<i32>(4096, 0.2, 10)); // TODO confirm args
+    let c5 = Arc::new(create_string_array_with_len::<i32>(4096, 0.2, 20));
+    let c6 = Arc::new(create_primitive_array::<Float32Type>(4096, 0.2));
+
+    let batch = RecordBatch::try_from_iter([
+        ("c1", c1 as _),
+        ("c2", c2 as _),
+        ("c3", c3 as _),
+        ("c4", c4 as _),
+        ("c5", c5 as _),
+        ("c6", c6 as _),
+    ])
+    .unwrap();
+
+    let mut out = Vec::with_capacity(1024);
+    LineDelimitedWriter::new(&mut out).write(batch).unwrap(); // batch -> JSON bytes in `out`
+
+    let json = std::str::from_utf8(&out).unwrap(); // the text both readers are benchmarked on
+    do_bench(c, "large_bench_primitive", json, schema)
+}
+
+fn small_bench_list(c: &mut Criterion) {
     let schema = Arc::new(Schema::new(vec![
         Field::new(
             "c1",
@@ -73,8 +135,7 @@ fn json_list_primitive_to_record_batch() {
             true,
         ),
     ]));
-    let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
-    let json_content = r#"
+    let json = r#"
         {"c1": ["eleven"], "c2": [6.2222222225, -3.2, null], "c3": [5.0, 6], "c4": [false, true]}
         {"c1": ["twelve"], "c2": [-55555555555555.2, 12500000.0], "c3": [3, 4, 5]}
         {"c1": null, "c2": [3], "c3": [125, 127, 129], "c4": [null, false, true]}
@@ -88,21 +149,13 @@ fn json_list_primitive_to_record_batch() {
         {"c1": ["fifteen"], "c2": [null, 2.1, 1.5, -3], "c4": [true, false, null]}
         {"c1": ["fifteen"], "c2": [], "c4": [true, false, null]}
         "#;
-    let cursor = Cursor::new(json_content);
-    let mut reader = builder.build(cursor).unwrap();
-    #[allow(clippy::unit_arg)]
-    criterion::black_box({
-        reader.next().unwrap();
-    });
+    do_bench(c, "small_bench_list", json, schema)
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("json_primitive_to_record_batch", |b| {
-        b.iter(json_primitive_to_record_batch)
-    });
-    c.bench_function("json_list_primitive_to_record_batch", |b| {
-        b.iter(json_list_primitive_to_record_batch)
-    });
+    small_bench_primitive(c); // each helper registers "(basic)" and "(raw)" runs via do_bench
+    large_bench_primitive(c);
+    small_bench_list(c);
 }
 
 criterion_group!(benches, criterion_benchmark);