Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/12 04:09:01 UTC

[arrow-rs] branch master updated: Remove old JSON Reader and Decoder (#3610) (#4052)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ce332a3d Remove old JSON Reader and Decoder (#3610) (#4052)
6ce332a3d is described below

commit 6ce332a3d099aaa421676075d4ca8c4644666d14
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Apr 12 05:08:55 2023 +0100

    Remove old JSON Reader and Decoder (#3610) (#4052)
    
    * Remove old JSON Reader and Decoder (#3610)
    
    * More tests
    
    * Fix doc
    
    * Fix test
    
    * Fix bench
---
 arrow-json/src/lib.rs                             |   14 +-
 arrow-json/src/reader.rs                          | 3449 ---------------------
 arrow-json/src/{raw => reader}/boolean_array.rs   |    4 +-
 arrow-json/src/{raw => reader}/decimal_array.rs   |    4 +-
 arrow-json/src/{raw => reader}/list_array.rs      |    4 +-
 arrow-json/src/{raw => reader}/map_array.rs       |    4 +-
 arrow-json/src/{raw => reader}/mod.rs             |  687 +++-
 arrow-json/src/{raw => reader}/primitive_array.rs |    4 +-
 arrow-json/src/reader/schema.rs                   |  710 +++++
 arrow-json/src/{raw => reader}/serializer.rs      |    2 +-
 arrow-json/src/{raw => reader}/string_array.rs    |    4 +-
 arrow-json/src/{raw => reader}/struct_array.rs    |    4 +-
 arrow-json/src/{raw => reader}/tape.rs            |    2 +-
 arrow-json/src/{raw => reader}/timestamp_array.rs |    4 +-
 arrow-json/src/writer.rs                          |   35 +-
 arrow/benches/json_reader.rs                      |   21 +-
 arrow/src/lib.rs                                  |    6 +-
 parquet/src/arrow/arrow_writer/levels.rs          |    2 +-
 parquet/src/arrow/arrow_writer/mod.rs             |    2 +-
 19 files changed, 1341 insertions(+), 3621 deletions(-)
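
For users of the removed row-based Reader, migration targets the tape-based
reader promoted out of the raw module (the schema inference helpers move to
reader/schema.rs, per the diffstat above). A minimal sketch of the promoted
builder API, with an illustrative schema and file:

    use std::{fs::File, io::BufReader, sync::Arc};
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::Float64,
            false,
        )]));
        let file = File::open("test/data/basic.json").unwrap();
        // ReaderBuilder::new takes the schema up front; the resulting reader
        // iterates Result<RecordBatch, ArrowError> values.
        let mut reader = arrow_json::ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(BufReader::new(file))
            .unwrap();
        let batch = reader.next().unwrap().unwrap();
        println!("read {} rows", batch.num_rows());
    }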

diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs
index 5998bc3a4..88415ff2e 100644
--- a/arrow-json/src/lib.rs
+++ b/arrow-json/src/lib.rs
@@ -25,10 +25,18 @@
 pub mod reader;
 pub mod writer;
 
-mod raw;
+#[doc(hidden)]
+#[deprecated(note = "Use Decoder")]
+pub type RawDecoder = reader::Decoder;
+
+#[doc(hidden)]
+#[deprecated(note = "Use Reader")]
+pub type RawReader<R> = Reader<R>;
+
+#[doc(hidden)]
+#[deprecated(note = "Use ReaderBuilder")]
+pub type RawReaderBuilder = ReaderBuilder;
 
-pub use self::raw::{RawDecoder, RawReader, RawReaderBuilder};
-#[allow(deprecated)]
 pub use self::reader::{Reader, ReaderBuilder};
 pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer};
 use half::f16;
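
The hidden aliases above exist so downstream code that still imports the Raw*
names keeps compiling, now with a deprecation warning pointing at the new
names. A hedged sketch of a hypothetical downstream helper:

    // open_raw is illustrative; the Raw* aliases resolve to the promoted
    // Reader/ReaderBuilder types, so only a deprecation warning is emitted
    // until call sites rename.
    #[allow(deprecated)]
    fn open_raw(
        schema: arrow_schema::SchemaRef,
        file: std::fs::File,
    ) -> arrow_json::RawReader<std::io::BufReader<std::fs::File>> {
        arrow_json::RawReaderBuilder::new(schema)
            .build(std::io::BufReader::new(file))
            .unwrap()
    }
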
diff --git a/arrow-json/src/reader.rs b/arrow-json/src/reader.rs
deleted file mode 100644
index d343d3ed9..000000000
--- a/arrow-json/src/reader.rs
+++ /dev/null
@@ -1,3449 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! # JSON Reader
-//!
-//! This JSON reader allows line-delimited JSON files to be read into the Arrow memory
-//! model. Records are loaded in batches and are then converted from row-based data to
-//! columnar data.
-//!
-//! Example:
-//!
-//! ```
-//! # use arrow_schema::*;
-//! # use std::fs::File;
-//! # use std::io::BufReader;
-//! # use std::sync::Arc;
-//!
-//! let schema = Schema::new(vec![
-//!     Field::new("a", DataType::Float64, false),
-//!     Field::new("b", DataType::Float64, false),
-//!     Field::new("c", DataType::Float64, true),
-//! ]);
-//!
-//! let file = File::open("test/data/basic.json").unwrap();
-//!
-//! let mut json = arrow_json::Reader::new(
-//!    BufReader::new(file),
-//!    Arc::new(schema),
-//!    arrow_json::reader::DecoderOptions::new(),
-//! );
-//!
-//! let batch = json.next().unwrap().unwrap();
-//! ```
-
-use std::borrow::Borrow;
-use std::io::{BufRead, BufReader, Read, Seek};
-use std::sync::Arc;
-
-use indexmap::map::IndexMap as HashMap;
-use indexmap::set::IndexSet as HashSet;
-use serde_json::json;
-use serde_json::{map::Map as JsonMap, Value};
-
-use arrow_array::builder::*;
-use arrow_array::types::*;
-use arrow_array::*;
-use arrow_buffer::{bit_util, i256, Buffer, MutableBuffer};
-use arrow_cast::parse::{parse_decimal, Parser};
-use arrow_data::{ArrayData, ArrayDataBuilder};
-use arrow_schema::*;
-
-#[derive(Debug, Clone)]
-enum InferredType {
-    Scalar(HashSet<DataType>),
-    Array(Box<InferredType>),
-    Object(HashMap<String, InferredType>),
-    Any,
-}
-
-impl InferredType {
-    fn merge(&mut self, other: InferredType) -> Result<(), ArrowError> {
-        match (self, other) {
-            (InferredType::Array(s), InferredType::Array(o)) => {
-                s.merge(*o)?;
-            }
-            (InferredType::Scalar(self_hs), InferredType::Scalar(other_hs)) => {
-                other_hs.into_iter().for_each(|v| {
-                    self_hs.insert(v);
-                });
-            }
-            (InferredType::Object(self_map), InferredType::Object(other_map)) => {
-                for (k, v) in other_map {
-                    self_map.entry(k).or_insert(InferredType::Any).merge(v)?;
-                }
-            }
-            (s @ InferredType::Any, v) => {
-                *s = v;
-            }
-            (_, InferredType::Any) => {}
-            // convert a scalar type to a single-item scalar array type.
-            (
-                InferredType::Array(self_inner_type),
-                other_scalar @ InferredType::Scalar(_),
-            ) => {
-                self_inner_type.merge(other_scalar)?;
-            }
-            (s @ InferredType::Scalar(_), InferredType::Array(mut other_inner_type)) => {
-                other_inner_type.merge(s.clone())?;
-                *s = InferredType::Array(other_inner_type);
-            }
-            // incompatible types
-            (s, o) => {
-                return Err(ArrowError::JsonError(format!(
-                    "Incompatible type found during schema inference: {s:?} v.s. {o:?}",
-                )));
-            }
-        }
-
-        Ok(())
-    }
-}
-
-/// Coerce data type during inference
-///
-/// * `Int64` and `Float64` should be `Float64`
-/// * Lists and scalars are coerced to a list of a compatible scalar
-/// * All other types are coerced to `Utf8`
-fn coerce_data_type(dt: Vec<&DataType>) -> DataType {
-    let mut dt_iter = dt.into_iter().cloned();
-    let dt_init = dt_iter.next().unwrap_or(DataType::Utf8);
-
-    dt_iter.fold(dt_init, |l, r| match (l, r) {
-        (DataType::Boolean, DataType::Boolean) => DataType::Boolean,
-        (DataType::Int64, DataType::Int64) => DataType::Int64,
-        (DataType::Float64, DataType::Float64)
-        | (DataType::Float64, DataType::Int64)
-        | (DataType::Int64, DataType::Float64) => DataType::Float64,
-        (DataType::List(l), DataType::List(r)) => DataType::List(Arc::new(Field::new(
-            "item",
-            coerce_data_type(vec![l.data_type(), r.data_type()]),
-            true,
-        ))),
-        // coerce scalar and scalar array into scalar array
-        (DataType::List(e), not_list) | (not_list, DataType::List(e)) => {
-            DataType::List(Arc::new(Field::new(
-                "item",
-                coerce_data_type(vec![e.data_type(), &not_list]),
-                true,
-            )))
-        }
-        _ => DataType::Utf8,
-    })
-}
-
-fn generate_datatype(t: &InferredType) -> Result<DataType, ArrowError> {
-    Ok(match t {
-        InferredType::Scalar(hs) => coerce_data_type(hs.iter().collect()),
-        InferredType::Object(spec) => DataType::Struct(generate_fields(spec)?),
-        InferredType::Array(ele_type) => DataType::List(Arc::new(Field::new(
-            "item",
-            generate_datatype(ele_type)?,
-            true,
-        ))),
-        InferredType::Any => DataType::Null,
-    })
-}
-
-fn generate_fields(spec: &HashMap<String, InferredType>) -> Result<Fields, ArrowError> {
-    spec.iter()
-        .map(|(k, types)| Ok(Field::new(k, generate_datatype(types)?, true)))
-        .collect()
-}
-
-/// Generate schema from JSON field names and inferred data types
-fn generate_schema(spec: HashMap<String, InferredType>) -> Result<Schema, ArrowError> {
-    Ok(Schema::new(generate_fields(&spec)?))
-}
-
-/// JSON file reader that produces a serde_json::Value iterator from a Read trait
-///
-/// # Example
-///
-/// ```
-/// use std::fs::File;
-/// use std::io::BufReader;
-/// use arrow_json::reader::ValueIter;
-///
-/// let mut reader =
-///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
-/// let mut value_reader = ValueIter::new(&mut reader, None);
-/// for value in value_reader {
-///     println!("JSON value: {}", value.unwrap());
-/// }
-/// ```
-#[derive(Debug)]
-pub struct ValueIter<'a, R: BufRead> {
-    reader: &'a mut R,
-    max_read_records: Option<usize>,
-    record_count: usize,
-    // reuse line buffer to avoid allocation on each record
-    line_buf: String,
-}
-
-impl<'a, R: BufRead> ValueIter<'a, R> {
-    /// Creates a new `ValueIter`
-    pub fn new(reader: &'a mut R, max_read_records: Option<usize>) -> Self {
-        Self {
-            reader,
-            max_read_records,
-            record_count: 0,
-            line_buf: String::new(),
-        }
-    }
-}
-
-impl<'a, R: BufRead> Iterator for ValueIter<'a, R> {
-    type Item = Result<Value, ArrowError>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if let Some(max) = self.max_read_records {
-            if self.record_count >= max {
-                return None;
-            }
-        }
-
-        loop {
-            self.line_buf.truncate(0);
-            match self.reader.read_line(&mut self.line_buf) {
-                Ok(0) => {
-                    // read_line returns 0 when the stream reaches EOF
-                    return None;
-                }
-                Err(e) => {
-                    return Some(Err(ArrowError::JsonError(format!(
-                        "Failed to read JSON record: {e}"
-                    ))));
-                }
-                _ => {
-                    let trimmed_s = self.line_buf.trim();
-                    if trimmed_s.is_empty() {
-                        // ignore empty lines
-                        continue;
-                    }
-
-                    self.record_count += 1;
-                    return Some(serde_json::from_str(trimmed_s).map_err(|e| {
-                        ArrowError::JsonError(format!("Not valid JSON: {e}"))
-                    }));
-                }
-            }
-        }
-    }
-}
-
-/// Infer the fields of a JSON file by reading the first n records of the file, with
-/// `max_read_records` controlling the maximum number of records to read.
-///
-/// If `max_read_records` is not set, the whole file is read to infer its field types.
-///
-/// Contrary to [`infer_json_schema`], this function will seek back to the start of the `reader`.
-/// That way, the `reader` can be used immediately afterwards to create a [`Reader`].
-///
-/// # Examples
-/// ```
-/// use std::fs::File;
-/// use std::io::BufReader;
-/// use arrow_json::reader::infer_json_schema_from_seekable;
-///
-/// let file = File::open("test/data/mixed_arrays.json").unwrap();
-/// // file's cursor's offset at 0
-/// let mut reader = BufReader::new(file);
-/// let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap();
-/// // file's cursor's offset automatically set at 0
-/// ```
-pub fn infer_json_schema_from_seekable<R: Read + Seek>(
-    reader: &mut BufReader<R>,
-    max_read_records: Option<usize>,
-) -> Result<Schema, ArrowError> {
-    let schema = infer_json_schema(reader, max_read_records);
-    // seek the reader back to the start before returning
-    reader.rewind()?;
-
-    schema
-}
-
-/// Infer the fields of a JSON file by reading the first n records of the buffer, with
-/// `max_read_records` controlling the maximum number of records to read.
-///
-/// If `max_read_records` is not set, the whole file is read to infer its field types.
-///
-/// This function will not seek back to the start of the `reader`. The user has to manage the
-/// original file's cursor. This function is useful when the `reader`'s cursor is not available
-/// (does not implement [`Seek`]), as is the case for compressed stream decoders.
-///
-/// # Examples
-/// ```
-/// use std::fs::File;
-/// use std::io::{BufReader, SeekFrom, Seek};
-/// use flate2::read::GzDecoder;
-/// use arrow_json::reader::infer_json_schema;
-///
-/// let mut file = File::open("test/data/mixed_arrays.json.gz").unwrap();
-///
-/// // file's cursor's offset at 0
-/// let mut reader = BufReader::new(GzDecoder::new(&file));
-/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
-/// // cursor's offset at end of file
-///
-/// // seek back to start so that the original file is usable again
-/// file.seek(SeekFrom::Start(0)).unwrap();
-/// ```
-pub fn infer_json_schema<R: BufRead>(
-    reader: &mut R,
-    max_read_records: Option<usize>,
-) -> Result<Schema, ArrowError> {
-    infer_json_schema_from_iterator(ValueIter::new(reader, max_read_records))
-}
-
-fn set_object_scalar_field_type(
-    field_types: &mut HashMap<String, InferredType>,
-    key: &str,
-    ftype: DataType,
-) -> Result<(), ArrowError> {
-    if !field_types.contains_key(key) {
-        field_types.insert(key.to_string(), InferredType::Scalar(HashSet::new()));
-    }
-
-    match field_types.get_mut(key).unwrap() {
-        InferredType::Scalar(hs) => {
-            hs.insert(ftype);
-            Ok(())
-        }
-        // if a column contains both a scalar type and a scalar array type, convert the
-        // column's type to a scalar array.
-        scalar_array @ InferredType::Array(_) => {
-            let mut hs = HashSet::new();
-            hs.insert(ftype);
-            scalar_array.merge(InferredType::Scalar(hs))?;
-            Ok(())
-        }
-        t => Err(ArrowError::JsonError(format!(
-            "Expected scalar or scalar array JSON type, found: {t:?}",
-        ))),
-    }
-}
-
-fn infer_scalar_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
-    let mut hs = HashSet::new();
-
-    for v in array {
-        match v {
-            Value::Null => {}
-            Value::Number(n) => {
-                if n.is_i64() {
-                    hs.insert(DataType::Int64);
-                } else {
-                    hs.insert(DataType::Float64);
-                }
-            }
-            Value::Bool(_) => {
-                hs.insert(DataType::Boolean);
-            }
-            Value::String(_) => {
-                hs.insert(DataType::Utf8);
-            }
-            Value::Array(_) | Value::Object(_) => {
-                return Err(ArrowError::JsonError(format!(
-                    "Expected scalar value for scalar array, got: {v:?}"
-                )));
-            }
-        }
-    }
-
-    Ok(InferredType::Scalar(hs))
-}
-
-fn infer_nested_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
-    let mut inner_ele_type = InferredType::Any;
-
-    for v in array {
-        match v {
-            Value::Array(inner_array) => {
-                inner_ele_type.merge(infer_array_element_type(inner_array)?)?;
-            }
-            x => {
-                return Err(ArrowError::JsonError(format!(
-                    "Got non array element in nested array: {x:?}"
-                )));
-            }
-        }
-    }
-
-    Ok(InferredType::Array(Box::new(inner_ele_type)))
-}
-
-fn infer_struct_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
-    let mut field_types = HashMap::new();
-
-    for v in array {
-        match v {
-            Value::Object(map) => {
-                collect_field_types_from_object(&mut field_types, map)?;
-            }
-            _ => {
-                return Err(ArrowError::JsonError(format!(
-                    "Expected struct value for struct array, got: {v:?}"
-                )));
-            }
-        }
-    }
-
-    Ok(InferredType::Object(field_types))
-}
-
-fn infer_array_element_type(array: &[Value]) -> Result<InferredType, ArrowError> {
-    match array.iter().take(1).next() {
-        None => Ok(InferredType::Any), // empty array, return any type that can be updated later
-        Some(a) => match a {
-            Value::Array(_) => infer_nested_array_type(array),
-            Value::Object(_) => infer_struct_array_type(array),
-            _ => infer_scalar_array_type(array),
-        },
-    }
-}
-
-fn collect_field_types_from_object(
-    field_types: &mut HashMap<String, InferredType>,
-    map: &JsonMap<String, Value>,
-) -> Result<(), ArrowError> {
-    for (k, v) in map {
-        match v {
-            Value::Array(array) => {
-                let ele_type = infer_array_element_type(array)?;
-
-                if !field_types.contains_key(k) {
-                    match ele_type {
-                        InferredType::Scalar(_) => {
-                            field_types.insert(
-                                k.to_string(),
-                                InferredType::Array(Box::new(InferredType::Scalar(
-                                    HashSet::new(),
-                                ))),
-                            );
-                        }
-                        InferredType::Object(_) => {
-                            field_types.insert(
-                                k.to_string(),
-                                InferredType::Array(Box::new(InferredType::Object(
-                                    HashMap::new(),
-                                ))),
-                            );
-                        }
-                        InferredType::Any | InferredType::Array(_) => {
-                            // set inner type to any for nested array as well
-                            // so it can be updated properly from subsequent type merges
-                            field_types.insert(
-                                k.to_string(),
-                                InferredType::Array(Box::new(InferredType::Any)),
-                            );
-                        }
-                    }
-                }
-
-                match field_types.get_mut(k).unwrap() {
-                    InferredType::Array(inner_type) => {
-                        inner_type.merge(ele_type)?;
-                    }
-                    // if a column contains both a scalar type and a scalar array type,
-                    // convert the column's type to a scalar array.
-                    field_type @ InferredType::Scalar(_) => {
-                        field_type.merge(ele_type)?;
-                        *field_type = InferredType::Array(Box::new(field_type.clone()));
-                    }
-                    t => {
-                        return Err(ArrowError::JsonError(format!(
-                            "Expected array json type, found: {t:?}",
-                        )));
-                    }
-                }
-            }
-            Value::Bool(_) => {
-                set_object_scalar_field_type(field_types, k, DataType::Boolean)?;
-            }
-            Value::Null => {
-                // do nothing, we treat json as nullable by default when
-                // inferring
-            }
-            Value::Number(n) => {
-                if n.is_f64() {
-                    set_object_scalar_field_type(field_types, k, DataType::Float64)?;
-                } else {
-                    // default to i64
-                    set_object_scalar_field_type(field_types, k, DataType::Int64)?;
-                }
-            }
-            Value::String(_) => {
-                set_object_scalar_field_type(field_types, k, DataType::Utf8)?;
-            }
-            Value::Object(inner_map) => {
-                if !field_types.contains_key(k) {
-                    field_types
-                        .insert(k.to_string(), InferredType::Object(HashMap::new()));
-                }
-                match field_types.get_mut(k).unwrap() {
-                    InferredType::Object(inner_field_types) => {
-                        collect_field_types_from_object(inner_field_types, inner_map)?;
-                    }
-                    t => {
-                        return Err(ArrowError::JsonError(format!(
-                            "Expected object json type, found: {t:?}",
-                        )));
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(())
-}
-
-/// Infer the fields of a JSON file by reading all items from the JSON Value Iterator.
-///
-/// The following type coercion logic is implemented:
-/// * `Int64` and `Float64` are converted to `Float64`
-/// * Lists and scalars are coerced to a list of a compatible scalar
-/// * All other cases are coerced to `Utf8` (String)
-///
-/// Note that the above coercion logic differs from Spark's, which defaults to String type
-/// when List and Scalar values appear in the same field.
-///
-/// We diverge here because we don't have utilities to deal with JSON data once it has been
-/// interpreted as Strings. We should match Spark's behavior once more JSON parsing
-/// kernels are added in the future.
-pub fn infer_json_schema_from_iterator<I, V>(value_iter: I) -> Result<Schema, ArrowError>
-where
-    I: Iterator<Item = Result<V, ArrowError>>,
-    V: Borrow<Value>,
-{
-    let mut field_types: HashMap<String, InferredType> = HashMap::new();
-
-    for record in value_iter {
-        match record?.borrow() {
-            Value::Object(map) => {
-                collect_field_types_from_object(&mut field_types, map)?;
-            }
-            value => {
-                return Err(ArrowError::JsonError(format!(
-                    "Expected JSON record to be an object, found {value:?}"
-                )));
-            }
-        };
-    }
-
-    generate_schema(field_types)
-}
-
-/// JSON values to Arrow record batch decoder.
-///
-/// A [`Decoder`] decodes arbitrary streams of [`serde_json::Value`]s and
-/// converts them to [`RecordBatch`]es. To decode JSON formatted files,
-/// see [`Reader`].
-///
-/// Note: Consider instead using [`RawDecoder`] which is faster and will
-/// eventually replace this implementation as part of [#3610]
-///
-/// # Examples
-/// ```
-/// use arrow_json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema};
-/// use std::fs::File;
-/// use std::io::{BufReader, Seek, SeekFrom};
-/// use std::sync::Arc;
-///
-/// let mut reader =
-///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
-/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
-/// let options = DecoderOptions::new()
-///     .with_batch_size(1024);
-/// let decoder = Decoder::new(Arc::new(inferred_schema), options);
-///
-/// // seek back to start so that the original file is usable again
-/// reader.seek(SeekFrom::Start(0)).unwrap();
-/// let mut value_reader = ValueIter::new(&mut reader, None);
-/// let batch = decoder.next_batch(&mut value_reader).unwrap().unwrap();
-/// assert_eq!(4, batch.num_rows());
-/// assert_eq!(4, batch.num_columns());
-/// ```
-///
-/// [`RawDecoder`]: crate::raw::RawDecoder
-/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
-#[derive(Debug)]
-#[deprecated(note = "Use RawDecoder instead")]
-pub struct Decoder {
-    /// Explicit schema for the JSON file
-    schema: SchemaRef,
-    /// Options for the JSON decoder
-    options: DecoderOptions,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-/// Options for JSON decoding
-pub struct DecoderOptions {
-    /// Batch size (number of records to load each time), defaults to 1024 records
-    batch_size: usize,
-    /// Optional projection for which columns to load (case-sensitive names)
-    projection: Option<Vec<String>>,
-    /// Optional HashMap from column name to its format string
-    format_strings: Option<HashMap<String, String>>,
-}
-
-impl Default for DecoderOptions {
-    fn default() -> Self {
-        Self {
-            batch_size: 1024,
-            projection: None,
-            format_strings: None,
-        }
-    }
-}
-
-impl DecoderOptions {
-    /// Creates a new `DecoderOptions`
-    pub fn new() -> Self {
-        Default::default()
-    }
-
-    /// Set the batch size (number of records to load at one time)
-    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size = batch_size;
-        self
-    }
-
-    /// Set the reader's column projection
-    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
-        self.projection = Some(projection);
-        self
-    }
-
-    /// Set the decoder's format strings
-    pub fn with_format_strings(
-        mut self,
-        format_strings: HashMap<String, String>,
-    ) -> Self {
-        self.format_strings = Some(format_strings);
-        self
-    }
-}
-
-#[allow(deprecated)]
-impl Decoder {
-    /// Create a new JSON decoder with the given schema and options. Records
-    /// are supplied later to [`Decoder::next_batch`] as an iterator over
-    /// [`serde_json::Value`]s (i.e. an `Iterator<Item = Result<Value>>`).
-    pub fn new(schema: SchemaRef, options: DecoderOptions) -> Self {
-        Self { schema, options }
-    }
-
-    /// Returns the schema of the reader, useful for getting the schema without reading
-    /// record batches
-    pub fn schema(&self) -> SchemaRef {
-        match &self.options.projection {
-            Some(projection) => {
-                let fields = self.schema.fields();
-                let projected_fields: Fields = fields
-                    .iter()
-                    .filter_map(|field| {
-                        if projection.contains(field.name()) {
-                            Some(field.clone())
-                        } else {
-                            None
-                        }
-                    })
-                    .collect();
-
-                Arc::new(Schema::new(projected_fields))
-            }
-            None => self.schema.clone(),
-        }
-    }
-
-    /// Read the next batch of [`serde_json::Value`] records from the
-    /// iterator into a [`RecordBatch`].
-    ///
-    /// Returns `None` if the input iterator is exhausted.
-    pub fn next_batch<I>(
-        &self,
-        value_iter: &mut I,
-    ) -> Result<Option<RecordBatch>, ArrowError>
-    where
-        I: Iterator<Item = Result<Value, ArrowError>>,
-    {
-        let batch_size = self.options.batch_size;
-        let mut rows: Vec<Value> = Vec::with_capacity(batch_size);
-
-        for value in value_iter.by_ref().take(batch_size) {
-            let v = value?;
-            match v {
-                Value::Object(_) => rows.push(v),
-                _ => {
-                    return Err(ArrowError::JsonError(format!(
-                        "Row needs to be of type object, got: {v:?}"
-                    )));
-                }
-            }
-        }
-        if rows.is_empty() {
-            // reached end of file
-            return Ok(None);
-        }
-
-        let rows = &rows[..];
-
-        let arrays =
-            self.build_struct_array(rows, self.schema.fields(), &self.options.projection);
-
-        let projected_fields: Fields = match self.options.projection.as_ref() {
-            Some(projection) => projection
-                .iter()
-                .filter_map(|name| Some(self.schema.fields.find(name)?.1.clone()))
-                .collect(),
-            None => self.schema.fields.clone(),
-        };
-        let projected_schema = Arc::new(Schema::new(projected_fields));
-
-        arrays.and_then(|arr| {
-            RecordBatch::try_new_with_options(
-                projected_schema,
-                arr,
-                &RecordBatchOptions::new()
-                    .with_match_field_names(true)
-                    .with_row_count(Some(rows.len())),
-            )
-            .map(Some)
-        })
-    }
-
-    fn build_wrapped_list_array(
-        &self,
-        rows: &[Value],
-        col_name: &str,
-        key_type: &DataType,
-    ) -> Result<ArrayRef, ArrowError> {
-        match *key_type {
-            DataType::Int8 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::Int8),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
-            }
-            DataType::Int16 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::Int16),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
-            }
-            DataType::Int32 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::Int32),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
-            }
-            DataType::Int64 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::Int64),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
-            }
-            DataType::UInt8 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::UInt8),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
-            }
-            DataType::UInt16 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::UInt16),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
-            }
-            DataType::UInt32 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::UInt32),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
-            }
-            DataType::UInt64 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::UInt64),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
-            }
-            ref e => Err(ArrowError::JsonError(format!(
-                "Data type is currently not supported for dictionaries in list : {e:?}"
-            ))),
-        }
-    }
-
-    #[inline(always)]
-    fn list_array_string_array_builder<DT>(
-        &self,
-        data_type: &DataType,
-        col_name: &str,
-        rows: &[Value],
-    ) -> Result<ArrayRef, ArrowError>
-    where
-        DT: ArrowPrimitiveType + ArrowDictionaryKeyType,
-    {
-        let mut builder: Box<dyn ArrayBuilder> = match data_type {
-            DataType::Utf8 => {
-                let values_builder =
-                    StringBuilder::with_capacity(rows.len(), rows.len() * 5);
-                Box::new(ListBuilder::new(values_builder))
-            }
-            DataType::Dictionary(_, _) => {
-                let values_builder =
-                    self.build_string_dictionary_builder::<DT>(rows.len() * 5);
-                Box::new(ListBuilder::new(values_builder))
-            }
-            e => {
-                return Err(ArrowError::JsonError(format!(
-                    "Nested list data builder type is not supported: {e:?}"
-                )))
-            }
-        };
-
-        for row in rows {
-            if let Some(value) = row.get(col_name) {
-                // value can be an array or a scalar
-                let vals: Vec<Option<String>> = if let Value::String(v) = value {
-                    vec![Some(v.to_string())]
-                } else if let Value::Array(n) = value {
-                    n.iter()
-                        .map(|v: &Value| {
-                            if v.is_string() {
-                                Some(v.as_str().unwrap().to_string())
-                            } else if v.is_array() || v.is_object() || v.is_null() {
-                                // implicitly drop nested values
-                                // TODO support deep-nesting
-                                None
-                            } else {
-                                Some(v.to_string())
-                            }
-                        })
-                        .collect()
-                } else if let Value::Null = value {
-                    vec![None]
-                } else if !value.is_object() {
-                    vec![Some(value.to_string())]
-                } else {
-                    return Err(ArrowError::JsonError(
-                        "Only scalars are currently supported in JSON arrays".to_string(),
-                    ));
-                };
-
-                // TODO: ARROW-10335: APIs of dictionary arrays and others are different. Unify
-                // them.
-                match data_type {
-                    DataType::Utf8 => {
-                        let builder = builder
-                            .as_any_mut()
-                            .downcast_mut::<ListBuilder<StringBuilder>>()
-                            .ok_or_else(||ArrowError::JsonError(
-                                "Cast failed for ListBuilder<StringBuilder> during nested data parsing".to_string(),
-                            ))?;
-                        for val in vals {
-                            if let Some(v) = val {
-                                builder.values().append_value(&v);
-                            } else {
-                                builder.values().append_null();
-                            };
-                        }
-
-                        // Append to the list
-                        builder.append(true);
-                    }
-                    DataType::Dictionary(_, _) => {
-                        let builder = builder.as_any_mut().downcast_mut::<ListBuilder<StringDictionaryBuilder<DT>>>().ok_or_else(||ArrowError::JsonError(
-                            "Cast failed for ListBuilder<StringDictionaryBuilder> during nested data parsing".to_string(),
-                        ))?;
-                        for val in vals {
-                            if let Some(v) = val {
-                                let _ = builder.values().append(&v);
-                            } else {
-                                builder.values().append_null();
-                            };
-                        }
-
-                        // Append to the list
-                        builder.append(true);
-                    }
-                    e => {
-                        return Err(ArrowError::JsonError(format!(
-                            "Nested list data builder type is not supported: {e:?}"
-                        )))
-                    }
-                }
-            }
-        }
-
-        Ok(builder.finish() as ArrayRef)
-    }
-
-    #[inline(always)]
-    fn build_string_dictionary_builder<T>(
-        &self,
-        row_len: usize,
-    ) -> StringDictionaryBuilder<T>
-    where
-        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
-    {
-        StringDictionaryBuilder::with_capacity(row_len, row_len, row_len * 5)
-    }
-
-    #[inline(always)]
-    fn build_string_dictionary_array(
-        &self,
-        rows: &[Value],
-        col_name: &str,
-        key_type: &DataType,
-        value_type: &DataType,
-    ) -> Result<ArrayRef, ArrowError> {
-        if let DataType::Utf8 = *value_type {
-            match *key_type {
-                DataType::Int8 => self.build_dictionary_array::<Int8Type>(rows, col_name),
-                DataType::Int16 => {
-                    self.build_dictionary_array::<Int16Type>(rows, col_name)
-                }
-                DataType::Int32 => {
-                    self.build_dictionary_array::<Int32Type>(rows, col_name)
-                }
-                DataType::Int64 => {
-                    self.build_dictionary_array::<Int64Type>(rows, col_name)
-                }
-                DataType::UInt8 => {
-                    self.build_dictionary_array::<UInt8Type>(rows, col_name)
-                }
-                DataType::UInt16 => {
-                    self.build_dictionary_array::<UInt16Type>(rows, col_name)
-                }
-                DataType::UInt32 => {
-                    self.build_dictionary_array::<UInt32Type>(rows, col_name)
-                }
-                DataType::UInt64 => {
-                    self.build_dictionary_array::<UInt64Type>(rows, col_name)
-                }
-                _ => Err(ArrowError::JsonError(
-                    "unsupported dictionary key type".to_string(),
-                )),
-            }
-        } else {
-            Err(ArrowError::JsonError(
-                "dictionary types other than UTF-8 not yet supported".to_string(),
-            ))
-        }
-    }
-
-    fn build_boolean_array(
-        &self,
-        rows: &[Value],
-        col_name: &str,
-    ) -> Result<ArrayRef, ArrowError> {
-        let mut builder = BooleanBuilder::with_capacity(rows.len());
-        for row in rows {
-            if let Some(value) = row.get(col_name) {
-                if let Some(boolean) = value.as_bool() {
-                    builder.append_value(boolean);
-                } else {
-                    builder.append_null();
-                }
-            } else {
-                builder.append_null();
-            }
-        }
-        Ok(Arc::new(builder.finish()))
-    }
-
-    fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
-        &self,
-        rows: &[Value],
-        col_name: &str,
-    ) -> Result<ArrayRef, ArrowError>
-    where
-        T: ArrowPrimitiveType,
-        T::Native: num::NumCast,
-    {
-        let format_string = self
-            .options
-            .format_strings
-            .as_ref()
-            .and_then(|fmts| fmts.get(col_name));
-        Ok(Arc::new(
-            rows.iter()
-                .map(|row| {
-                    row.get(col_name).and_then(|value| {
-                        if value.is_i64() {
-                            value.as_i64().and_then(num::cast::cast)
-                        } else if value.is_u64() {
-                            value.as_u64().and_then(num::cast::cast)
-                        } else if value.is_string() {
-                            match format_string {
-                                Some(fmt) => {
-                                    T::parse_formatted(value.as_str().unwrap(), fmt)
-                                }
-                                None => T::parse(value.as_str().unwrap()),
-                            }
-                        } else {
-                            value.as_f64().and_then(num::cast::cast)
-                        }
-                    })
-                })
-                .collect::<PrimitiveArray<T>>(),
-        ))
-    }
-
-    fn build_decimal128_array(
-        &self,
-        rows: &[Value],
-        col_name: &str,
-        precision: u8,
-        scale: i8,
-    ) -> Result<ArrayRef, ArrowError> {
-        Ok(Arc::new(
-            rows.iter()
-                .map(|row| {
-                    row.get(col_name).and_then(|value| {
-                        if value.is_i64() {
-                            let mul = 10i128.pow(scale as _);
-                            value
-                                .as_i64()
-                                .and_then(num::cast::cast)
-                                .map(|v: i128| v * mul)
-                        } else if value.is_u64() {
-                            let mul = 10i128.pow(scale as _);
-                            value
-                                .as_u64()
-                                .and_then(num::cast::cast)
-                                .map(|v: i128| v * mul)
-                        } else if value.is_string() {
-                            value.as_str().and_then(|s| {
-                                parse_decimal::<Decimal128Type>(s, precision, scale).ok()
-                            })
-                        } else {
-                            let mul = 10_f64.powi(scale as i32);
-                            value.as_f64().map(|f| (f * mul).round() as i128)
-                        }
-                    })
-                })
-                .collect::<Decimal128Array>()
-                .with_precision_and_scale(precision, scale)?,
-        ))
-    }
-
-    fn build_decimal256_array(
-        &self,
-        rows: &[Value],
-        col_name: &str,
-        precision: u8,
-        scale: i8,
-    ) -> Result<ArrayRef, ArrowError> {
-        let mul = 10_f64.powi(scale as i32);
-        Ok(Arc::new(
-            rows.iter()
-                .map(|row| {
-                    row.get(col_name).and_then(|value| {
-                        if value.is_i64() {
-                            let mul = i256::from_i128(10).pow_wrapping(scale as _);
-                            value.as_i64().map(|i| i256::from_i128(i as _) * mul)
-                        } else if value.is_u64() {
-                            let mul = i256::from_i128(10).pow_wrapping(scale as _);
-                            value.as_u64().map(|i| i256::from_i128(i as _) * mul)
-                        } else if value.is_string() {
-                            value.as_str().and_then(|s| {
-                                parse_decimal::<Decimal256Type>(s, precision, scale).ok()
-                            })
-                        } else {
-                            value.as_f64().and_then(|f| i256::from_f64(f * mul.round()))
-                        }
-                    })
-                })
-                .collect::<Decimal256Array>()
-                .with_precision_and_scale(precision, scale)?,
-        ))
-    }
-
-    /// Build a nested GenericListArray from a list of unnested `Value`s
-    fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
-        &self,
-        rows: &[Value],
-        list_field: &FieldRef,
-    ) -> Result<ArrayRef, ArrowError> {
-        // build list offsets
-        let mut cur_offset = OffsetSize::zero();
-        let list_len = rows.len();
-        let num_list_bytes = bit_util::ceil(list_len, 8);
-        let mut offsets = Vec::with_capacity(list_len + 1);
-        let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes);
-        let list_nulls = list_nulls.as_slice_mut();
-        offsets.push(cur_offset);
-        rows.iter().enumerate().for_each(|(i, v)| {
-            if let Value::Array(a) = v {
-                cur_offset += OffsetSize::from_usize(a.len()).unwrap();
-                bit_util::set_bit(list_nulls, i);
-            } else if let Value::Null = v {
-                // value is null, so the offset is not incremented
-            } else {
-                cur_offset += OffsetSize::one();
-            }
-            offsets.push(cur_offset);
-        });
-        let valid_len = cur_offset.to_usize().unwrap();
-        let array_data = match list_field.data_type() {
-            DataType::Null => NullArray::new(valid_len).into_data(),
-            DataType::Boolean => {
-                let num_bytes = bit_util::ceil(valid_len, 8);
-                let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
-                let mut bool_nulls =
-                    MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
-                let mut curr_index = 0;
-                rows.iter().for_each(|v| {
-                    if let Value::Array(vs) = v {
-                        vs.iter().for_each(|value| {
-                            if let Value::Bool(child) = value {
-                                // if valid boolean, append value
-                                if *child {
-                                    bit_util::set_bit(
-                                        bool_values.as_slice_mut(),
-                                        curr_index,
-                                    );
-                                }
-                            } else {
-                                // null slot
-                                bit_util::unset_bit(
-                                    bool_nulls.as_slice_mut(),
-                                    curr_index,
-                                );
-                            }
-                            curr_index += 1;
-                        });
-                    }
-                });
-                unsafe {
-                    ArrayData::builder(list_field.data_type().clone())
-                        .len(valid_len)
-                        .add_buffer(bool_values.into())
-                        .null_bit_buffer(Some(bool_nulls.into()))
-                        .build_unchecked()
-                }
-            }
-            DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
-            DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
-            DataType::Int32 => self.read_primitive_list_values::<Int32Type>(rows),
-            DataType::Int64 => self.read_primitive_list_values::<Int64Type>(rows),
-            DataType::UInt8 => self.read_primitive_list_values::<UInt8Type>(rows),
-            DataType::UInt16 => self.read_primitive_list_values::<UInt16Type>(rows),
-            DataType::UInt32 => self.read_primitive_list_values::<UInt32Type>(rows),
-            DataType::UInt64 => self.read_primitive_list_values::<UInt64Type>(rows),
-            DataType::Float16 => {
-                return Err(ArrowError::JsonError("Float16 not supported".to_string()))
-            }
-            DataType::Float32 => self.read_primitive_list_values::<Float32Type>(rows),
-            DataType::Float64 => self.read_primitive_list_values::<Float64Type>(rows),
-            DataType::Timestamp(_, _)
-            | DataType::Date32
-            | DataType::Date64
-            | DataType::Time32(_)
-            | DataType::Time64(_) => {
-                return Err(ArrowError::JsonError(
-                    "Temporal types are not yet supported, see ARROW-4803".to_string(),
-                ))
-            }
-            DataType::Utf8 => flatten_json_string_values(rows)
-                .into_iter()
-                .collect::<StringArray>()
-                .into_data(),
-            DataType::LargeUtf8 => flatten_json_string_values(rows)
-                .into_iter()
-                .collect::<LargeStringArray>()
-                .into_data(),
-            DataType::List(field) => {
-                let child = self
-                    .build_nested_list_array::<i32>(&flatten_json_values(rows), field)?;
-                child.into_data()
-            }
-            DataType::LargeList(field) => {
-                let child = self
-                    .build_nested_list_array::<i64>(&flatten_json_values(rows), field)?;
-                child.into_data()
-            }
-            DataType::Struct(fields) => {
-                // extract list values, with non-lists converted to Value::Null
-                let array_item_count = cur_offset.to_usize().unwrap();
-                let num_bytes = bit_util::ceil(array_item_count, 8);
-                let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
-                let mut struct_index = 0;
-                let rows: Vec<Value> = rows
-                    .iter()
-                    .flat_map(|row| match row {
-                        Value::Array(values) if !values.is_empty() => {
-                            values.iter().for_each(|value| {
-                                if !value.is_null() {
-                                    bit_util::set_bit(
-                                        null_buffer.as_slice_mut(),
-                                        struct_index,
-                                    );
-                                }
-                                struct_index += 1;
-                            });
-                            values.clone()
-                        }
-                        _ => {
-                            vec![]
-                        }
-                    })
-                    .collect();
-                let arrays = self.build_struct_array(rows.as_slice(), fields, &None)?;
-                let data_type = DataType::Struct(fields.clone());
-                let buf = null_buffer.into();
-                unsafe {
-                    ArrayDataBuilder::new(data_type)
-                        .len(rows.len())
-                        .null_bit_buffer(Some(buf))
-                        .child_data(arrays.into_iter().map(|a| a.into_data()).collect())
-                        .build_unchecked()
-                }
-            }
-            datatype => {
-                return Err(ArrowError::JsonError(format!(
-                    "Nested list of {datatype:?} not supported"
-                )));
-            }
-        };
-        // build list
-        let list_data = ArrayData::builder(DataType::List(list_field.clone()))
-            .len(list_len)
-            .add_buffer(Buffer::from_slice_ref(&offsets))
-            .add_child_data(array_data)
-            .null_bit_buffer(Some(list_nulls.into()));
-        let list_data = unsafe { list_data.build_unchecked() };
-        Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
-    }
-
-    /// Builds the child values of a `StructArray` without constructing the `StructArray`
-    /// itself, as some callers only want the child arrays.
-    ///
-    /// *Note*: The function is recursive, and will read nested structs.
-    ///
-    /// If `projection` is &None, then all values are returned. The first level of projection
-    /// occurs at the `RecordBatch` level. No further projection currently occurs, but would be
-    /// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
-    fn build_struct_array(
-        &self,
-        rows: &[Value],
-        struct_fields: &Fields,
-        projection: &Option<Vec<String>>,
-    ) -> Result<Vec<ArrayRef>, ArrowError> {
-        let arrays: Result<Vec<ArrayRef>, ArrowError> = struct_fields
-            .iter()
-            .filter(|field| {
-                projection
-                    .as_ref()
-                    .map(|p| p.contains(field.name()))
-                    .unwrap_or(true)
-            })
-            .map(|field| {
-                match field.data_type() {
-                    DataType::Null => {
-                        Ok(Arc::new(NullArray::new(rows.len())) as ArrayRef)
-                    }
-                    DataType::Boolean => self.build_boolean_array(rows, field.name()),
-                    DataType::Float64 => {
-                        self.build_primitive_array::<Float64Type>(rows, field.name())
-                    }
-                    DataType::Float32 => {
-                        self.build_primitive_array::<Float32Type>(rows, field.name())
-                    }
-                    DataType::Int64 => {
-                        self.build_primitive_array::<Int64Type>(rows, field.name())
-                    }
-                    DataType::Int32 => {
-                        self.build_primitive_array::<Int32Type>(rows, field.name())
-                    }
-                    DataType::Int16 => {
-                        self.build_primitive_array::<Int16Type>(rows, field.name())
-                    }
-                    DataType::Int8 => {
-                        self.build_primitive_array::<Int8Type>(rows, field.name())
-                    }
-                    DataType::UInt64 => {
-                        self.build_primitive_array::<UInt64Type>(rows, field.name())
-                    }
-                    DataType::UInt32 => {
-                        self.build_primitive_array::<UInt32Type>(rows, field.name())
-                    }
-                    DataType::UInt16 => {
-                        self.build_primitive_array::<UInt16Type>(rows, field.name())
-                    }
-                    DataType::UInt8 => {
-                        self.build_primitive_array::<UInt8Type>(rows, field.name())
-                    }
-                    // TODO: this is incomplete
-                    DataType::Timestamp(unit, _) => match unit {
-                        TimeUnit::Second => self
-                            .build_primitive_array::<TimestampSecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                        TimeUnit::Microsecond => self
-                            .build_primitive_array::<TimestampMicrosecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                        TimeUnit::Millisecond => self
-                            .build_primitive_array::<TimestampMillisecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                        TimeUnit::Nanosecond => self
-                            .build_primitive_array::<TimestampNanosecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                    },
-                    DataType::Date64 => {
-                        self.build_primitive_array::<Date64Type>(rows, field.name())
-                    }
-                    DataType::Date32 => {
-                        self.build_primitive_array::<Date32Type>(rows, field.name())
-                    }
-                    DataType::Time64(unit) => match unit {
-                        TimeUnit::Microsecond => self
-                            .build_primitive_array::<Time64MicrosecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                        TimeUnit::Nanosecond => self
-                            .build_primitive_array::<Time64NanosecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                        t => Err(ArrowError::JsonError(format!(
-                            "TimeUnit {t:?} not supported with Time64"
-                        ))),
-                    },
-                    DataType::Time32(unit) => match unit {
-                        TimeUnit::Second => self
-                            .build_primitive_array::<Time32SecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                        TimeUnit::Millisecond => self
-                            .build_primitive_array::<Time32MillisecondType>(
-                                rows,
-                                field.name(),
-                            ),
-                        t => Err(ArrowError::JsonError(format!(
-                            "TimeUnit {t:?} not supported with Time32"
-                        ))),
-                    },
-                    DataType::Utf8 => Ok(Arc::new(
-                        rows.iter()
-                            .map(|row| {
-                                let maybe_value = row.get(field.name());
-                                maybe_value.and_then(|value| value.as_str())
-                            })
-                            .collect::<StringArray>(),
-                    ) as ArrayRef),
-                    DataType::Binary => Ok(Arc::new(
-                        rows.iter()
-                            .map(|row| {
-                                let maybe_value = row.get(field.name());
-                                maybe_value.and_then(|value| value.as_str())
-                            })
-                            .collect::<BinaryArray>(),
-                    ) as ArrayRef),
-                    DataType::List(ref list_field) => {
-                        match list_field.data_type() {
-                            DataType::Dictionary(ref key_ty, _) => {
-                                self.build_wrapped_list_array(rows, field.name(), key_ty)
-                            }
-                            _ => {
-                                // extract rows by name
-                                let extracted_rows = rows
-                                    .iter()
-                                    .map(|row| {
-                                        row.get(field.name())
-                                            .cloned()
-                                            .unwrap_or(Value::Null)
-                                    })
-                                    .collect::<Vec<Value>>();
-                                self.build_nested_list_array::<i32>(
-                                    extracted_rows.as_slice(),
-                                    list_field,
-                                )
-                            }
-                        }
-                    }
-                    DataType::Dictionary(ref key_ty, ref val_ty) => self
-                        .build_string_dictionary_array(
-                            rows,
-                            field.name(),
-                            key_ty,
-                            val_ty,
-                        ),
-                    DataType::Struct(fields) => {
-                        let len = rows.len();
-                        let num_bytes = bit_util::ceil(len, 8);
-                        let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
-                        let struct_rows = rows
-                            .iter()
-                            .enumerate()
-                            .map(|(i, row)| {
-                                (i, row.as_object().and_then(|v| v.get(field.name())))
-                            })
-                            .map(|(i, v)| match v {
-                                // we want the field as an object; if it's not, we treat it as null
-                                Some(Value::Object(value)) => {
-                                    bit_util::set_bit(null_buffer.as_slice_mut(), i);
-                                    Value::Object(value.clone())
-                                }
-                                _ => Value::Object(Default::default()),
-                            })
-                            .collect::<Vec<Value>>();
-                        let arrays =
-                            self.build_struct_array(&struct_rows, fields, &None)?;
-                        // construct a struct array's data in order to set null buffer
-                        let data_type = DataType::Struct(fields.clone());
-                        let data = ArrayDataBuilder::new(data_type)
-                            .len(len)
-                            .null_bit_buffer(Some(null_buffer.into()))
-                            .child_data(
-                                arrays.into_iter().map(|a| a.into_data()).collect(),
-                            );
-                        let data = unsafe { data.build_unchecked() };
-                        Ok(make_array(data))
-                    }
-                    DataType::Map(map_field, _) => self.build_map_array(
-                        rows,
-                        field.name(),
-                        field.data_type(),
-                        map_field,
-                    ),
-                    DataType::Decimal128(precision, scale) => self
-                        .build_decimal128_array(rows, field.name(), *precision, *scale),
-                    DataType::Decimal256(precision, scale) => self
-                        .build_decimal256_array(rows, field.name(), *precision, *scale),
-                    _ => Err(ArrowError::JsonError(format!(
-                        "{:?} type is not supported",
-                        field.data_type()
-                    ))),
-                }
-            })
-            .collect();
-        arrays
-    }
-
-    fn build_map_array(
-        &self,
-        rows: &[Value],
-        field_name: &str,
-        map_type: &DataType,
-        struct_field: &Field,
-    ) -> Result<ArrayRef, ArrowError> {
-        // A map has the format {"key": "value"} where the key is most commonly a
-        // string, but could also be a number or boolean (e.g. {1: "value"}).
-        // A map is also represented as a flattened contiguous array, with the number
-        // of key-value pairs being separated by a list offset.
-        // If row 1 has 2 key-value pairs, and row 2 has 3, the offsets would be
-        // [0, 2, 5].
-        //
-        // Thus we try to read a map by iterating through the keys and values
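-        //
-        // For example (an illustrative sketch), the rows
-        //   {"map": {"a": 1, "b": 2}}
-        //   {"map": {"c": 3, "d": 4, "e": 5}}
-        // yield offsets [0, 2, 5], flattened keys ["a", "b", "c", "d", "e"]
-        // and flattened values [1, 2, 3, 4, 5].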
-
-        let (key_field, value_field) =
-            if let DataType::Struct(fields) = struct_field.data_type() {
-                if fields.len() != 2 {
-                    return Err(ArrowError::InvalidArgumentError(format!(
-                        "DataType::Map expects a struct with 2 fields, found {} fields",
-                        fields.len()
-                    )));
-                }
-                (&fields[0], &fields[1])
-            } else {
-                return Err(ArrowError::InvalidArgumentError(format!(
-                    "JSON map array builder expects a DataType::Map, found {:?}",
-                    struct_field.data_type()
-                )));
-            };
-        let value_map_iter = rows.iter().map(|value| {
-            value
-                .get(field_name)
-                .and_then(|v| v.as_object().map(|map| (map, map.len() as i32)))
-        });
-        let rows_len = rows.len();
-        let mut list_offsets = Vec::with_capacity(rows_len + 1);
-        list_offsets.push(0i32);
-        let mut last_offset = 0;
-        let num_bytes = bit_util::ceil(rows_len, 8);
-        let mut list_bitmap = MutableBuffer::from_len_zeroed(num_bytes);
-        let null_data = list_bitmap.as_slice_mut();
-
-        let struct_rows = value_map_iter
-            .enumerate()
-            .filter_map(|(i, v)| match v {
-                Some((map, len)) => {
-                    list_offsets.push(last_offset + len);
-                    last_offset += len;
-                    bit_util::set_bit(null_data, i);
-                    Some(map.iter().map(|(k, v)| {
-                        json!({
-                            key_field.name(): k,
-                            value_field.name(): v
-                        })
-                    }))
-                }
-                None => {
-                    list_offsets.push(last_offset);
-                    None
-                }
-            })
-            .flatten()
-            .collect::<Vec<Value>>();
-
-        let struct_children = self.build_struct_array(
-            struct_rows.as_slice(),
-            &Fields::from([key_field.clone(), value_field.clone()]),
-            &None,
-        )?;
-
-        unsafe {
-            Ok(make_array(ArrayData::new_unchecked(
-                map_type.clone(),
-                rows_len,
-                None,
-                Some(list_bitmap.into()),
-                0,
-                vec![Buffer::from_slice_ref(&list_offsets)],
-                vec![ArrayData::new_unchecked(
-                    struct_field.data_type().clone(),
-                    struct_children[0].len(),
-                    None,
-                    None,
-                    0,
-                    vec![],
-                    struct_children
-                        .into_iter()
-                        .map(|array| array.into_data())
-                        .collect(),
-                )],
-            )))
-        }
-    }
-
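-    /// Builds a string dictionary array for `col_name`, appending a null for
-    /// each row where the value is missing or not a JSON string.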
-    #[inline(always)]
-    fn build_dictionary_array<T>(
-        &self,
-        rows: &[Value],
-        col_name: &str,
-    ) -> Result<ArrayRef, ArrowError>
-    where
-        T::Native: num::NumCast,
-        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
-    {
-        let mut builder: StringDictionaryBuilder<T> =
-            self.build_string_dictionary_builder(rows.len());
-        for row in rows {
-            if let Some(value) = row.get(col_name) {
-                if let Some(str_v) = value.as_str() {
-                    builder.append(str_v).map(drop)?
-                } else {
-                    builder.append_null();
-                }
-            } else {
-                builder.append_null();
-            }
-        }
-        Ok(Arc::new(builder.finish()) as ArrayRef)
-    }
-
-    /// Read the primitive list's values into ArrayData
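-    ///
-    /// Numbers are read via `as_f64` and cast to `T::Native` (elements that
-    /// fail to cast become nulls); a bare scalar number is treated as a
-    /// single-value list, and any other row contributes no values.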
-    fn read_primitive_list_values<T>(&self, rows: &[Value]) -> ArrayData
-    where
-        T: ArrowPrimitiveType,
-        T::Native: num::NumCast,
-    {
-        let values = rows
-            .iter()
-            .flat_map(|row| {
-                // read values from list
-                if let Value::Array(values) = row {
-                    values
-                        .iter()
-                        .map(|value| {
-                            let v: Option<T::Native> =
-                                value.as_f64().and_then(num::cast::cast);
-                            v
-                        })
-                        .collect::<Vec<Option<T::Native>>>()
-                } else if let Value::Number(value) = row {
-                    // handle the scalar number case
-                    let v: Option<T::Native> = value.as_f64().and_then(num::cast::cast);
-                    v.map(|v| vec![Some(v)]).unwrap_or_default()
-                } else {
-                    vec![]
-                }
-            })
-            .collect::<Vec<Option<T::Native>>>();
-        let array = values.iter().collect::<PrimitiveArray<T>>();
-        array.into_data()
-    }
-}
-
-/// Reads a JSON value as a string, regardless of its type.
-/// This is useful if the expected datatype is a string, in which case we preserve
-/// all the values regardless of their type.
-///
-/// Applying `value.to_string()` unfortunately results in an escaped string, which
-/// is not what we want.
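-///
-/// For example (an illustrative sketch, assuming `serde_json::json!` is in scope):
-///
-/// ```ignore
-/// assert_eq!(json_value_as_string(&json!("text")), Some("text".to_string()));
-/// assert_eq!(json_value_as_string(&json!(1.5)), Some("1.5".to_string()));
-/// assert_eq!(json_value_as_string(&json!(null)), None);
-/// ```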
-#[inline(always)]
-fn json_value_as_string(value: &Value) -> Option<String> {
-    match value {
-        Value::Null => None,
-        Value::String(string) => Some(string.clone()),
-        _ => Some(value.to_string()),
-    }
-}
-
-/// Flattens a list of JSON values by flattening lists and treating all other
-/// values as single-value lists.
-/// This is used to read into nested lists (list of list, list of struct) and non-dictionary lists.
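-///
-/// For example (an illustrative sketch):
-///
-/// ```ignore
-/// // [[1, 2], null, 3] flattens to [1, 2, null, 3]
-/// let flat = flatten_json_values(&[json!([1, 2]), json!(null), json!(3)]);
-/// assert_eq!(flat, vec![json!(1), json!(2), json!(null), json!(3)]);
-/// ```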
-#[inline]
-fn flatten_json_values(values: &[Value]) -> Vec<Value> {
-    values
-        .iter()
-        .flat_map(|row| {
-            if let Value::Array(values) = row {
-                values.clone()
-            } else if let Value::Null = row {
-                vec![Value::Null]
-            } else {
-                // we interpret a scalar as a single-value list to minimise data loss
-                vec![row.clone()]
-            }
-        })
-        .collect()
-}
-
-/// Flattens a list into string values, dropping Value::Null in the process.
-/// This is useful for interpreting any JSON array as strings.
-/// See `json_value_as_string`.
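-///
-/// For example (an illustrative sketch):
-///
-/// ```ignore
-/// let flat = flatten_json_string_values(&[json!(["a", 1]), json!(null), json!(true)]);
-/// assert_eq!(flat, vec![Some("a".into()), Some("1".into()), Some("true".into())]);
-/// ```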
-#[inline]
-fn flatten_json_string_values(values: &[Value]) -> Vec<Option<String>> {
-    values
-        .iter()
-        .flat_map(|row| {
-            if let Value::Array(values) = row {
-                values
-                    .iter()
-                    .map(json_value_as_string)
-                    .collect::<Vec<Option<_>>>()
-            } else if let Value::Null = row {
-                vec![]
-            } else {
-                vec![json_value_as_string(row)]
-            }
-        })
-        .collect::<Vec<Option<_>>>()
-}
-
-/// JSON file reader
-///
-/// Note: Consider instead using [`RawReader`] which is faster and will
-/// eventually replace this implementation as part of [#3610]
-///
-/// [`RawReader`]: crate::raw::RawReader
-/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
-#[derive(Debug)]
-#[deprecated(note = "Use RawReader instead")]
-#[allow(deprecated)]
-pub struct Reader<R: Read> {
-    reader: BufReader<R>,
-    /// JSON value decoder
-    decoder: Decoder,
-}
-
-#[allow(deprecated)]
-impl<R: Read> Reader<R> {
-    /// Create a new JSON Reader from any value that implements the `Read` trait.
-    ///
-    /// To customize the Reader, such as to enable schema inference, use
-    /// `ReaderBuilder`.
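-    ///
-    /// # Example (an illustrative sketch)
-    ///
-    /// ```ignore
-    /// # use std::fs::File;
-    /// # use std::sync::Arc;
-    /// # use arrow_schema::{DataType, Field, Schema};
-    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
-    /// let file = File::open("test/data/basic.json").unwrap();
-    /// let mut reader = Reader::new(file, schema, DecoderOptions::new());
-    /// let batch = reader.next().unwrap().unwrap();
-    /// ```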
-    pub fn new(reader: R, schema: SchemaRef, options: DecoderOptions) -> Self {
-        Self::from_buf_reader(BufReader::new(reader), schema, options)
-    }
-
-    /// Create a new JSON Reader from a `BufReader<R: Read>`
-    ///
-    /// To customize the schema, such as to enable schema inference, use `ReaderBuilder`
-    pub fn from_buf_reader(
-        reader: BufReader<R>,
-        schema: SchemaRef,
-        options: DecoderOptions,
-    ) -> Self {
-        Self {
-            reader,
-            decoder: Decoder::new(schema, options),
-        }
-    }
-
-    /// Returns the schema of the reader, useful for getting the schema without reading
-    /// record batches
-    pub fn schema(&self) -> SchemaRef {
-        self.decoder.schema()
-    }
-
-    /// Read the next batch of records
-    #[allow(clippy::should_implement_trait)]
-    pub fn next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        self.decoder
-            .next_batch(&mut ValueIter::new(&mut self.reader, None))
-    }
-}
-
-/// JSON file reader builder
-///
-/// Note: Consider instead using [`RawReaderBuilder`] which is faster and will
-/// eventually replace this implementation as part of [#3610]
-///
-/// [`RawReaderBuilder`]: crate::raw::RawReaderBuilder
-/// [#3610]: https://github.com/apache/arrow-rs/issues/3610
-///
-#[derive(Debug, Default)]
-#[deprecated(note = "Use RawReaderBuilder instead")]
-pub struct ReaderBuilder {
-    /// Optional schema for the JSON file
-    ///
-    /// If the schema is not supplied, the reader will try to infer the schema
-    /// based on the JSON structure.
-    schema: Option<SchemaRef>,
-    /// Optional maximum number of records to read during schema inference
-    ///
-    /// If a number is not provided, all the records are read.
-    max_records: Option<usize>,
-    /// Options for json decoder
-    options: DecoderOptions,
-}
-
-#[allow(deprecated)]
-impl ReaderBuilder {
-    /// Create a new builder for configuring JSON parsing options.
-    ///
-    /// To convert a builder into a reader, call `ReaderBuilder::build`
-    ///
-    /// # Example
-    ///
-    /// ```
-    /// # use std::fs::File;
-    ///
-    /// fn example() -> arrow_json::Reader<File> {
-    ///     let file = File::open("test/data/basic.json").unwrap();
-    ///
-    ///     // create a builder, inferring the schema with the first 100 records
-    ///     let builder = arrow_json::ReaderBuilder::new().infer_schema(Some(100));
-    ///
-    ///     let reader = builder.build::<File>(file).unwrap();
-    ///
-    ///     reader
-    /// }
-    /// ```
-    pub fn new() -> Self {
-        Self::default()
-    }
-
-    /// Set the JSON file's schema
-    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
-        self.schema = Some(schema);
-        self
-    }
-
-    /// Set the JSON reader to infer the schema of the file
-    pub fn infer_schema(mut self, max_records: Option<usize>) -> Self {
-        // remove any schema that is set
-        self.schema = None;
-        self.max_records = max_records;
-        self
-    }
-
-    /// Set the batch size (number of records to load at one time)
-    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
-        self.options = self.options.with_batch_size(batch_size);
-        self
-    }
-
-    /// Set the reader's column projection
-    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
-        self.options = self.options.with_projection(projection);
-        self
-    }
-
-    /// Set the decoder's format Strings param
-    pub fn with_format_strings(
-        mut self,
-        format_strings: HashMap<String, String>,
-    ) -> Self {
-        self.options = self.options.with_format_strings(format_strings);
-        self
-    }
-
-    /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R>(self, source: R) -> Result<Reader<R>, ArrowError>
-    where
-        R: Read + Seek,
-    {
-        let mut buf_reader = BufReader::new(source);
-
-        // check if schema should be inferred
-        let schema = match self.schema {
-            Some(schema) => schema,
-            None => Arc::new(infer_json_schema_from_seekable(
-                &mut buf_reader,
-                self.max_records,
-            )?),
-        };
-
-        Ok(Reader::from_buf_reader(buf_reader, schema, self.options))
-    }
-}
-
-#[allow(deprecated)]
-impl<R: Read> Iterator for Reader<R> {
-    type Item = Result<RecordBatch, ArrowError>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.next().transpose()
-    }
-}
-
-#[cfg(test)]
-#[allow(deprecated)]
-mod tests {
-    use super::*;
-    use arrow_array::cast::AsArray;
-    use arrow_buffer::{ArrowNativeType, ToByteSlice};
-    use arrow_schema::DataType::{Dictionary, List};
-    use flate2::read::GzDecoder;
-    use std::fs::File;
-    use std::io::Cursor;
-
-    #[test]
-    fn test_json_basic() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(7, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(0, a.0);
-        assert_eq!(&DataType::Int64, a.1.data_type());
-        let b = schema.column_with_name("b").unwrap();
-        assert_eq!(1, b.0);
-        assert_eq!(&DataType::Float64, b.1.data_type());
-        let c = schema.column_with_name("c").unwrap();
-        assert_eq!(2, c.0);
-        assert_eq!(&DataType::Boolean, c.1.data_type());
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(3, d.0);
-        assert_eq!(&DataType::Utf8, d.1.data_type());
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .unwrap();
-        assert_eq!(1, aa.value(0));
-        assert_eq!(-10, aa.value(1));
-        let bb = batch
-            .column(b.0)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap();
-        assert_eq!(2.0, bb.value(0));
-        assert_eq!(-3.5, bb.value(1));
-        let cc = batch
-            .column(c.0)
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .unwrap();
-        assert!(!cc.value(0));
-        assert!(cc.value(10));
-        let dd = batch
-            .column(d.0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert_eq!("4", dd.value(0));
-        assert_eq!("text", dd.value(8));
-    }
-
-    #[test]
-    fn test_json_empty_projection() {
-        let builder = ReaderBuilder::new()
-            .infer_schema(None)
-            .with_batch_size(64)
-            .with_projection(vec![]);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(0, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-    }
-
-    #[test]
-    fn test_json_basic_with_nulls() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(4, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(&DataType::Int64, a.1.data_type());
-        let b = schema.column_with_name("b").unwrap();
-        assert_eq!(&DataType::Float64, b.1.data_type());
-        let c = schema.column_with_name("c").unwrap();
-        assert_eq!(&DataType::Boolean, c.1.data_type());
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(&DataType::Utf8, d.1.data_type());
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .unwrap();
-        assert!(aa.is_valid(0));
-        assert!(!aa.is_valid(1));
-        assert!(!aa.is_valid(11));
-        let bb = batch
-            .column(b.0)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap();
-        assert!(bb.is_valid(0));
-        assert!(!bb.is_valid(2));
-        assert!(!bb.is_valid(11));
-        let cc = batch
-            .column(c.0)
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .unwrap();
-        assert!(cc.is_valid(0));
-        assert!(!cc.is_valid(4));
-        assert!(!cc.is_valid(11));
-        let dd = batch
-            .column(d.0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert!(!dd.is_valid(0));
-        assert!(dd.is_valid(1));
-        assert!(!dd.is_valid(4));
-        assert!(!dd.is_valid(11));
-    }
-
-    #[test]
-    fn test_json_basic_schema() {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, true),
-            Field::new("b", DataType::Float32, false),
-            Field::new("c", DataType::Boolean, false),
-            Field::new("d", DataType::Utf8, false),
-        ]);
-
-        let mut reader: Reader<File> = Reader::new(
-            File::open("test/data/basic.json").unwrap(),
-            Arc::new(schema.clone()),
-            DecoderOptions::new(),
-        );
-        let reader_schema = reader.schema();
-        assert_eq!(reader_schema, Arc::new(schema));
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(4, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = batch.schema();
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(&DataType::Int32, a.1.data_type());
-        let b = schema.column_with_name("b").unwrap();
-        assert_eq!(&DataType::Float32, b.1.data_type());
-        let c = schema.column_with_name("c").unwrap();
-        assert_eq!(&DataType::Boolean, c.1.data_type());
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(&DataType::Utf8, d.1.data_type());
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        assert_eq!(1, aa.value(0));
-        // test that a 64bit value is returned as null due to overflowing
-        assert!(!aa.is_valid(11));
-        let bb = batch
-            .column(b.0)
-            .as_any()
-            .downcast_ref::<Float32Array>()
-            .unwrap();
-        assert_eq!(2.0, bb.value(0));
-        assert_eq!(-3.5, bb.value(1));
-    }
-
-    #[test]
-    fn test_json_format_strings_for_date() {
-        let schema = Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, true)]));
-        let e = schema.column_with_name("e").unwrap();
-        assert_eq!(&DataType::Date32, e.1.data_type());
-        let mut fmts = HashMap::new();
-        let date_format = "%Y-%m-%d".to_string();
-        fmts.insert("e".to_string(), date_format.clone());
-
-        let mut reader: Reader<File> = Reader::new(
-            File::open("test/data/basic.json").unwrap(),
-            schema.clone(),
-            DecoderOptions::new().with_format_strings(fmts),
-        );
-        let reader_schema = reader.schema();
-        assert_eq!(reader_schema, schema);
-        let batch = reader.next().unwrap().unwrap();
-
-        let ee = batch
-            .column(e.0)
-            .as_any()
-            .downcast_ref::<Date32Array>()
-            .unwrap();
-        let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
-        assert_eq!(dt, ee.value(0));
-        let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
-        assert_eq!(dt, ee.value(1));
-        assert!(!ee.is_valid(2));
-    }
-
-    #[test]
-    fn test_json_basic_schema_projection() {
-        // We test implicit and explicit projection:
-        // Implicit: omitting fields from a schema
-        // Explicit: supplying a vec of fields to take
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, true),
-            Field::new("b", DataType::Float32, false),
-            Field::new("c", DataType::Boolean, false),
-        ]);
-
-        let mut reader: Reader<File> = Reader::new(
-            File::open("test/data/basic.json").unwrap(),
-            Arc::new(schema),
-            DecoderOptions::new().with_projection(vec!["a".to_string(), "c".to_string()]),
-        );
-        let reader_schema = reader.schema();
-        let expected_schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Int32, true),
-            Field::new("c", DataType::Boolean, false),
-        ]));
-        assert_eq!(reader_schema, expected_schema);
-
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(2, batch.num_columns());
-        assert_eq!(2, batch.schema().fields().len());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = batch.schema();
-        assert_eq!(reader_schema, schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(0, a.0);
-        assert_eq!(&DataType::Int32, a.1.data_type());
-        let c = schema.column_with_name("c").unwrap();
-        assert_eq!(1, c.0);
-        assert_eq!(&DataType::Boolean, c.1.data_type());
-    }
-
-    #[test]
-    fn test_json_arrays() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/arrays.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(4, batch.num_columns());
-        assert_eq!(3, batch.num_rows());
-
-        let schema = batch.schema();
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(&DataType::Int64, a.1.data_type());
-        let b = schema.column_with_name("b").unwrap();
-        assert_eq!(
-            &DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
-            b.1.data_type()
-        );
-        let c = schema.column_with_name("c").unwrap();
-        assert_eq!(
-            &DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))),
-            c.1.data_type()
-        );
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(&DataType::Utf8, d.1.data_type());
-
-        let aa = batch.column(a.0).as_primitive::<Int64Type>();
-        assert_eq!(1, aa.value(0));
-        assert_eq!(-10, aa.value(1));
-        assert_eq!(1627668684594000000, aa.value(2));
-        let bb = batch.column(b.0).as_list::<i32>();
-        let bb = bb.values().as_primitive::<Float64Type>();
-        assert_eq!(9, bb.len());
-        assert_eq!(2.0, bb.value(0));
-        assert_eq!(-6.1, bb.value(5));
-        assert!(!bb.is_valid(7));
-
-        let cc = batch
-            .column(c.0)
-            .as_any()
-            .downcast_ref::<ListArray>()
-            .unwrap();
-        let cc = cc.values().as_boolean();
-        assert_eq!(6, cc.len());
-        assert!(!cc.value(0));
-        assert!(!cc.value(4));
-        assert!(!cc.is_valid(5));
-    }
-
-    #[test]
-    fn test_invalid_json_infer_schema() {
-        let re =
-            infer_json_schema_from_seekable(&mut BufReader::new(Cursor::new(b"}")), None);
-        assert_eq!(
-            re.err().unwrap().to_string(),
-            "Json error: Not valid JSON: expected value at line 1 column 1",
-        );
-    }
-
-    #[test]
-    fn test_invalid_json_read_record() {
-        let schema = Arc::new(Schema::new(vec![Field::new(
-            "a",
-            DataType::Struct(vec![Field::new("a", DataType::Utf8, true)].into()),
-            true,
-        )]));
-        let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
-        let mut reader = builder.build(Cursor::new(b"}")).unwrap();
-        assert_eq!(
-            reader.next().err().unwrap().to_string(),
-            "Json error: Not valid JSON: expected value at line 1 column 1",
-        );
-    }
-
-    #[test]
-    fn test_coercion_scalar_and_list() {
-        use arrow_schema::DataType::*;
-
-        assert_eq!(
-            List(Arc::new(Field::new("item", Float64, true))),
-            coerce_data_type(vec![
-                &Float64,
-                &List(Arc::new(Field::new("item", Float64, true)))
-            ])
-        );
-        assert_eq!(
-            List(Arc::new(Field::new("item", Float64, true))),
-            coerce_data_type(vec![
-                &Float64,
-                &List(Arc::new(Field::new("item", Int64, true)))
-            ])
-        );
-        assert_eq!(
-            List(Arc::new(Field::new("item", Int64, true))),
-            coerce_data_type(vec![
-                &Int64,
-                &List(Arc::new(Field::new("item", Int64, true)))
-            ])
-        );
-        // boolean and number are incompatible, return utf8
-        assert_eq!(
-            List(Arc::new(Field::new("item", Utf8, true))),
-            coerce_data_type(vec![
-                &Boolean,
-                &List(Arc::new(Field::new("item", Float64, true)))
-            ])
-        );
-    }
-
-    #[test]
-    fn test_mixed_json_arrays() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/mixed_arrays.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        let mut file = File::open("test/data/mixed_arrays.json.gz").unwrap();
-        let mut reader = BufReader::new(GzDecoder::new(&file));
-        let schema = infer_json_schema(&mut reader, None).unwrap();
-        file.rewind().unwrap();
-
-        let reader = BufReader::new(GzDecoder::new(&file));
-        let options = DecoderOptions::new().with_batch_size(64);
-        let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), options);
-        let batch_gz = reader.next().unwrap().unwrap();
-
-        for batch in vec![batch, batch_gz] {
-            assert_eq!(4, batch.num_columns());
-            assert_eq!(4, batch.num_rows());
-
-            let schema = batch.schema();
-
-            let a = schema.column_with_name("a").unwrap();
-            assert_eq!(&DataType::Int64, a.1.data_type());
-            let b = schema.column_with_name("b").unwrap();
-            assert_eq!(
-                &DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
-                b.1.data_type()
-            );
-            let c = schema.column_with_name("c").unwrap();
-            assert_eq!(
-                &DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))),
-                c.1.data_type()
-            );
-            let d = schema.column_with_name("d").unwrap();
-            assert_eq!(
-                &DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
-                d.1.data_type()
-            );
-
-            let bb = batch
-                .column(b.0)
-                .as_any()
-                .downcast_ref::<ListArray>()
-                .unwrap();
-            let bb = bb.values().as_primitive::<Float64Type>();
-            assert_eq!(10, bb.len());
-            assert_eq!(4.0, bb.value(9));
-
-            let cc = batch.column(c.0).as_list::<i32>();
-            // test that the list offsets are correct
-            assert_eq!(cc.value_offsets(), &[0, 2, 2, 4, 5]);
-            let cc = cc.values().as_boolean();
-            let cc_expected = BooleanArray::from(vec![
-                Some(false),
-                Some(true),
-                Some(false),
-                None,
-                Some(false),
-            ]);
-            assert_eq!(cc, &cc_expected);
-
-            let dd = batch.column(d.0).as_list::<i32>();
-            // test that the list offsets are correct
-            assert_eq!(dd.value_offsets(), &[0, 1, 1, 2, 6]);
-
-            let dd = dd.values().as_string::<i32>();
-            // there are 6 values because a `d: null` is treated as a null slot,
-            // and a list's null slot can be omitted from the child (i.e. same offset)
-            assert_eq!(6, dd.len());
-            assert_eq!("text", dd.value(1));
-            assert_eq!("1", dd.value(2));
-            assert_eq!("false", dd.value(3));
-            assert_eq!("array", dd.value(4));
-            assert_eq!("2.4", dd.value(5));
-        }
-    }
-
-    #[test]
-    fn test_nested_struct_json_arrays() {
-        let c_field = Field::new(
-            "c",
-            DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
-            true,
-        );
-        let a_field = Field::new(
-            "a",
-            DataType::Struct(Fields::from(vec![
-                Field::new("b", DataType::Boolean, true),
-                c_field.clone(),
-            ])),
-            true,
-        );
-        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
-        let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/nested_structs.json").unwrap())
-            .unwrap();
-
-        // build expected output
-        let d = StringArray::from(vec![Some("text"), None, Some("text"), None]);
-        let c = ArrayDataBuilder::new(c_field.data_type().clone())
-            .len(4)
-            .add_child_data(d.into_data())
-            .null_bit_buffer(Some(Buffer::from(vec![0b00000101])))
-            .build()
-            .unwrap();
-        let b = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]);
-        let a = ArrayDataBuilder::new(a_field.data_type().clone())
-            .len(4)
-            .add_child_data(b.into_data())
-            .add_child_data(c)
-            .null_bit_buffer(Some(Buffer::from(vec![0b00000111])))
-            .build()
-            .unwrap();
-        let expected = make_array(a);
-
-        // compare `a` with result from json reader
-        let batch = reader.next().unwrap().unwrap();
-        let read = batch.column(0);
-        assert_eq!(&expected, read);
-    }
-
-    #[test]
-    fn test_nested_list_json_arrays() {
-        let c_field = Field::new(
-            "c",
-            DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
-            true,
-        );
-        let a_struct_field = Field::new(
-            "a",
-            DataType::Struct(Fields::from(vec![
-                Field::new("b", DataType::Boolean, true),
-                c_field.clone(),
-            ])),
-            true,
-        );
-        let a_field =
-            Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
-        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
-        let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
-        let json_content = r#"
-        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
-        {"a": [{"b": false, "c": null}]}
-        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
-        {"a": null}
-        {"a": []}
-        {"a": [null]}
-        "#;
-        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
-
-        // build expected output
-        let d = StringArray::from(vec![
-            Some("a_text"),
-            Some("b_text"),
-            None,
-            Some("c_text"),
-            Some("d_text"),
-            None,
-            None,
-        ]);
-        let c = ArrayDataBuilder::new(c_field.data_type().clone())
-            .len(7)
-            .add_child_data(d.to_data())
-            .null_bit_buffer(Some(Buffer::from(vec![0b00111011])))
-            .build()
-            .unwrap();
-        let b = BooleanArray::from(vec![
-            Some(true),
-            Some(false),
-            Some(false),
-            Some(true),
-            None,
-            Some(true),
-            None,
-        ]);
-        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
-            .len(7)
-            .add_child_data(b.to_data())
-            .add_child_data(c.clone())
-            .null_bit_buffer(Some(Buffer::from(vec![0b00111111])))
-            .build()
-            .unwrap();
-        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
-            .len(6)
-            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
-            .add_child_data(a)
-            .null_bit_buffer(Some(Buffer::from(vec![0b00110111])))
-            .build()
-            .unwrap();
-        let expected = make_array(a_list);
-
-        // compare `a` with result from json reader
-        let batch = reader.next().unwrap().unwrap();
-        let read = batch.column(0);
-        assert_eq!(read.len(), 6);
-        // compare the arrays the long way around, to better detect differences
-        let read: &ListArray = read.as_list::<i32>();
-        let expected = expected.as_list::<i32>();
-        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
-        // compare list null buffers
-        assert_eq!(read.nulls(), expected.nulls());
-        // build struct from list
-        let struct_array = read.values().as_struct();
-        let expected_struct_array = expected.values().as_struct();
-
-        assert_eq!(7, struct_array.len());
-        assert_eq!(1, struct_array.null_count());
-        assert_eq!(7, expected_struct_array.len());
-        assert_eq!(1, expected_struct_array.null_count());
-        // test struct's nulls
-        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
-        // test struct's fields
-        let read_b = struct_array.column(0);
-        assert_eq!(b.data_ref(), read_b.data_ref());
-        let read_c = struct_array.column(1);
-        assert_eq!(&c, read_c.data_ref());
-        let read_c: &StructArray = read_c.as_any().downcast_ref::<StructArray>().unwrap();
-        let read_d = read_c.column(0);
-        assert_eq!(d.data_ref(), read_d.data_ref());
-
-        assert_eq!(read.data_ref(), expected.data_ref());
-    }
-
-    #[test]
-    fn test_map_json_arrays() {
-        let account_field = Field::new("account", DataType::UInt16, false);
-        let value_list_type =
-            DataType::List(Arc::new(Field::new("item", DataType::Utf8, false)));
-        let entries_struct_type = DataType::Struct(Fields::from(vec![
-            Field::new("key", DataType::Utf8, false),
-            Field::new("value", value_list_type.clone(), true),
-        ]));
-        let stocks_field = Field::new(
-            "stocks",
-            DataType::Map(
-                Arc::new(Field::new("entries", entries_struct_type.clone(), false)),
-                false,
-            ),
-            true,
-        );
-        let schema = Arc::new(Schema::new(vec![account_field, stocks_field.clone()]));
-        let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
-        // Note: account 456 has 'long' twice, to show that the JSON reader
-        // overwrites existing keys, thereby guaranteeing unique keys in the map
-        let json_content = r#"
-        {"account": 123, "stocks":{"long": ["$AAA", "$BBB"], "short": ["$CCC", "$D"]}}
-        {"account": 456, "stocks":{"long": null, "long": ["$AAA", "$CCC", "$D"], "short": null}}
-        {"account": 789, "stocks":{"hedged": ["$YYY"], "long": null, "short": ["$D"]}}
-        "#;
-        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
-
-        // build expected output
-        let expected_accounts = UInt16Array::from(vec![123, 456, 789]);
-
-        let expected_keys = StringArray::from(vec![
-            "long", "short", "long", "short", "hedged", "long", "short",
-        ])
-        .into_data();
-        let expected_value_array_data = StringArray::from(vec![
-            "$AAA", "$BBB", "$CCC", "$D", "$AAA", "$CCC", "$D", "$YYY", "$D",
-        ])
-        .into_data();
-        // Create the list that holds ["$_", "$_"]
-        let expected_values = ArrayDataBuilder::new(value_list_type)
-            .len(7)
-            .add_buffer(Buffer::from(
-                vec![0i32, 2, 4, 7, 7, 8, 8, 9].to_byte_slice(),
-            ))
-            .add_child_data(expected_value_array_data)
-            .null_bit_buffer(Some(Buffer::from(vec![0b01010111])))
-            .build()
-            .unwrap();
-        let expected_stocks_entries_data = ArrayDataBuilder::new(entries_struct_type)
-            .len(7)
-            .add_child_data(expected_keys)
-            .add_child_data(expected_values)
-            .build()
-            .unwrap();
-        let expected_stocks_data =
-            ArrayDataBuilder::new(stocks_field.data_type().clone())
-                .len(3)
-                .add_buffer(Buffer::from(vec![0i32, 2, 4, 7].to_byte_slice()))
-                .add_child_data(expected_stocks_entries_data)
-                .build()
-                .unwrap();
-
-        let expected_stocks = make_array(expected_stocks_data);
-
-        // compare with result from json reader
-        let batch = reader.next().unwrap().unwrap();
-        assert_eq!(batch.num_rows(), 3);
-        assert_eq!(batch.num_columns(), 2);
-        let col1 = batch.column(0);
-        assert_eq!(col1.as_ref(), &expected_accounts);
-        // Compare the map
-        let col2 = batch.column(1);
-        assert_eq!(col2.as_ref(), &expected_stocks);
-    }
-
-    #[test]
-    fn test_dictionary_from_json_basic_with_nulls() {
-        let schema = Schema::new(vec![Field::new(
-            "d",
-            Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(
-            &Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
-            d.1.data_type()
-        );
-
-        let dd = batch
-            .column(d.0)
-            .as_any()
-            .downcast_ref::<DictionaryArray<Int16Type>>()
-            .unwrap();
-        assert!(!dd.is_valid(0));
-        assert!(dd.is_valid(1));
-        assert!(dd.is_valid(2));
-        assert!(!dd.is_valid(11));
-
-        assert_eq!(
-            dd.keys(),
-            &Int16Array::from(vec![
-                None,
-                Some(0),
-                Some(1),
-                Some(0),
-                None,
-                None,
-                Some(0),
-                None,
-                Some(1),
-                Some(0),
-                Some(0),
-                None
-            ])
-        );
-    }
-
-    #[test]
-    fn test_dictionary_from_json_int8() {
-        let schema = Schema::new(vec![Field::new(
-            "d",
-            Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(
-            &Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
-            d.1.data_type()
-        );
-    }
-
-    #[test]
-    fn test_dictionary_from_json_int32() {
-        let schema = Schema::new(vec![Field::new(
-            "d",
-            Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(
-            &Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-            d.1.data_type()
-        );
-    }
-
-    #[test]
-    fn test_dictionary_from_json_int64() {
-        let schema = Schema::new(vec![Field::new(
-            "d",
-            Dictionary(Box::new(DataType::Int64), Box::new(DataType::Utf8)),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(
-            &Dictionary(Box::new(DataType::Int64), Box::new(DataType::Utf8)),
-            d.1.data_type()
-        );
-    }
-
-    #[test]
-    fn test_skip_empty_lines() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
-        let json_content = "
-        {\"a\": 1}
-
-        {\"a\": 2}
-
-        {\"a\": 3}";
-        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(3, batch.num_rows());
-
-        let schema = reader.schema();
-        let c = schema.column_with_name("a").unwrap();
-        assert_eq!(&DataType::Int64, c.1.data_type());
-    }
-
-    #[test]
-    fn test_row_type_validation() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
-        let json_content = "
-        [1, \"hello\"]
-        \"world\"";
-        let re = builder.build(Cursor::new(json_content));
-        assert_eq!(
-            re.err().unwrap().to_string(),
-            r#"Json error: Expected JSON record to be an object, found Array [Number(1), String("hello")]"#,
-        );
-    }
-
-    #[test]
-    fn test_list_of_string_dictionary_from_json() {
-        let schema = Schema::new(vec![Field::new(
-            "events",
-            List(Arc::new(Field::new(
-                "item",
-                Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
-                true,
-            ))),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/list_string_dict_nested.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(3, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let events = schema.column_with_name("events").unwrap();
-        assert_eq!(
-            &List(Arc::new(Field::new(
-                "item",
-                Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
-                true
-            ))),
-            events.1.data_type()
-        );
-
-        let evs_list = batch
-            .column(events.0)
-            .as_any()
-            .downcast_ref::<ListArray>()
-            .unwrap();
-        let evs_list = evs_list.values().as_dictionary::<UInt64Type>();
-        assert_eq!(6, evs_list.len());
-        assert!(evs_list.is_valid(1));
-        assert_eq!(DataType::Utf8, evs_list.value_type());
-
-        // dict from the events list
-        let dict_el = evs_list.values().as_string::<i32>();
-        assert_eq!(3, dict_el.len());
-        assert_eq!("Elect Leader", dict_el.value(0));
-        assert_eq!("Do Ballot", dict_el.value(1));
-        assert_eq!("Send Data", dict_el.value(2));
-    }
-
-    #[test]
-    fn test_list_of_string_dictionary_from_json_with_nulls() {
-        let schema = Schema::new(vec![Field::new(
-            "events",
-            List(Arc::new(Field::new(
-                "item",
-                Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
-                true,
-            ))),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(
-                File::open("test/data/list_string_dict_nested_nulls.json").unwrap(),
-            )
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(3, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let events = schema.column_with_name("events").unwrap();
-        assert_eq!(
-            &List(Arc::new(Field::new(
-                "item",
-                Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
-                true
-            ))),
-            events.1.data_type()
-        );
-
-        let evs_list = batch
-            .column(events.0)
-            .as_any()
-            .downcast_ref::<ListArray>()
-            .unwrap();
-        let evs_list = evs_list.values().as_dictionary::<UInt64Type>();
-        assert_eq!(8, evs_list.len());
-        assert!(evs_list.is_valid(1));
-        assert_eq!(DataType::Utf8, evs_list.value_type());
-
-        // dict from the events list
-        let dict_el = evs_list.values();
-        let dict_el = dict_el.as_any().downcast_ref::<StringArray>().unwrap();
-        assert_eq!(2, evs_list.null_count());
-        assert_eq!(3, dict_el.len());
-        assert_eq!("Elect Leader", dict_el.value(0));
-        assert_eq!("Do Ballot", dict_el.value(1));
-        assert_eq!("Send Data", dict_el.value(2));
-    }
-
-    #[test]
-    fn test_dictionary_from_json_uint8() {
-        let schema = Schema::new(vec![Field::new(
-            "d",
-            Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(
-            &Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
-            d.1.data_type()
-        );
-    }
-
-    #[test]
-    fn test_dictionary_from_json_uint32() {
-        let schema = Schema::new(vec![Field::new(
-            "d",
-            Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(
-            &Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
-            d.1.data_type()
-        );
-    }
-
-    #[test]
-    fn test_dictionary_from_json_uint64() {
-        let schema = Schema::new(vec![Field::new(
-            "d",
-            Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
-            true,
-        )]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let d = schema.column_with_name("d").unwrap();
-        assert_eq!(
-            &Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
-            d.1.data_type()
-        );
-    }
-
-    #[test]
-    fn test_with_multiple_batches() {
-        let builder = ReaderBuilder::new()
-            .infer_schema(Some(4))
-            .with_batch_size(5);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-
-        let mut num_records = Vec::new();
-        while let Some(rb) = reader.next().unwrap() {
-            num_records.push(rb.num_rows());
-        }
-
-        assert_eq!(vec![5, 5, 2], num_records);
-    }
-
-    #[test]
-    fn test_json_infer_schema() {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int64, true),
-            Field::new(
-                "b",
-                DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
-                true,
-            ),
-            Field::new(
-                "c",
-                DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))),
-                true,
-            ),
-            Field::new(
-                "d",
-                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
-                true,
-            ),
-        ]);
-
-        let mut reader =
-            BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
-        let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap();
-
-        assert_eq!(inferred_schema, schema);
-
-        let file = File::open("test/data/mixed_arrays.json.gz").unwrap();
-        let mut reader = BufReader::new(GzDecoder::new(&file));
-        let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
-
-        assert_eq!(inferred_schema, schema);
-    }
-
-    #[test]
-    fn test_json_infer_schema_nested_structs() {
-        let schema = Schema::new(vec![
-            Field::new(
-                "c1",
-                DataType::Struct(Fields::from(vec![
-                    Field::new("a", DataType::Boolean, true),
-                    Field::new(
-                        "b",
-                        DataType::Struct(
-                            vec![Field::new("c", DataType::Utf8, true)].into(),
-                        ),
-                        true,
-                    ),
-                ])),
-                true,
-            ),
-            Field::new("c2", DataType::Int64, true),
-            Field::new("c3", DataType::Utf8, true),
-        ]);
-
-        let inferred_schema = infer_json_schema_from_iterator(
-            vec![
-                Ok(serde_json::json!({"c1": {"a": true, "b": {"c": "text"}}, "c2": 1})),
-                Ok(serde_json::json!({"c1": {"a": false, "b": null}, "c2": 0})),
-                Ok(serde_json::json!({"c1": {"a": true, "b": {"c": "text"}}, "c3": "ok"})),
-            ]
-            .into_iter(),
-        )
-        .unwrap();
-
-        assert_eq!(inferred_schema, schema);
-    }
-
-    #[test]
-    fn test_json_infer_schema_struct_in_list() {
-        let schema = Schema::new(vec![
-            Field::new(
-                "c1",
-                DataType::List(Arc::new(Field::new(
-                    "item",
-                    DataType::Struct(Fields::from(vec![
-                        Field::new("a", DataType::Utf8, true),
-                        Field::new("b", DataType::Int64, true),
-                        Field::new("c", DataType::Boolean, true),
-                    ])),
-                    true,
-                ))),
-                true,
-            ),
-            Field::new("c2", DataType::Float64, true),
-            Field::new(
-                "c3",
-                // empty json array's inner types are inferred as null
-                DataType::List(Arc::new(Field::new("item", DataType::Null, true))),
-                true,
-            ),
-        ]);
-
-        let inferred_schema = infer_json_schema_from_iterator(
-            vec![
-                Ok(serde_json::json!({
-                    "c1": [{"a": "foo", "b": 100}], "c2": 1, "c3": [],
-                })),
-                Ok(serde_json::json!({
-                    "c1": [{"a": "bar", "b": 2}, {"a": "foo", "c": true}], "c2": 0, "c3": [],
-                })),
-                Ok(serde_json::json!({"c1": [], "c2": 0.5, "c3": []})),
-            ]
-            .into_iter(),
-        )
-        .unwrap();
-
-        assert_eq!(inferred_schema, schema);
-    }
-
-    #[test]
-    fn test_json_infer_schema_nested_list() {
-        let schema = Schema::new(vec![
-            Field::new(
-                "c1",
-                DataType::List(Arc::new(Field::new(
-                    "item",
-                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
-                    true,
-                ))),
-                true,
-            ),
-            Field::new("c2", DataType::Float64, true),
-        ]);
-
-        let inferred_schema = infer_json_schema_from_iterator(
-            vec![
-                Ok(serde_json::json!({
-                    "c1": [],
-                    "c2": 12,
-                })),
-                Ok(serde_json::json!({
-                    "c1": [["a", "b"], ["c"]],
-                })),
-                Ok(serde_json::json!({
-                    "c1": [["foo"]],
-                    "c2": 0.11,
-                })),
-            ]
-            .into_iter(),
-        )
-        .unwrap();
-
-        assert_eq!(inferred_schema, schema);
-    }
-
-    #[test]
-    fn test_timestamp_from_json_seconds() {
-        let schema = Schema::new(vec![Field::new(
-            "a",
-            DataType::Timestamp(TimeUnit::Second, None),
-            true,
-        )]);
-
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(
-            &DataType::Timestamp(TimeUnit::Second, None),
-            a.1.data_type()
-        );
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<TimestampSecondArray>()
-            .unwrap();
-        assert!(aa.is_valid(0));
-        assert!(!aa.is_valid(1));
-        assert!(!aa.is_valid(2));
-        assert_eq!(1, aa.value(0));
-        assert_eq!(1, aa.value(3));
-        assert_eq!(5, aa.value(7));
-    }
-
-    #[test]
-    fn test_timestamp_from_json_milliseconds() {
-        let schema = Schema::new(vec![Field::new(
-            "a",
-            DataType::Timestamp(TimeUnit::Millisecond, None),
-            true,
-        )]);
-
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(
-            &DataType::Timestamp(TimeUnit::Millisecond, None),
-            a.1.data_type()
-        );
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap();
-        assert!(aa.is_valid(0));
-        assert!(!aa.is_valid(1));
-        assert!(!aa.is_valid(2));
-        assert_eq!(1, aa.value(0));
-        assert_eq!(1, aa.value(3));
-        assert_eq!(5, aa.value(7));
-    }
-
-    #[test]
-    fn test_date_from_json_milliseconds() {
-        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
-
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(&DataType::Date64, a.1.data_type());
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<Date64Array>()
-            .unwrap();
-        assert!(aa.is_valid(0));
-        assert!(!aa.is_valid(1));
-        assert!(!aa.is_valid(2));
-        assert_eq!(1, aa.value(0));
-        assert_eq!(1, aa.value(3));
-        assert_eq!(5, aa.value(7));
-    }
-
-    #[test]
-    fn test_time_from_json_nanoseconds() {
-        let schema = Schema::new(vec![Field::new(
-            "a",
-            DataType::Time64(TimeUnit::Nanosecond),
-            true,
-        )]);
-
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<Time64NanosecondArray>()
-            .unwrap();
-        assert!(aa.is_valid(0));
-        assert!(!aa.is_valid(1));
-        assert!(!aa.is_valid(2));
-        assert_eq!(1, aa.value(0));
-        assert_eq!(1, aa.value(3));
-        assert_eq!(5, aa.value(7));
-    }
-
-    #[test]
-    fn test_time_from_string() {
-        parse_string_column::<Time64NanosecondType>(4);
-        parse_string_column::<Time64MicrosecondType>(4);
-        parse_string_column::<Time32MillisecondType>(4);
-        parse_string_column::<Time32SecondType>(4);
-    }
-
-    fn parse_string_column<T>(value: T::Native)
-    where
-        T: ArrowPrimitiveType,
-    {
-        let schema = Schema::new(vec![Field::new("d", T::DATA_TYPE, true)]);
-
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
-            .unwrap();
-
-        let batch = reader.next().unwrap().unwrap();
-        let dd = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<PrimitiveArray<T>>()
-            .unwrap();
-        assert_eq!(value, dd.value(1));
-        assert!(!dd.is_valid(2));
-    }
-
-    #[test]
-    fn test_json_read_nested_list() {
-        let schema = Schema::new(vec![Field::new(
-            "c1",
-            DataType::List(Arc::new(Field::new(
-                "item",
-                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
-                true,
-            ))),
-            true,
-        )]);
-
-        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
-        let batch = decoder
-            .next_batch(
-                &mut vec![
-                    Ok(serde_json::json!({
-                        "c1": [],
-                    })),
-                    Ok(serde_json::json!({
-                        "c1": [["a", "b"], ["c"], ["e", "f"], ["g"], ["h"], ["i"], ["j"], ["k"]],
-                    })),
-                    Ok(serde_json::json!({
-                        "c1": [["foo"], ["bar"]],
-                    })),
-                ]
-                .into_iter(),
-            )
-            .unwrap()
-            .unwrap();
-
-        assert_eq!(batch.num_columns(), 1);
-        assert_eq!(batch.num_rows(), 3);
-    }
-
-    #[test]
-    fn test_json_read_list_of_structs() {
-        let schema = Schema::new(vec![Field::new(
-            "c1",
-            DataType::List(Arc::new(Field::new(
-                "item",
-                DataType::Struct(vec![Field::new("a", DataType::Int64, true)].into()),
-                true,
-            ))),
-            true,
-        )]);
-
-        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
-        let batch = decoder
-            .next_batch(
-                // NOTE: total struct element count needs to be greater than
-                // bit_util::ceil(array_count, 8) to test validity bit buffer length calculation
-                // logic
-                &mut vec![
-                    Ok(serde_json::json!({
-                        "c1": [{"a": 1}],
-                    })),
-                    Ok(serde_json::json!({
-                        "c1": [{"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}, {"a": 6}, {"a": 7}],
-                    })),
-                    Ok(serde_json::json!({
-                        "c1": [{"a": 10}, {"a": 11}],
-                    })),
-                ]
-                .into_iter(),
-            )
-            .unwrap()
-            .unwrap();
-
-        assert_eq!(batch.num_columns(), 1);
-        assert_eq!(batch.num_rows(), 3);
-    }
-
-    #[test]
-    fn test_json_read_binary_structs() {
-        let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
-        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
-        let batch = decoder
-            .next_batch(
-                &mut vec![
-                    Ok(serde_json::json!({
-                        "c1": "₁₂₃",
-                    })),
-                    Ok(serde_json::json!({
-                        "c1": "foo",
-                    })),
-                ]
-                .into_iter(),
-            )
-            .unwrap()
-            .unwrap();
-        let data = batch.columns().iter().collect::<Vec<_>>();
-
-        let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
-        let binary_values = BinaryArray::from(vec!["₁₂₃".as_bytes(), "foo".as_bytes()]);
-        let expected_batch =
-            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(binary_values)])
-                .unwrap();
-        let expected_data = expected_batch.columns().iter().collect::<Vec<_>>();
-
-        assert_eq!(data, expected_data);
-        assert_eq!(batch.num_columns(), 1);
-        assert_eq!(batch.num_rows(), 2);
-    }
-
-    #[test]
-    fn test_json_iterator() {
-        let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(5);
-        let reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic.json").unwrap())
-            .unwrap();
-        let schema = reader.schema();
-        let (col_a_index, _) = schema.column_with_name("a").unwrap();
-
-        let mut sum_num_rows = 0;
-        let mut num_batches = 0;
-        let mut sum_a = 0;
-        for batch in reader {
-            let batch = batch.unwrap();
-            assert_eq!(7, batch.num_columns());
-            sum_num_rows += batch.num_rows();
-            num_batches += 1;
-            let batch_schema = batch.schema();
-            assert_eq!(schema, batch_schema);
-            let a_array = batch
-                .column(col_a_index)
-                .as_any()
-                .downcast_ref::<Int64Array>()
-                .unwrap();
-            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
-        }
-        assert_eq!(12, sum_num_rows);
-        assert_eq!(3, num_batches);
-        assert_eq!(100000000000011, sum_a);
-    }
-
-    #[test]
-    fn test_options_clone() {
-        // ensure options have appropriate derivation
-        let options = DecoderOptions::new().with_batch_size(64);
-        let cloned = options.clone();
-        assert_eq!(options, cloned);
-    }
-
-    pub fn decimal_json_tests<T: DecimalType>(data_type: DataType) {
-        let schema = Schema::new(vec![
-            Field::new("a", data_type.clone(), true),
-            Field::new("b", data_type.clone(), true),
-            Field::new("f", data_type.clone(), true),
-        ]);
-
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .with_batch_size(64);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open("test/data/basic.json").unwrap())
-            .unwrap();
-        let batch = reader.next().unwrap().unwrap();
-
-        assert_eq!(3, batch.num_columns());
-        assert_eq!(12, batch.num_rows());
-
-        let schema = reader.schema();
-        let batch_schema = batch.schema();
-        assert_eq!(schema, batch_schema);
-
-        let a = schema.column_with_name("a").unwrap();
-        let b = schema.column_with_name("b").unwrap();
-        let f = schema.column_with_name("f").unwrap();
-        assert_eq!(&data_type, a.1.data_type());
-        assert_eq!(&data_type, b.1.data_type());
-        assert_eq!(&data_type, f.1.data_type());
-
-        let aa = batch
-            .column(a.0)
-            .as_any()
-            .downcast_ref::<PrimitiveArray<T>>()
-            .unwrap();
-        assert_eq!(T::Native::usize_as(100), aa.value(0));
-        assert_eq!(T::Native::usize_as(100), aa.value(3));
-        assert_eq!(T::Native::usize_as(500), aa.value(7));
-
-        let bb = batch
-            .column(b.0)
-            .as_any()
-            .downcast_ref::<PrimitiveArray<T>>()
-            .unwrap();
-        assert_eq!(T::Native::usize_as(200), bb.value(0));
-        assert_eq!(T::Native::usize_as(350).neg_wrapping(), bb.value(1));
-        assert_eq!(T::Native::usize_as(60), bb.value(8));
-
-        let ff = batch
-            .column(f.0)
-            .as_any()
-            .downcast_ref::<PrimitiveArray<T>>()
-            .unwrap();
-        assert_eq!(T::Native::usize_as(102), ff.value(0));
-        assert_eq!(T::Native::usize_as(30).neg_wrapping(), ff.value(1));
-        assert_eq!(T::Native::usize_as(137722), ff.value(2));
-
-        assert_eq!(T::Native::usize_as(133700), ff.value(3));
-        assert_eq!(T::Native::usize_as(9999999999), ff.value(7));
-    }
-
-    #[test]
-    fn test_decimal_from_json() {
-        decimal_json_tests::<Decimal128Type>(DataType::Decimal128(10, 2));
-        decimal_json_tests::<Decimal256Type>(DataType::Decimal256(10, 2));
-    }
-}
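
For anyone migrating off the removed Value-based reader above: a minimal sketch, assuming a line-delimited JSON file and a known schema. The commented-out call shows the shape of the deleted builder API for reference; the retained ReaderBuilder takes the schema as a constructor argument instead.

    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    use arrow_json::ReaderBuilder;
    use arrow_schema::{DataType, Field, Schema};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

        // Removed API (for reference): ReaderBuilder::new()
        //     .with_schema(schema).with_batch_size(64).build(file)
        let file = File::open("test/data/basic.json")?;
        let mut reader = ReaderBuilder::new(schema)
            .with_batch_size(64)
            .build(BufReader::new(file))?;

        while let Some(batch) = reader.next().transpose()? {
            println!("read {} rows", batch.num_rows());
        }
        Ok(())
    }
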
diff --git a/arrow-json/src/raw/boolean_array.rs b/arrow-json/src/reader/boolean_array.rs
similarity index 94%
rename from arrow-json/src/raw/boolean_array.rs
rename to arrow-json/src/reader/boolean_array.rs
index 12917785e..9a7f22680 100644
--- a/arrow-json/src/raw/boolean_array.rs
+++ b/arrow-json/src/reader/boolean_array.rs
@@ -20,8 +20,8 @@ use arrow_array::Array;
 use arrow_data::ArrayData;
 use arrow_schema::ArrowError;
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{tape_error, ArrayDecoder};
 
 #[derive(Default)]
 pub struct BooleanArrayDecoder {}
diff --git a/arrow-json/src/raw/decimal_array.rs b/arrow-json/src/reader/decimal_array.rs
similarity index 96%
rename from arrow-json/src/raw/decimal_array.rs
rename to arrow-json/src/reader/decimal_array.rs
index 0518b4cef..508409ec7 100644
--- a/arrow-json/src/raw/decimal_array.rs
+++ b/arrow-json/src/reader/decimal_array.rs
@@ -24,8 +24,8 @@ use arrow_cast::parse::parse_decimal;
 use arrow_data::ArrayData;
 use arrow_schema::ArrowError;
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{tape_error, ArrayDecoder};
 
 pub struct DecimalArrayDecoder<D: DecimalType> {
     precision: u8,
diff --git a/arrow-json/src/raw/list_array.rs b/arrow-json/src/reader/list_array.rs
similarity index 97%
rename from arrow-json/src/raw/list_array.rs
rename to arrow-json/src/reader/list_array.rs
index a57f42733..ac35f9988 100644
--- a/arrow-json/src/raw/list_array.rs
+++ b/arrow-json/src/reader/list_array.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{make_decoder, tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{make_decoder, tape_error, ArrayDecoder};
 use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
 use arrow_array::OffsetSizeTrait;
 use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
diff --git a/arrow-json/src/raw/map_array.rs b/arrow-json/src/reader/map_array.rs
similarity index 97%
rename from arrow-json/src/raw/map_array.rs
rename to arrow-json/src/reader/map_array.rs
index dee142bef..3662e594b 100644
--- a/arrow-json/src/raw/map_array.rs
+++ b/arrow-json/src/reader/map_array.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{make_decoder, tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{make_decoder, tape_error, ArrayDecoder};
 use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
 use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_buffer::ArrowNativeType;
diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/reader/mod.rs
similarity index 68%
rename from arrow-json/src/raw/mod.rs
rename to arrow-json/src/reader/mod.rs
index 38b4cce9b..d36493a47 100644
--- a/arrow-json/src/raw/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! A faster JSON reader that will eventually replace [`Reader`]
+//! JSON reader
 //!
-//! [`Reader`]: crate::reader::Reader
+//! This JSON reader allows line-delimited JSON files to be read into the Arrow
+//! memory model. Records are loaded in batches and then converted from
+//! row-based data to columnar data.
 //!
 //! # Basic Usage
 //!
-//! [`RawReader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
+//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
 //!
 //! ```
 //! # use arrow_schema::*;
@@ -37,13 +39,13 @@
 //!
 //! let file = File::open("test/data/basic.json").unwrap();
 //!
-//! let mut json = arrow_json::RawReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
+//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
 //! let batch = json.next().unwrap().unwrap();
 //! ```
 //!
 //! # Async Usage
 //!
-//! The lower-level [`RawDecoder`] can be integrated with various forms of async data streams,
+//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
 //! and is designed to be agnostic to the various kinds of async IO primitives found
 //! within the Rust ecosystem.
 //!
@@ -55,10 +57,10 @@
 //! # use arrow_schema::ArrowError;
 //! # use futures::stream::{Stream, StreamExt};
 //! # use arrow_array::RecordBatch;
-//! # use arrow_json::RawDecoder;
+//! # use arrow_json::reader::Decoder;
 //! #
 //! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
-//!     mut decoder: RawDecoder,
+//!     mut decoder: Decoder,
 //!     mut input: S,
 //! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
 //!     let mut buffered = Bytes::new();
@@ -97,10 +99,10 @@
 //! # use futures::{Stream, TryStreamExt};
 //! # use tokio::io::AsyncBufRead;
 //! # use arrow_array::RecordBatch;
-//! # use arrow_json::RawDecoder;
+//! # use arrow_json::reader::Decoder;
 //! # use arrow_schema::ArrowError;
 //! fn decode_stream<R: AsyncBufRead + Unpin>(
-//!     mut decoder: RawDecoder,
+//!     mut decoder: Decoder,
 //!     mut reader: R,
 //! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
 //!     futures::stream::poll_fn(move |cx| {
@@ -127,46 +129,51 @@
 //! ```
 //!
 
-use crate::raw::boolean_array::BooleanArrayDecoder;
-use crate::raw::decimal_array::DecimalArrayDecoder;
-use crate::raw::list_array::ListArrayDecoder;
-use crate::raw::map_array::MapArrayDecoder;
-use crate::raw::primitive_array::PrimitiveArrayDecoder;
-use crate::raw::string_array::StringArrayDecoder;
-use crate::raw::struct_array::StructArrayDecoder;
-use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
-use crate::raw::timestamp_array::TimestampArrayDecoder;
+use std::io::BufRead;
+
+use chrono::Utc;
+use serde::Serialize;
+
 use arrow_array::timezone::Tz;
 use arrow_array::types::Float32Type;
 use arrow_array::types::*;
-use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
+use arrow_array::{downcast_integer, RecordBatch, RecordBatchReader, StructArray};
 use arrow_data::ArrayData;
 use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
-use chrono::Utc;
-use serde::Serialize;
-use std::io::BufRead;
+pub use schema::*;
+
+use crate::reader::boolean_array::BooleanArrayDecoder;
+use crate::reader::decimal_array::DecimalArrayDecoder;
+use crate::reader::list_array::ListArrayDecoder;
+use crate::reader::map_array::MapArrayDecoder;
+use crate::reader::primitive_array::PrimitiveArrayDecoder;
+use crate::reader::string_array::StringArrayDecoder;
+use crate::reader::struct_array::StructArrayDecoder;
+use crate::reader::tape::{Tape, TapeDecoder, TapeElement};
+use crate::reader::timestamp_array::TimestampArrayDecoder;
 
 mod boolean_array;
 mod decimal_array;
 mod list_array;
 mod map_array;
 mod primitive_array;
+mod schema;
 mod serializer;
 mod string_array;
 mod struct_array;
 mod tape;
 mod timestamp_array;
 
-/// A builder for [`RawReader`] and [`RawDecoder`]
-pub struct RawReaderBuilder {
+/// A builder for [`Reader`] and [`Decoder`]
+pub struct ReaderBuilder {
     batch_size: usize,
     coerce_primitive: bool,
 
     schema: SchemaRef,
 }
 
-impl RawReaderBuilder {
-    /// Create a new [`RawReaderBuilder`] with the provided [`SchemaRef`]
+impl ReaderBuilder {
+    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
     ///
     /// This could be obtained using [`infer_json_schema`] if not known
     ///
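
A brief sketch of the coerce_primitive flag carried by the builder above, assuming the behaviour exercised by test_not_coercing_primitive_into_string_without_flag later in this diff: without the flag, reading a JSON number into a Utf8 column is an error; with it, the primitive is accepted into the string column.

    use std::io::Cursor;
    use std::sync::Arc;

    use arrow_json::ReaderBuilder;
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
        let buf = r#"{"a": 1}"#;

        // With coerce_primitive(false) (the default) this read would error
        let batch = ReaderBuilder::new(schema)
            .coerce_primitive(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
            .unwrap();
        assert_eq!(batch.num_rows(), 1);
    }
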
@@ -194,16 +201,16 @@ impl RawReaderBuilder {
         }
     }
 
-    /// Create a [`RawReader`] with the provided [`BufRead`]
-    pub fn build<R: BufRead>(self, reader: R) -> Result<RawReader<R>, ArrowError> {
-        Ok(RawReader {
+    /// Create a [`Reader`] with the provided [`BufRead`]
+    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
+        Ok(Reader {
             reader,
             decoder: self.build_decoder()?,
         })
     }
 
-    /// Create a [`RawDecoder`]
-    pub fn build_decoder(self) -> Result<RawDecoder, ArrowError> {
+    /// Create a [`Decoder`]
+    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
         let decoder = make_decoder(
             DataType::Struct(self.schema.fields.clone()),
             self.coerce_primitive,
@@ -211,7 +218,7 @@ impl RawReaderBuilder {
         )?;
         let num_fields = self.schema.all_fields().len();
 
-        Ok(RawDecoder {
+        Ok(Decoder {
             decoder,
             tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
             batch_size: self.batch_size,
@@ -222,26 +229,21 @@ impl RawReaderBuilder {
 
 /// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
 ///
-/// This is significantly faster than [`Reader`] and eventually intended
-/// to replace it ([#3610](https://github.com/apache/arrow-rs/issues/3610))
-///
 /// Lines consisting solely of ASCII whitespace are ignored
-///
-/// [`Reader`]: crate::reader::Reader
-pub struct RawReader<R> {
+pub struct Reader<R> {
     reader: R,
-    decoder: RawDecoder,
+    decoder: Decoder,
 }
 
-impl<R> std::fmt::Debug for RawReader<R> {
+impl<R> std::fmt::Debug for Reader<R> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RawReader")
+        f.debug_struct("Reader")
             .field("decoder", &self.decoder)
             .finish()
     }
 }
 
-impl<R: BufRead> RawReader<R> {
+impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`], returning `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
         loop {
@@ -261,7 +263,7 @@ impl<R: BufRead> RawReader<R> {
     }
 }
 
-impl<R: BufRead> Iterator for RawReader<R> {
+impl<R: BufRead> Iterator for Reader<R> {
     type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -269,7 +271,7 @@ impl<R: BufRead> Iterator for RawReader<R> {
     }
 }
 
-impl<R: BufRead> RecordBatchReader for RawReader<R> {
+impl<R: BufRead> RecordBatchReader for Reader<R> {
     fn schema(&self) -> SchemaRef {
         self.decoder.schema.clone()
     }
@@ -277,7 +279,7 @@ impl<R: BufRead> RecordBatchReader for RawReader<R> {
 
 /// A low-level interface for reading JSON data from a byte stream
 ///
-/// See [`RawReader`] for a higher-level interface for interface with [`BufRead`]
+/// See [`Reader`] for a higher-level interface over [`BufRead`]
 ///
 /// The push-based interface facilitates integration with sources that yield arbitrarily
 /// delimited byte ranges, such as [`BufRead`], or a chunked byte stream received from
@@ -286,17 +288,17 @@ impl<R: BufRead> RecordBatchReader for RawReader<R> {
 /// ```
 /// # use std::io::BufRead;
 /// # use arrow_array::RecordBatch;
-/// # use arrow_json::{RawDecoder, RawReaderBuilder};
+/// # use arrow_json::reader::{Decoder, ReaderBuilder};
 /// # use arrow_schema::{ArrowError, SchemaRef};
 /// #
 /// fn read_from_json<R: BufRead>(
 ///     mut reader: R,
 ///     schema: SchemaRef,
 /// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
-///     let mut decoder = RawReaderBuilder::new(schema).build_decoder()?;
+///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
 ///     let mut next = move || {
 ///         loop {
-///             // RawDecoder is agnostic that buf doesn't contain whole records
+///             // Decoder is agnostic to whether buf contains complete records
 ///             let buf = reader.fill_buf()?;
 ///             if buf.is_empty() {
 ///                 break; // Input exhausted
@@ -315,23 +317,23 @@ impl<R: BufRead> RecordBatchReader for RawReader<R> {
 ///     Ok(std::iter::from_fn(move || next().transpose()))
 /// }
 /// ```
-pub struct RawDecoder {
+pub struct Decoder {
     tape_decoder: TapeDecoder,
     decoder: Box<dyn ArrayDecoder>,
     batch_size: usize,
     schema: SchemaRef,
 }
 
-impl std::fmt::Debug for RawDecoder {
+impl std::fmt::Debug for Decoder {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RawDecoder")
+        f.debug_struct("Decoder")
             .field("schema", &self.schema)
             .field("batch_size", &self.batch_size)
             .finish()
     }
 }
 
-impl RawDecoder {
+impl Decoder {
     /// Read JSON objects from `buf`, returning the number of bytes read
     ///
     /// This method returns once `batch_size` objects have been parsed since the
@@ -344,7 +346,7 @@ impl RawDecoder {
         self.tape_decoder.decode(buf)
     }
 
-    /// Serialize `rows` to this [`RawDecoder`]
+    /// Serialize `rows` to this [`Decoder`]
     ///
     /// This provides a simple way to convert [serde]-compatible data structures into arrow
     /// [`RecordBatch`].
@@ -360,12 +362,12 @@ impl RawDecoder {
     /// # use serde_json::{Value, json};
     /// # use arrow_array::cast::AsArray;
     /// # use arrow_array::types::Float32Type;
-    /// # use arrow_json::RawReaderBuilder;
+    /// # use arrow_json::ReaderBuilder;
     /// # use arrow_schema::{DataType, Field, Schema};
     /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
     ///
     /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
-    /// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
+    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
     ///
     /// decoder.serialize(&json).unwrap();
     /// let batch = decoder.flush().unwrap().unwrap();
@@ -379,7 +381,7 @@ impl RawDecoder {
     ///
     /// ```
     /// # use std::sync::Arc;
-    /// # use arrow_json::RawReaderBuilder;
+    /// # use arrow_json::ReaderBuilder;
     /// # use arrow_schema::{DataType, Field, Schema};
     /// # use serde::Serialize;
     /// # use arrow_array::cast::AsArray;
@@ -401,7 +403,7 @@ impl RawDecoder {
     ///     MyStruct{ int32: 4, float: 67.53 },
     /// ];
     ///
-    /// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
+    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
     /// decoder.serialize(&rows).unwrap();
     ///
     /// let batch = decoder.flush().unwrap().unwrap();
@@ -421,7 +423,7 @@ impl RawDecoder {
     /// # use std::sync::Arc;
     /// # use arrow_array::StructArray;
     /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
-    /// # use arrow_json::RawReaderBuilder;
+    /// # use arrow_json::ReaderBuilder;
     /// # use arrow_schema::{DataType, Field, Fields, Schema};
     /// # use serde::Serialize;
     /// #
@@ -501,7 +503,7 @@ impl RawDecoder {
     /// ];
     ///
     /// let schema = Schema::new(MyStruct::fields());
-    /// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
+    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
     /// decoder.serialize(&data).unwrap();
     /// let batch = decoder.flush().unwrap().unwrap();
     /// assert_eq!(batch.num_rows(), 3);
@@ -554,14 +556,8 @@ impl RawDecoder {
         assert_eq!(decoded.null_count(), 0);
         assert_eq!(decoded.len(), pos.len());
 
-        // Clear out buffer
-        let columns = decoded
-            .child_data()
-            .iter()
-            .map(|x| make_array(x.clone()))
-            .collect();
-
-        let batch = RecordBatch::try_new(self.schema.clone(), columns)?;
+        let batch = RecordBatch::from(StructArray::from(decoded))
+            .with_schema(self.schema.clone())?;
         Ok(Some(batch))
     }
 }
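
To make the decode/flush contract above concrete: a minimal push-based loop, assuming chunked input that may split a record across buffer boundaries (the same pattern as the read_from_json doc example earlier in this diff).

    use std::sync::Arc;

    use arrow_json::ReaderBuilder;
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, true)]));
        let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();

        // Feed the input in arbitrary chunks; decode returns the bytes consumed
        for chunk in [&b"{\"x\": 1}\n{\"x\""[..], &b": 2}\n"[..]] {
            let mut buf = chunk;
            while !buf.is_empty() {
                let read = decoder.decode(buf).unwrap();
                buf = &buf[read..];
            }
        }

        // flush returns Ok(None) once no buffered rows remain
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
    }
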
@@ -641,20 +637,25 @@ fn tape_error(d: TapeElement, expected: &str) -> ArrowError {
 }
 
 #[cfg(test)]
-#[allow(deprecated)]
 mod tests {
-    use super::*;
-    use crate::reader::infer_json_schema;
-    use crate::ReaderBuilder;
+    use std::fs::File;
+    use std::io::{BufReader, Cursor, Seek};
+    use std::sync::Arc;
+
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, StructArray};
-    use arrow_buffer::ArrowNativeType;
+    use arrow_array::{
+        make_array, Array, BooleanArray, ListArray, StringArray, StructArray,
+    };
+    use arrow_buffer::{ArrowNativeType, Buffer};
     use arrow_cast::display::{ArrayFormatter, FormatOptions};
+    use arrow_data::ArrayDataBuilder;
     use arrow_schema::{DataType, Field, Schema};
-    use std::fs::File;
-    use std::io::{BufReader, Cursor, Seek};
-    use std::sync::Arc;
+
+    use crate::reader::infer_json_schema;
+    use crate::ReaderBuilder;
+
+    use super::*;
 
     fn do_read(
         buf: &str,
@@ -666,7 +667,7 @@ mod tests {
 
         // Test with different batch sizes to test for boundary conditions
         for batch_size in [1, 3, 100, batch_size] {
-            unbuffered = RawReaderBuilder::new(schema.clone())
+            unbuffered = ReaderBuilder::new(schema.clone())
                 .with_batch_size(batch_size)
                 .coerce_primitive(coerce_primitive)
                 .build(Cursor::new(buf.as_bytes()))
@@ -680,7 +681,7 @@ mod tests {
 
             // Test with different buffer sizes to test for boundary conditions
             for b in [1, 3, 5] {
-                let buffered = RawReaderBuilder::new(schema.clone())
+                let buffered = ReaderBuilder::new(schema.clone())
                     .with_batch_size(batch_size)
                     .coerce_primitive(coerce_primitive)
                     .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
@@ -956,39 +957,12 @@ mod tests {
         assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
     }
 
-    #[test]
-    fn integration_test() {
-        let files = [
-            "test/data/basic.json",
-            "test/data/basic_nulls.json",
-            "test/data/list_string_dict_nested_nulls.json",
-        ];
-
-        for file in files {
-            let mut f = BufReader::new(File::open(file).unwrap());
-            let schema = Arc::new(infer_json_schema(&mut f, None).unwrap());
-
-            f.rewind().unwrap();
-            let a = ReaderBuilder::new()
-                .with_schema(schema.clone())
-                .build(&mut f)
-                .unwrap();
-            let a_result = a.into_iter().collect::<Result<Vec<_>, _>>().unwrap();
-
-            f.rewind().unwrap();
-            let b = RawReaderBuilder::new(schema).build(f).unwrap();
-            let b_result = b.into_iter().collect::<Result<Vec<_>, _>>().unwrap();
-
-            assert_eq!(a_result, b_result);
-        }
-    }
-
     #[test]
     fn test_not_coercing_primitive_into_string_without_flag() {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
 
         let buf = r#"{"a": 1}"#;
-        let result = RawReaderBuilder::new(schema.clone())
+        let result = ReaderBuilder::new(schema.clone())
             .with_batch_size(1024)
             .build(Cursor::new(buf.as_bytes()))
             .unwrap()
@@ -1001,7 +975,7 @@ mod tests {
         );
 
         let buf = r#"{"a": true}"#;
-        let result = RawReaderBuilder::new(schema)
+        let result = ReaderBuilder::new(schema)
             .with_batch_size(1024)
             .build(Cursor::new(buf.as_bytes()))
             .unwrap()
@@ -1337,20 +1311,20 @@ mod tests {
                 vec![Field::new("bar", child, false)],
                 true,
             )]));
-            let mut reader = RawReaderBuilder::new(schema.clone())
+            let mut reader = ReaderBuilder::new(schema.clone())
                 .build(Cursor::new(non_null.as_bytes()))
                 .unwrap();
             assert!(reader.next().unwrap().is_err()); // Should error as not nullable
 
             let null = r#"{"foo": {bar: null}}"#;
-            let mut reader = RawReaderBuilder::new(schema.clone())
+            let mut reader = ReaderBuilder::new(schema.clone())
                 .build(Cursor::new(null.as_bytes()))
                 .unwrap();
             assert!(reader.next().unwrap().is_err()); // Should error as not nullable
 
             // Test that nulls in a nullable parent can mask nulls in a non-nullable child
             let null = r#"{"foo": null}"#;
-            let mut reader = RawReaderBuilder::new(schema)
+            let mut reader = ReaderBuilder::new(schema)
                 .build(Cursor::new(null.as_bytes()))
                 .unwrap();
             let batch = reader.next().unwrap().unwrap();
@@ -1399,4 +1373,497 @@ mod tests {
         let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
         assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
     }
+
+    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
+        let file = File::open(path).unwrap();
+        let mut reader = BufReader::new(file);
+        let schema = schema.unwrap_or_else(|| {
+            let schema = infer_json_schema(&mut reader, None).unwrap();
+            reader.rewind().unwrap();
+            schema
+        });
+        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
+        builder.build(reader).unwrap()
+    }
+
+    #[test]
+    fn test_json_basic() {
+        let mut reader = read_file("test/data/basic.json", None);
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(7, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(0, a.0);
+        assert_eq!(&DataType::Int64, a.1.data_type());
+        let b = schema.column_with_name("b").unwrap();
+        assert_eq!(1, b.0);
+        assert_eq!(&DataType::Float64, b.1.data_type());
+        let c = schema.column_with_name("c").unwrap();
+        assert_eq!(2, c.0);
+        assert_eq!(&DataType::Boolean, c.1.data_type());
+        let d = schema.column_with_name("d").unwrap();
+        assert_eq!(3, d.0);
+        assert_eq!(&DataType::Utf8, d.1.data_type());
+
+        let aa = batch.column(a.0).as_primitive::<Int64Type>();
+        assert_eq!(1, aa.value(0));
+        assert_eq!(-10, aa.value(1));
+        let bb = batch.column(b.0).as_primitive::<Float64Type>();
+        assert_eq!(2.0, bb.value(0));
+        assert_eq!(-3.5, bb.value(1));
+        let cc = batch.column(c.0).as_boolean();
+        assert!(!cc.value(0));
+        assert!(cc.value(10));
+        let dd = batch.column(d.0).as_string::<i32>();
+        assert_eq!("4", dd.value(0));
+        assert_eq!("text", dd.value(8));
+    }
+
+    #[test]
+    fn test_json_empty_projection() {
+        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(0, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+    }
+
+    #[test]
+    fn test_json_basic_with_nulls() {
+        let mut reader = read_file("test/data/basic_nulls.json", None);
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(4, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(&DataType::Int64, a.1.data_type());
+        let b = schema.column_with_name("b").unwrap();
+        assert_eq!(&DataType::Float64, b.1.data_type());
+        let c = schema.column_with_name("c").unwrap();
+        assert_eq!(&DataType::Boolean, c.1.data_type());
+        let d = schema.column_with_name("d").unwrap();
+        assert_eq!(&DataType::Utf8, d.1.data_type());
+
+        let aa = batch.column(a.0).as_primitive::<Int64Type>();
+        assert!(aa.is_valid(0));
+        assert!(!aa.is_valid(1));
+        assert!(!aa.is_valid(11));
+        let bb = batch.column(b.0).as_primitive::<Float64Type>();
+        assert!(bb.is_valid(0));
+        assert!(!bb.is_valid(2));
+        assert!(!bb.is_valid(11));
+        let cc = batch.column(c.0).as_boolean();
+        assert!(cc.is_valid(0));
+        assert!(!cc.is_valid(4));
+        assert!(!cc.is_valid(11));
+        let dd = batch.column(d.0).as_string::<i32>();
+        assert!(!dd.is_valid(0));
+        assert!(dd.is_valid(1));
+        assert!(!dd.is_valid(4));
+        assert!(!dd.is_valid(11));
+    }
+
+    #[test]
+    fn test_json_basic_schema() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Float32, false),
+            Field::new("c", DataType::Boolean, false),
+            Field::new("d", DataType::Utf8, false),
+        ]);
+
+        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
+        let reader_schema = reader.schema();
+        assert_eq!(reader_schema.as_ref(), &schema);
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(4, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+
+        let schema = batch.schema();
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(&DataType::Int64, a.1.data_type());
+        let b = schema.column_with_name("b").unwrap();
+        assert_eq!(&DataType::Float32, b.1.data_type());
+        let c = schema.column_with_name("c").unwrap();
+        assert_eq!(&DataType::Boolean, c.1.data_type());
+        let d = schema.column_with_name("d").unwrap();
+        assert_eq!(&DataType::Utf8, d.1.data_type());
+
+        let aa = batch.column(a.0).as_primitive::<Int64Type>();
+        assert_eq!(1, aa.value(0));
+        assert_eq!(100000000000000, aa.value(11));
+        let bb = batch.column(b.0).as_primitive::<Float32Type>();
+        assert_eq!(2.0, bb.value(0));
+        assert_eq!(-3.5, bb.value(1));
+    }
+
+    #[test]
+    fn test_json_basic_schema_projection() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("c", DataType::Boolean, false),
+        ]);
+
+        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(2, batch.num_columns());
+        assert_eq!(2, batch.schema().fields().len());
+        assert_eq!(12, batch.num_rows());
+
+        assert_eq!(batch.schema().as_ref(), &schema);
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(0, a.0);
+        assert_eq!(&DataType::Int64, a.1.data_type());
+        let c = schema.column_with_name("c").unwrap();
+        assert_eq!(1, c.0);
+        assert_eq!(&DataType::Boolean, c.1.data_type());
+    }
+
+    #[test]
+    fn test_json_arrays() {
+        let mut reader = read_file("test/data/arrays.json", None);
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(4, batch.num_columns());
+        assert_eq!(3, batch.num_rows());
+
+        let schema = batch.schema();
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(&DataType::Int64, a.1.data_type());
+        let b = schema.column_with_name("b").unwrap();
+        assert_eq!(
+            &DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
+            b.1.data_type()
+        );
+        let c = schema.column_with_name("c").unwrap();
+        assert_eq!(
+            &DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))),
+            c.1.data_type()
+        );
+        let d = schema.column_with_name("d").unwrap();
+        assert_eq!(&DataType::Utf8, d.1.data_type());
+
+        let aa = batch.column(a.0).as_primitive::<Int64Type>();
+        assert_eq!(1, aa.value(0));
+        assert_eq!(-10, aa.value(1));
+        assert_eq!(1627668684594000000, aa.value(2));
+        let bb = batch.column(b.0).as_list::<i32>();
+        let bb = bb.values().as_primitive::<Float64Type>();
+        assert_eq!(9, bb.len());
+        assert_eq!(2.0, bb.value(0));
+        assert_eq!(-6.1, bb.value(5));
+        assert!(!bb.is_valid(7));
+
+        let cc = batch
+            .column(c.0)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+        let cc = cc.values().as_boolean();
+        assert_eq!(6, cc.len());
+        assert!(!cc.value(0));
+        assert!(!cc.value(4));
+        assert!(!cc.is_valid(5));
+    }
+
+    #[test]
+    fn test_nested_list_json_arrays() {
+        let c_field =
+            Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
+        let a_struct_field = Field::new_struct(
+            "a",
+            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
+            true,
+        );
+        let a_field =
+            Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
+        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
+        let builder = ReaderBuilder::new(schema).with_batch_size(64);
+        let json_content = r#"
+        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
+        {"a": [{"b": false, "c": null}]}
+        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
+        {"a": null}
+        {"a": []}
+        {"a": [null]}
+        "#;
+        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
+
+        // build expected output
+        let d = StringArray::from(vec![
+            Some("a_text"),
+            Some("b_text"),
+            None,
+            Some("c_text"),
+            Some("d_text"),
+            None,
+            None,
+        ]);
+        let c = ArrayDataBuilder::new(c_field.data_type().clone())
+            .len(7)
+            .add_child_data(d.to_data())
+            .null_bit_buffer(Some(Buffer::from(vec![0b00111011])))
+            .build()
+            .unwrap();
+        let b = BooleanArray::from(vec![
+            Some(true),
+            Some(false),
+            Some(false),
+            Some(true),
+            None,
+            Some(true),
+            None,
+        ]);
+        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
+            .len(7)
+            .add_child_data(b.to_data())
+            .add_child_data(c.clone())
+            .null_bit_buffer(Some(Buffer::from(vec![0b00111111])))
+            .build()
+            .unwrap();
+        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
+            .len(6)
+            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
+            .add_child_data(a)
+            .null_bit_buffer(Some(Buffer::from(vec![0b00110111])))
+            .build()
+            .unwrap();
+        let expected = make_array(a_list);
+
+        // compare `a` with result from json reader
+        let batch = reader.next().unwrap().unwrap();
+        let read = batch.column(0);
+        assert_eq!(read.len(), 6);
+        // compare the arrays the long way around, to better detect differences
+        let read: &ListArray = read.as_list::<i32>();
+        let expected = expected.as_list::<i32>();
+        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
+        // compare list null buffers
+        assert_eq!(read.nulls(), expected.nulls());
+        // build struct from list
+        let struct_array = read.values().as_struct();
+        let expected_struct_array = expected.values().as_struct();
+
+        assert_eq!(7, struct_array.len());
+        assert_eq!(1, struct_array.null_count());
+        assert_eq!(7, expected_struct_array.len());
+        assert_eq!(1, expected_struct_array.null_count());
+        // test struct's nulls
+        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
+        // test struct's fields
+        let read_b = struct_array.column(0);
+        assert_eq!(read_b.as_ref(), &b);
+        let read_c = struct_array.column(1);
+        assert_eq!(read_c.to_data(), c);
+        let read_c = read_c.as_struct();
+        let read_d = read_c.column(0);
+        assert_eq!(read_d.as_ref(), &d);
+
+        assert_eq!(read, expected);
+    }
+
+    #[test]
+    fn test_skip_empty_lines() {
+        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
+        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
+        let json_content = "
+        {\"a\": 1}
+        {\"a\": 2}
+        {\"a\": 3}";
+        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(3, batch.num_rows());
+
+        let schema = reader.schema();
+        let c = schema.column_with_name("a").unwrap();
+        assert_eq!(&DataType::Int64, c.1.data_type());
+    }
+
+    #[test]
+    fn test_with_multiple_batches() {
+        let file = File::open("test/data/basic_nulls.json").unwrap();
+        let mut reader = BufReader::new(file);
+        let schema = infer_json_schema(&mut reader, None).unwrap();
+        reader.rewind().unwrap();
+
+        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
+        let mut reader = builder.build(reader).unwrap();
+
+        let mut num_records = Vec::new();
+        while let Some(rb) = reader.next().transpose().unwrap() {
+            num_records.push(rb.num_rows());
+        }
+
+        assert_eq!(vec![5, 5, 2], num_records);
+    }
+
+    #[test]
+    fn test_timestamp_from_json_seconds() {
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::Timestamp(TimeUnit::Second, None),
+            true,
+        )]);
+
+        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(
+            &DataType::Timestamp(TimeUnit::Second, None),
+            a.1.data_type()
+        );
+
+        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
+        assert!(aa.is_valid(0));
+        assert!(!aa.is_valid(1));
+        assert!(!aa.is_valid(2));
+        assert_eq!(1, aa.value(0));
+        assert_eq!(1, aa.value(3));
+        assert_eq!(5, aa.value(7));
+    }
+
+    #[test]
+    fn test_timestamp_from_json_milliseconds() {
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            true,
+        )]);
+
+        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(
+            &DataType::Timestamp(TimeUnit::Millisecond, None),
+            a.1.data_type()
+        );
+
+        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
+        assert!(aa.is_valid(0));
+        assert!(!aa.is_valid(1));
+        assert!(!aa.is_valid(2));
+        assert_eq!(1, aa.value(0));
+        assert_eq!(1, aa.value(3));
+        assert_eq!(5, aa.value(7));
+    }
+
+    #[test]
+    fn test_date_from_json_milliseconds() {
+        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
+
+        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(&DataType::Date64, a.1.data_type());
+
+        let aa = batch.column(a.0).as_primitive::<Date64Type>();
+        assert!(aa.is_valid(0));
+        assert!(!aa.is_valid(1));
+        assert!(!aa.is_valid(2));
+        assert_eq!(1, aa.value(0));
+        assert_eq!(1, aa.value(3));
+        assert_eq!(5, aa.value(7));
+    }
+
+    #[test]
+    fn test_time_from_json_nanoseconds() {
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::Time64(TimeUnit::Nanosecond),
+            true,
+        )]);
+
+        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(12, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let a = schema.column_with_name("a").unwrap();
+        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
+
+        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
+        assert!(aa.is_valid(0));
+        assert!(!aa.is_valid(1));
+        assert!(!aa.is_valid(2));
+        assert_eq!(1, aa.value(0));
+        assert_eq!(1, aa.value(3));
+        assert_eq!(5, aa.value(7));
+    }
+
+    #[test]
+    fn test_json_iterator() {
+        let file = File::open("test/data/basic.json").unwrap();
+        let mut reader = BufReader::new(file);
+        let schema = infer_json_schema(&mut reader, None).unwrap();
+        reader.rewind().unwrap();
+
+        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
+        let reader = builder.build(reader).unwrap();
+        let schema = reader.schema();
+        let (col_a_index, _) = schema.column_with_name("a").unwrap();
+
+        let mut sum_num_rows = 0;
+        let mut num_batches = 0;
+        let mut sum_a = 0;
+        for batch in reader {
+            let batch = batch.unwrap();
+            assert_eq!(7, batch.num_columns());
+            sum_num_rows += batch.num_rows();
+            num_batches += 1;
+            let batch_schema = batch.schema();
+            assert_eq!(schema, batch_schema);
+            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
+            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
+        }
+        assert_eq!(12, sum_num_rows);
+        assert_eq!(3, num_batches);
+        assert_eq!(100000000000011, sum_a);
+    }
 }
diff --git a/arrow-json/src/raw/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs
similarity index 97%
rename from arrow-json/src/raw/primitive_array.rs
rename to arrow-json/src/reader/primitive_array.rs
index 6985821d6..2d45d9c45 100644
--- a/arrow-json/src/raw/primitive_array.rs
+++ b/arrow-json/src/reader/primitive_array.rs
@@ -24,8 +24,8 @@ use arrow_cast::parse::Parser;
 use arrow_data::ArrayData;
 use arrow_schema::{ArrowError, DataType};
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{tape_error, ArrayDecoder};
 
 /// A trait for JSON-specific primitive parsing logic
 ///
diff --git a/arrow-json/src/reader/schema.rs b/arrow-json/src/reader/schema.rs
new file mode 100644
index 000000000..22d25c8be
--- /dev/null
+++ b/arrow-json/src/reader/schema.rs
@@ -0,0 +1,710 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::{ArrowError, DataType, Field, Fields, Schema};
+use indexmap::map::IndexMap as HashMap;
+use indexmap::set::IndexSet as HashSet;
+use serde_json::Value;
+use std::borrow::Borrow;
+use std::io::{BufRead, BufReader, Read, Seek};
+use std::sync::Arc;
+
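+/// A type for a JSON field, inferred while scanning records and progressively
+/// widened as more values are observed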
+#[derive(Debug, Clone)]
+enum InferredType {
+    Scalar(HashSet<DataType>),
+    Array(Box<InferredType>),
+    Object(HashMap<String, InferredType>),
+    Any,
+}
+
+impl InferredType {
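+    /// Merge `other` into `self`, widening the inferred type to cover both,
+    /// and erroring on incompatible combinations such as object vs. scalar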
+    fn merge(&mut self, other: InferredType) -> Result<(), ArrowError> {
+        match (self, other) {
+            (InferredType::Array(s), InferredType::Array(o)) => {
+                s.merge(*o)?;
+            }
+            (InferredType::Scalar(self_hs), InferredType::Scalar(other_hs)) => {
+                other_hs.into_iter().for_each(|v| {
+                    self_hs.insert(v);
+                });
+            }
+            (InferredType::Object(self_map), InferredType::Object(other_map)) => {
+                for (k, v) in other_map {
+                    self_map.entry(k).or_insert(InferredType::Any).merge(v)?;
+                }
+            }
+            (s @ InferredType::Any, v) => {
+                *s = v;
+            }
+            (_, InferredType::Any) => {}
+            // merging a scalar with an array coerces the scalar into the array's element type
+            (
+                InferredType::Array(self_inner_type),
+                other_scalar @ InferredType::Scalar(_),
+            ) => {
+                self_inner_type.merge(other_scalar)?;
+            }
+            (s @ InferredType::Scalar(_), InferredType::Array(mut other_inner_type)) => {
+                other_inner_type.merge(s.clone())?;
+                *s = InferredType::Array(other_inner_type);
+            }
+            // incompatible types
+            (s, o) => {
+                return Err(ArrowError::JsonError(format!(
+                    "Incompatible type found during schema inference: {s:?} v.s. {o:?}",
+                )));
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Coerce data type during inference
+///
+/// * `Int64` and `Float64` coerce to `Float64`
+/// * Lists and scalars are coerced to a list of a compatible scalar
+/// * All other types are coerced to `Utf8`
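+///
+/// For example, under these rules:
+/// * `[Int64, Float64]` coerces to `Float64`
+/// * `[Int64, List(Int64)]` coerces to `List(Int64)`
+/// * `[Boolean, Int64]` coerces to `Utf8`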
+fn coerce_data_type(dt: Vec<&DataType>) -> DataType {
+    let mut dt_iter = dt.into_iter().cloned();
+    let dt_init = dt_iter.next().unwrap_or(DataType::Utf8);
+
+    dt_iter.fold(dt_init, |l, r| match (l, r) {
+        (DataType::Boolean, DataType::Boolean) => DataType::Boolean,
+        (DataType::Int64, DataType::Int64) => DataType::Int64,
+        (DataType::Float64, DataType::Float64)
+        | (DataType::Float64, DataType::Int64)
+        | (DataType::Int64, DataType::Float64) => DataType::Float64,
+        (DataType::List(l), DataType::List(r)) => DataType::List(Arc::new(Field::new(
+            "item",
+            coerce_data_type(vec![l.data_type(), r.data_type()]),
+            true,
+        ))),
+        // coerce scalar and scalar array into scalar array
+        (DataType::List(e), not_list) | (not_list, DataType::List(e)) => {
+            DataType::List(Arc::new(Field::new(
+                "item",
+                coerce_data_type(vec![e.data_type(), &not_list]),
+                true,
+            )))
+        }
+        _ => DataType::Utf8,
+    })
+}
+
+fn generate_datatype(t: &InferredType) -> Result<DataType, ArrowError> {
+    Ok(match t {
+        InferredType::Scalar(hs) => coerce_data_type(hs.iter().collect()),
+        InferredType::Object(spec) => DataType::Struct(generate_fields(spec)?),
+        InferredType::Array(ele_type) => DataType::List(Arc::new(Field::new(
+            "item",
+            generate_datatype(ele_type)?,
+            true,
+        ))),
+        InferredType::Any => DataType::Null,
+    })
+}
+
+fn generate_fields(spec: &HashMap<String, InferredType>) -> Result<Fields, ArrowError> {
+    spec.iter()
+        .map(|(k, types)| Ok(Field::new(k, generate_datatype(types)?, true)))
+        .collect()
+}
+
+/// Generate schema from JSON field names and inferred data types
+fn generate_schema(spec: HashMap<String, InferredType>) -> Result<Schema, ArrowError> {
+    Ok(Schema::new(generate_fields(&spec)?))
+}
+
+/// JSON file reader that produces an iterator of serde_json::Value from a [`BufReader`]
+///
+/// # Example
+///
+/// ```
+/// use std::fs::File;
+/// use std::io::BufReader;
+/// use arrow_json::reader::ValueIter;
+///
+/// let mut reader =
+///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+/// let value_reader = ValueIter::new(&mut reader, None);
+/// for value in value_reader {
+///     println!("JSON value: {}", value.unwrap());
+/// }
+/// ```
+#[derive(Debug)]
+pub struct ValueIter<'a, R: Read> {
+    reader: &'a mut BufReader<R>,
+    max_read_records: Option<usize>,
+    record_count: usize,
+    // reuse line buffer to avoid allocation on each record
+    line_buf: String,
+}
+
+impl<'a, R: Read> ValueIter<'a, R> {
+    /// Creates a new `ValueIter`
+    pub fn new(reader: &'a mut BufReader<R>, max_read_records: Option<usize>) -> Self {
+        Self {
+            reader,
+            max_read_records,
+            record_count: 0,
+            line_buf: String::new(),
+        }
+    }
+}
+
+impl<'a, R: Read> Iterator for ValueIter<'a, R> {
+    type Item = Result<Value, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(max) = self.max_read_records {
+            if self.record_count >= max {
+                return None;
+            }
+        }
+
+        loop {
+            self.line_buf.truncate(0);
+            match self.reader.read_line(&mut self.line_buf) {
+                Ok(0) => {
+                    // read_line returns 0 when the stream has reached EOF
+                    return None;
+                }
+                Err(e) => {
+                    return Some(Err(ArrowError::JsonError(format!(
+                        "Failed to read JSON record: {e}"
+                    ))));
+                }
+                _ => {
+                    let trimmed_s = self.line_buf.trim();
+                    if trimmed_s.is_empty() {
+                        // ignore empty lines
+                        continue;
+                    }
+
+                    self.record_count += 1;
+                    return Some(serde_json::from_str(trimmed_s).map_err(|e| {
+                        ArrowError::JsonError(format!("Not valid JSON: {e}"))
+                    }));
+                }
+            }
+        }
+    }
+}
+
+/// Infer the fields of a JSON file by reading the first n records of the file, with
+/// `max_read_records` controlling the maximum number of records to read.
+///
+/// If `max_read_records` is not set, the whole file is read to infer its field types.
+///
+/// Unlike [`infer_json_schema`], this function will seek back to the start of the `reader`,
+/// so that the `reader` can be used immediately afterwards to create a [`Reader`].
+///
+/// # Examples
+/// ```
+/// use std::fs::File;
+/// use std::io::BufReader;
+/// use arrow_json::reader::infer_json_schema_from_seekable;
+///
+/// let file = File::open("test/data/mixed_arrays.json").unwrap();
+/// // the file's cursor starts at offset 0
+/// let mut reader = BufReader::new(file);
+/// let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap();
+/// // the cursor has automatically been rewound to offset 0
+/// ```
+///
+/// [`Reader`]: super::Reader
+pub fn infer_json_schema_from_seekable<R: Read + Seek>(
+    reader: &mut BufReader<R>,
+    max_read_records: Option<usize>,
+) -> Result<Schema, ArrowError> {
+    let schema = infer_json_schema(reader, max_read_records);
+    // seek the reader back to the start before returning
+    reader.rewind()?;
+
+    schema
+}
+
+/// Infer the fields of a JSON file by reading the first n records of the buffer, with
+/// `max_read_records` controlling the maximum number of records to read.
+///
+/// If `max_read_records` is not set, the whole file is read to infer its field types.
+///
+/// This function will not seek back to the start of the `reader`; the user has to manage the
+/// original file's cursor. This function is useful when the `reader`'s cursor is not available
+/// (i.e. it does not implement [`Seek`]), as is the case for compressed stream decoders.
+///
+/// # Examples
+/// ```
+/// use std::fs::File;
+/// use std::io::{BufReader, SeekFrom, Seek};
+/// use flate2::read::GzDecoder;
+/// use arrow_json::reader::infer_json_schema;
+///
+/// let mut file = File::open("test/data/mixed_arrays.json.gz").unwrap();
+///
+/// // the file's cursor starts at offset 0
+/// let mut reader = BufReader::new(GzDecoder::new(&file));
+/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
+/// // the cursor's offset is now at the end of the file
+///
+/// // seek back to start so that the original file is usable again
+/// file.seek(SeekFrom::Start(0)).unwrap();
+/// ```
+pub fn infer_json_schema<R: Read>(
+    reader: &mut BufReader<R>,
+    max_read_records: Option<usize>,
+) -> Result<Schema, ArrowError> {
+    infer_json_schema_from_iterator(ValueIter::new(reader, max_read_records))
+}
+
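+/// Record that scalar type `ftype` was observed for the field `key`, creating
+/// or widening the inferred type for that field as necessary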
+fn set_object_scalar_field_type(
+    field_types: &mut HashMap<String, InferredType>,
+    key: &str,
+    ftype: DataType,
+) -> Result<(), ArrowError> {
+    if !field_types.contains_key(key) {
+        field_types.insert(key.to_string(), InferredType::Scalar(HashSet::new()));
+    }
+
+    match field_types.get_mut(key).unwrap() {
+        InferredType::Scalar(hs) => {
+            hs.insert(ftype);
+            Ok(())
+        }
+        // if the column contains both a scalar type and a scalar array type, we
+        // convert the type of this column to a scalar array.
+        scalar_array @ InferredType::Array(_) => {
+            let mut hs = HashSet::new();
+            hs.insert(ftype);
+            scalar_array.merge(InferredType::Scalar(hs))?;
+            Ok(())
+        }
+        t => Err(ArrowError::JsonError(format!(
+            "Expected scalar or scalar array JSON type, found: {t:?}",
+        ))),
+    }
+}
+
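+/// Infer the set of scalar types present in a JSON array of scalar values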
+fn infer_scalar_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
+    let mut hs = HashSet::new();
+
+    for v in array {
+        match v {
+            Value::Null => {}
+            Value::Number(n) => {
+                if n.is_i64() {
+                    hs.insert(DataType::Int64);
+                } else {
+                    hs.insert(DataType::Float64);
+                }
+            }
+            Value::Bool(_) => {
+                hs.insert(DataType::Boolean);
+            }
+            Value::String(_) => {
+                hs.insert(DataType::Utf8);
+            }
+            Value::Array(_) | Value::Object(_) => {
+                return Err(ArrowError::JsonError(format!(
+                    "Expected scalar value for scalar array, got: {v:?}"
+                )));
+            }
+        }
+    }
+
+    Ok(InferredType::Scalar(hs))
+}
+
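+/// Infer the element type of a JSON array whose elements are themselves arrays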
+fn infer_nested_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
+    let mut inner_ele_type = InferredType::Any;
+
+    for v in array {
+        match v {
+            Value::Array(inner_array) => {
+                inner_ele_type.merge(infer_array_element_type(inner_array)?)?;
+            }
+            x => {
+                return Err(ArrowError::JsonError(format!(
+                    "Got non array element in nested array: {x:?}"
+                )));
+            }
+        }
+    }
+
+    Ok(InferredType::Array(Box::new(inner_ele_type)))
+}
+
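+/// Infer the field types of a JSON array whose elements are objects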
+fn infer_struct_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
+    let mut field_types = HashMap::new();
+
+    for v in array {
+        match v {
+            Value::Object(map) => {
+                collect_field_types_from_object(&mut field_types, map)?;
+            }
+            _ => {
+                return Err(ArrowError::JsonError(format!(
+                    "Expected struct value for struct array, got: {v:?}"
+                )));
+            }
+        }
+    }
+
+    Ok(InferredType::Object(field_types))
+}
+
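+/// Infer the element type of a JSON array, dispatching on its first element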
+fn infer_array_element_type(array: &[Value]) -> Result<InferredType, ArrowError> {
+    match array.iter().take(1).next() {
+        None => Ok(InferredType::Any), // empty array: return the Any type so it can be updated later
+        Some(a) => match a {
+            Value::Array(_) => infer_nested_array_type(array),
+            Value::Object(_) => infer_struct_array_type(array),
+            _ => infer_scalar_array_type(array),
+        },
+    }
+}
+
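+/// Update `field_types` with the types inferred from each key/value pair of
+/// the given JSON object, merging with any previously inferred types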
+fn collect_field_types_from_object(
+    field_types: &mut HashMap<String, InferredType>,
+    map: &serde_json::map::Map<String, Value>,
+) -> Result<(), ArrowError> {
+    for (k, v) in map {
+        match v {
+            Value::Array(array) => {
+                let ele_type = infer_array_element_type(array)?;
+
+                if !field_types.contains_key(k) {
+                    match ele_type {
+                        InferredType::Scalar(_) => {
+                            field_types.insert(
+                                k.to_string(),
+                                InferredType::Array(Box::new(InferredType::Scalar(
+                                    HashSet::new(),
+                                ))),
+                            );
+                        }
+                        InferredType::Object(_) => {
+                            field_types.insert(
+                                k.to_string(),
+                                InferredType::Array(Box::new(InferredType::Object(
+                                    HashMap::new(),
+                                ))),
+                            );
+                        }
+                        InferredType::Any | InferredType::Array(_) => {
+                            // also set the inner type of a nested array to Any,
+                            // so it can be updated properly by subsequent type merges
+                            field_types.insert(
+                                k.to_string(),
+                                InferredType::Array(Box::new(InferredType::Any)),
+                            );
+                        }
+                    }
+                }
+
+                match field_types.get_mut(k).unwrap() {
+                    InferredType::Array(inner_type) => {
+                        inner_type.merge(ele_type)?;
+                    }
+                    // if the column contains both a scalar type and a scalar array type, we
+                    // convert the type of this column to a scalar array.
+                    field_type @ InferredType::Scalar(_) => {
+                        field_type.merge(ele_type)?;
+                        *field_type = InferredType::Array(Box::new(field_type.clone()));
+                    }
+                    t => {
+                        return Err(ArrowError::JsonError(format!(
+                            "Expected array json type, found: {t:?}",
+                        )));
+                    }
+                }
+            }
+            Value::Bool(_) => {
+                set_object_scalar_field_type(field_types, k, DataType::Boolean)?;
+            }
+            Value::Null => {
+                // do nothing: we treat JSON fields as nullable by default when
+                // inferring
+            }
+            Value::Number(n) => {
+                if n.is_f64() {
+                    set_object_scalar_field_type(field_types, k, DataType::Float64)?;
+                } else {
+                    // default to i64
+                    set_object_scalar_field_type(field_types, k, DataType::Int64)?;
+                }
+            }
+            Value::String(_) => {
+                set_object_scalar_field_type(field_types, k, DataType::Utf8)?;
+            }
+            Value::Object(inner_map) => {
+                if !field_types.contains_key(k) {
+                    field_types
+                        .insert(k.to_string(), InferredType::Object(HashMap::new()));
+                }
+                match field_types.get_mut(k).unwrap() {
+                    InferredType::Object(inner_field_types) => {
+                        collect_field_types_from_object(inner_field_types, inner_map)?;
+                    }
+                    t => {
+                        return Err(ArrowError::JsonError(format!(
+                            "Expected object json type, found: {t:?}",
+                        )));
+                    }
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Infer the fields of a JSON file by reading all items from the JSON Value Iterator.
+///
+/// The following type coercion logic is implemented:
+/// * `Int64` and `Float64` are converted to `Float64`
+/// * Lists and scalars are coerced to a list of a compatible scalar
+/// * All other cases are coerced to `Utf8` (String)
+///
+/// Note that the above coercion logic differs from Spark's, which defaults to the String type
+/// when List and Scalar values appear in the same field.
+///
+/// We diverge here because we don't have utilities to deal with JSON data once it's interpreted
+/// as Strings. We should match Spark's behavior once more JSON parsing kernels are added in the
+/// future.
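+///
+/// # Example
+///
+/// A minimal example, inferring a schema from two in-memory records:
+///
+/// ```
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_json::reader::infer_json_schema_from_iterator;
+///
+/// // "a" is seen as both Int64 and Float64, which coerce to Float64
+/// let schema = infer_json_schema_from_iterator(
+///     vec![
+///         Ok(serde_json::json!({"a": 1})),
+///         Ok(serde_json::json!({"a": 2.5})),
+///     ]
+///     .into_iter(),
+/// )
+/// .unwrap();
+/// assert_eq!(schema, Schema::new(vec![Field::new("a", DataType::Float64, true)]));
+/// ```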
+pub fn infer_json_schema_from_iterator<I, V>(value_iter: I) -> Result<Schema, ArrowError>
+where
+    I: Iterator<Item = Result<V, ArrowError>>,
+    V: Borrow<Value>,
+{
+    let mut field_types: HashMap<String, InferredType> = HashMap::new();
+
+    for record in value_iter {
+        match record?.borrow() {
+            Value::Object(map) => {
+                collect_field_types_from_object(&mut field_types, map)?;
+            }
+            value => {
+                return Err(ArrowError::JsonError(format!(
+                    "Expected JSON record to be an object, found {value:?}"
+                )));
+            }
+        };
+    }
+
+    generate_schema(field_types)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use flate2::read::GzDecoder;
+    use std::fs::File;
+    use std::io::Cursor;
+
+    #[test]
+    fn test_json_infer_schema() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new(
+                "b",
+                DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
+                true,
+            ),
+            Field::new(
+                "c",
+                DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))),
+                true,
+            ),
+            Field::new(
+                "d",
+                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
+                true,
+            ),
+        ]);
+
+        let mut reader =
+            BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+        let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap();
+
+        assert_eq!(inferred_schema, schema);
+
+        let file = File::open("test/data/mixed_arrays.json.gz").unwrap();
+        let mut reader = BufReader::new(GzDecoder::new(&file));
+        let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
+
+        assert_eq!(inferred_schema, schema);
+    }
+
+    #[test]
+    fn test_json_infer_schema_nested_structs() {
+        let schema = Schema::new(vec![
+            Field::new(
+                "c1",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("a", DataType::Boolean, true),
+                    Field::new(
+                        "b",
+                        DataType::Struct(
+                            vec![Field::new("c", DataType::Utf8, true)].into(),
+                        ),
+                        true,
+                    ),
+                ])),
+                true,
+            ),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Utf8, true),
+        ]);
+
+        let inferred_schema = infer_json_schema_from_iterator(
+            vec![
+                Ok(serde_json::json!({"c1": {"a": true, "b": {"c": "text"}}, "c2": 1})),
+                Ok(serde_json::json!({"c1": {"a": false, "b": null}, "c2": 0})),
+                Ok(serde_json::json!({"c1": {"a": true, "b": {"c": "text"}}, "c3": "ok"})),
+            ]
+            .into_iter(),
+        )
+        .unwrap();
+
+        assert_eq!(inferred_schema, schema);
+    }
+
+    #[test]
+    fn test_json_infer_schema_struct_in_list() {
+        let schema = Schema::new(vec![
+            Field::new(
+                "c1",
+                DataType::List(Arc::new(Field::new(
+                    "item",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("a", DataType::Utf8, true),
+                        Field::new("b", DataType::Int64, true),
+                        Field::new("c", DataType::Boolean, true),
+                    ])),
+                    true,
+                ))),
+                true,
+            ),
+            Field::new("c2", DataType::Float64, true),
+            Field::new(
+                "c3",
+                // an empty JSON array's inner type is inferred as Null
+                DataType::List(Arc::new(Field::new("item", DataType::Null, true))),
+                true,
+            ),
+        ]);
+
+        let inferred_schema = infer_json_schema_from_iterator(
+            vec![
+                Ok(serde_json::json!({
+                    "c1": [{"a": "foo", "b": 100}], "c2": 1, "c3": [],
+                })),
+                Ok(serde_json::json!({
+                    "c1": [{"a": "bar", "b": 2}, {"a": "foo", "c": true}], "c2": 0, "c3": [],
+                })),
+                Ok(serde_json::json!({"c1": [], "c2": 0.5, "c3": []})),
+            ]
+            .into_iter(),
+        )
+        .unwrap();
+
+        assert_eq!(inferred_schema, schema);
+    }
+
+    #[test]
+    fn test_json_infer_schema_nested_list() {
+        let schema = Schema::new(vec![
+            Field::new(
+                "c1",
+                DataType::List(Arc::new(Field::new(
+                    "item",
+                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
+                    true,
+                ))),
+                true,
+            ),
+            Field::new("c2", DataType::Float64, true),
+        ]);
+
+        let inferred_schema = infer_json_schema_from_iterator(
+            vec![
+                Ok(serde_json::json!({
+                    "c1": [],
+                    "c2": 12,
+                })),
+                Ok(serde_json::json!({
+                    "c1": [["a", "b"], ["c"]],
+                })),
+                Ok(serde_json::json!({
+                    "c1": [["foo"]],
+                    "c2": 0.11,
+                })),
+            ]
+            .into_iter(),
+        )
+        .unwrap();
+
+        assert_eq!(inferred_schema, schema);
+    }
+
+    #[test]
+    fn test_coercion_scalar_and_list() {
+        use arrow_schema::DataType::*;
+
+        assert_eq!(
+            List(Arc::new(Field::new("item", Float64, true))),
+            coerce_data_type(vec![
+                &Float64,
+                &List(Arc::new(Field::new("item", Float64, true)))
+            ])
+        );
+        assert_eq!(
+            List(Arc::new(Field::new("item", Float64, true))),
+            coerce_data_type(vec![
+                &Float64,
+                &List(Arc::new(Field::new("item", Int64, true)))
+            ])
+        );
+        assert_eq!(
+            List(Arc::new(Field::new("item", Int64, true))),
+            coerce_data_type(vec![
+                &Int64,
+                &List(Arc::new(Field::new("item", Int64, true)))
+            ])
+        );
+        // Boolean and numeric types are incompatible, so coerce to Utf8
+        assert_eq!(
+            List(Arc::new(Field::new("item", Utf8, true))),
+            coerce_data_type(vec![
+                &Boolean,
+                &List(Arc::new(Field::new("item", Float64, true)))
+            ])
+        );
+    }
+
+    #[test]
+    fn test_invalid_json_infer_schema() {
+        let re =
+            infer_json_schema_from_seekable(&mut BufReader::new(Cursor::new(b"}")), None);
+        assert_eq!(
+            re.err().unwrap().to_string(),
+            "Json error: Not valid JSON: expected value at line 1 column 1",
+        );
+    }
+}
diff --git a/arrow-json/src/raw/serializer.rs b/arrow-json/src/reader/serializer.rs
similarity index 99%
rename from arrow-json/src/raw/serializer.rs
rename to arrow-json/src/reader/serializer.rs
index d743b6dba..a68d1d547 100644
--- a/arrow-json/src/raw/serializer.rs
+++ b/arrow-json/src/reader/serializer.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::raw::tape::TapeElement;
+use crate::reader::tape::TapeElement;
 use lexical_core::FormattedSize;
 use serde::ser::{
     Impossible, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple,
diff --git a/arrow-json/src/raw/string_array.rs b/arrow-json/src/reader/string_array.rs
similarity index 97%
rename from arrow-json/src/raw/string_array.rs
rename to arrow-json/src/reader/string_array.rs
index 104e4e83f..8060804c9 100644
--- a/arrow-json/src/raw/string_array.rs
+++ b/arrow-json/src/reader/string_array.rs
@@ -21,8 +21,8 @@ use arrow_data::ArrayData;
 use arrow_schema::ArrowError;
 use std::marker::PhantomData;
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{tape_error, ArrayDecoder};
 
 const TRUE: &str = "true";
 const FALSE: &str = "false";
diff --git a/arrow-json/src/raw/struct_array.rs b/arrow-json/src/reader/struct_array.rs
similarity index 98%
rename from arrow-json/src/raw/struct_array.rs
rename to arrow-json/src/reader/struct_array.rs
index a73bb1486..013f862c5 100644
--- a/arrow-json/src/raw/struct_array.rs
+++ b/arrow-json/src/reader/struct_array.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{make_decoder, tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{make_decoder, tape_error, ArrayDecoder};
 use arrow_array::builder::BooleanBufferBuilder;
 use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_data::{ArrayData, ArrayDataBuilder};
diff --git a/arrow-json/src/raw/tape.rs b/arrow-json/src/reader/tape.rs
similarity index 99%
rename from arrow-json/src/raw/tape.rs
rename to arrow-json/src/reader/tape.rs
index 2720c2502..885257ed1 100644
--- a/arrow-json/src/raw/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::raw::serializer::TapeSerializer;
+use crate::reader::serializer::TapeSerializer;
 use arrow_schema::ArrowError;
 use serde::Serialize;
 use std::fmt::{Display, Formatter};
diff --git a/arrow-json/src/raw/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs
similarity index 97%
rename from arrow-json/src/raw/timestamp_array.rs
rename to arrow-json/src/reader/timestamp_array.rs
index 07feaa974..73d1cda91 100644
--- a/arrow-json/src/raw/timestamp_array.rs
+++ b/arrow-json/src/reader/timestamp_array.rs
@@ -26,8 +26,8 @@ use arrow_cast::parse::string_to_datetime;
 use arrow_data::ArrayData;
 use arrow_schema::{ArrowError, DataType, TimeUnit};
 
-use crate::raw::tape::{Tape, TapeElement};
-use crate::raw::{tape_error, ArrayDecoder};
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{tape_error, ArrayDecoder};
 
 /// A specialized [`ArrayDecoder`] for timestamps
 pub struct TimestampArrayDecoder<P: ArrowTimestampType, Tz: TimeZone> {
diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs
index cf65e8a93..60b212101 100644
--- a/arrow-json/src/writer.rs
+++ b/arrow-json/src/writer.rs
@@ -589,11 +589,10 @@ where
 #[cfg(test)]
 mod tests {
     use std::fs::{read_to_string, File};
-    use std::io::BufReader;
+    use std::io::{BufReader, Seek};
     use std::sync::Arc;
 
     use crate::reader::*;
-    use crate::RawReaderBuilder;
     use arrow_buffer::{Buffer, ToByteSlice};
     use arrow_data::ArrayData;
     use serde_json::json;
@@ -1205,14 +1204,14 @@ mod tests {
         );
     }
 
-    #[allow(deprecated)]
     fn test_write_for_file(test_file: &str) {
-        let builder = ReaderBuilder::new()
-            .infer_schema(None)
-            .with_batch_size(1024);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open(test_file).unwrap())
-            .unwrap();
+        let file = File::open(test_file).unwrap();
+        let mut reader = BufReader::new(file);
+        let schema = infer_json_schema(&mut reader, None).unwrap();
+        reader.rewind().unwrap();
+
+        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
+        let mut reader = builder.build(reader).unwrap();
         let batch = reader.next().unwrap().unwrap();
 
         let mut buf = Vec::new();
@@ -1298,7 +1297,7 @@ mod tests {
         let list_type = DataType::List(Arc::new(Field::new("item", ints_struct, true)));
         let list_field = Field::new("list", list_type, true);
         let schema = Arc::new(Schema::new(vec![list_field]));
-        let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
+        let builder = ReaderBuilder::new(schema).with_batch_size(64);
         let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
 
         let batch = reader.next().unwrap().unwrap();
@@ -1395,15 +1394,15 @@ mod tests {
     }
 
     #[test]
-    #[allow(deprecated)]
     fn test_write_single_batch() {
         let test_file = "test/data/basic.json";
-        let builder = ReaderBuilder::new()
-            .infer_schema(None)
-            .with_batch_size(1024);
-        let mut reader: Reader<File> = builder
-            .build::<File>(File::open(test_file).unwrap())
-            .unwrap();
+        let file = File::open(test_file).unwrap();
+        let mut reader = BufReader::new(file);
+        let schema = infer_json_schema(&mut reader, None).unwrap();
+        reader.rewind().unwrap();
+
+        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
+        let mut reader = builder.build(reader).unwrap();
         let batch = reader.next().unwrap().unwrap();
 
         let mut buf = Vec::new();
@@ -1440,7 +1439,7 @@ mod tests {
             Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
         ]));
 
-        let mut reader = RawReaderBuilder::new(schema.clone())
+        let mut reader = ReaderBuilder::new(schema.clone())
             .build(BufReader::new(File::open(test_file).unwrap()))
             .unwrap();
         let batch = reader.next().unwrap().unwrap();
diff --git a/arrow/benches/json_reader.rs b/arrow/benches/json_reader.rs
index 8ad6cfd3a..8cebc42e4 100644
--- a/arrow/benches/json_reader.rs
+++ b/arrow/benches/json_reader.rs
@@ -22,31 +22,16 @@ use arrow::util::bench_util::{
     create_primitive_array, create_string_array, create_string_array_with_len,
 };
 use arrow_array::RecordBatch;
-use arrow_json::LineDelimitedWriter;
-use arrow_json::RawReaderBuilder;
+use arrow_json::{LineDelimitedWriter, ReaderBuilder};
 use std::io::Cursor;
 use std::sync::Arc;
 
 #[allow(deprecated)]
 fn do_bench(c: &mut Criterion, name: &str, json: &str, schema: SchemaRef) {
-    c.bench_function(&format!("{name} (basic)"), |b| {
+    c.bench_function(name, |b| {
         b.iter(|| {
             let cursor = Cursor::new(black_box(json));
-            let builder = arrow_json::ReaderBuilder::new()
-                .with_schema(schema.clone())
-                .with_batch_size(64);
-
-            let mut reader = builder.build(cursor).unwrap();
-            while let Some(next) = reader.next().transpose() {
-                next.unwrap();
-            }
-        })
-    });
-
-    c.bench_function(&format!("{name} (raw)"), |b| {
-        b.iter(|| {
-            let cursor = Cursor::new(black_box(json));
-            let builder = RawReaderBuilder::new(schema.clone()).with_batch_size(64);
+            let builder = ReaderBuilder::new(schema.clone()).with_batch_size(64);
             let reader = builder.build(cursor).unwrap();
             for next in reader {
                 next.unwrap();
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index 41b846b04..8bad29bf7 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -273,7 +273,7 @@
 //!
 //! # Serde Compatibility
 //!
-//! [`arrow_json::RawDecoder`] provides a mechanism to convert arbitrary, serde-compatible
+//! [`arrow_json::reader::Decoder`] provides a mechanism to convert arbitrary, serde-compatible
 //! structures into [`RecordBatch`].
 //!
 //! Whilst likely less performant than implementing a custom builder, as described in
@@ -281,7 +281,7 @@
 //!
 //! ```
 //! # use std::sync::Arc;
-//! # use arrow_json::RawReaderBuilder;
+//! # use arrow_json::ReaderBuilder;
 //! # use arrow_schema::{DataType, Field, Schema};
 //! # use serde::Serialize;
 //! # use arrow_array::cast::AsArray;
@@ -303,7 +303,7 @@
 //!     MyStruct{ int32: 8, string: "foo".to_string() },
 //! ];
 //!
-//! let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
+//! let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
 //! decoder.serialize(&rows).unwrap();
 //!
 //! let batch = decoder.flush().unwrap().unwrap();
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 680d31480..d662a16ea 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -1145,7 +1145,7 @@ mod tests {
             false,
         );
         let schema = Arc::new(Schema::new(vec![stocks_field]));
-        let builder = arrow::json::RawReaderBuilder::new(schema).with_batch_size(64);
+        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
         let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
 
         let batch = reader.next().unwrap().unwrap();
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index d026f971e..3987cccf6 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1016,7 +1016,7 @@ mod tests {
             true,
         );
         let schema = Arc::new(Schema::new(vec![stocks_field]));
-        let builder = arrow::json::RawReaderBuilder::new(schema).with_batch_size(64);
+        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
         let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
 
         let batch = reader.next().unwrap().unwrap();