Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/25 07:10:03 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

jorgecarleitao commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r563499210



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer converts Arrow record batches into an array of JSON objects. It also
+//! provides a `Writer` struct that serializes record batches directly into line-delimited
+//! JSON bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());

Review comment:
       Same here: `arr.value(i)` does not take nulls into account. Try using `arr.iter()`.
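   
   A sketch of what that could look like, assuming the same insertion logic as the current loop (`BooleanArray::iter` yields `Option<bool>`):
   
   ```rust
   rows.iter_mut()
       .zip(arr.iter())
       .take(row_count)
       .for_each(|(row, maybe_value)| {
           // A null slot (`None`) becomes JSON null instead of an arbitrary bool.
           row.insert(
               col_name.to_string(),
               maybe_value.map(Value::Bool).unwrap_or(Value::Null),
           );
       });
   ```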

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer converts Arrow record batches into an array of JSON objects. It also
+//! provides a `Writer` struct that serializes record batches directly into line-delimited
+//! JSON bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)

Review comment:
       This won't take nulls into account.
   
   One way to go here is to use 
   
   ```rust
   rows.iter_mut().zip(primitive_arr.iter()).take(row_count).for_each(|(row, maybe_value)| {
   ...
   })
   ```
   
   `maybe_value` will be of type `Option<T::Native>`, where `None` represents a null value.
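   
   Filled in with the conversion the current loop already uses, the body might look like this (a sketch; `into_json_value` comes from the existing code and returns `Option<Value>`):
   
   ```rust
   rows.iter_mut()
       .zip(primitive_arr.iter())
       .take(row_count)
       .for_each(|(row, maybe_value)| {
           // `None` (a null slot) and values without a JSON representation
           // (e.g. NaN floats) both map to JSON null.
           row.insert(
               col_name.to_string(),
               maybe_value
                   .and_then(|v| v.into_json_value())
                   .unwrap_or(Value::Null),
           );
       });
   ```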
   
   

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer converts Arrow record batches into an array of JSON objects. It also
+//! provides a `Writer` struct that serializes record batches directly into line-delimited
+//! JSON bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());
+            }
+        }
+        DataType::Int8 => {
+            set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int16 => {
+            set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int32 => {
+            set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int64 => {
+            set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt8 => {
+            set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt16 => {
+            set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt32 => {
+            set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt64 => {
+            set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float32 => {
+            set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float64 => {
+            set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());
+            }
+        }
+        DataType::Struct(_) => {
+            let arr = as_struct_array(array);
+            let inner_col_names = arr.column_names();
+
+            let mut inner_objs = iter::repeat(JsonMap::new())
+                .take(row_count)
+                .collect::<Vec<JsonMap<String, Value>>>();
+
+            arr.columns()
+                .iter()
+                .enumerate()
+                .for_each(|(j, struct_col)| {
+                    set_column_for_json_rows(
+                        &mut inner_objs,
+                        row_count,
+                        struct_col,
+                        inner_col_names[j],
+                    );
+                });
+
+            rows.iter_mut()
+                .take(row_count)
+                .zip(inner_objs.into_iter())
+                .for_each(|(row, obj)| {
+                    row.insert(col_name.to_string(), Value::Object(obj));
+                });
+        }
+        _ => {
+            panic!(format!("Unsupported datatype: {:#?}", array.data_type()));
+        }
+    }
+}
+
+pub fn record_batches_to_json_rows(
+    batches: &[RecordBatch],
+) -> Vec<JsonMap<String, Value>> {
+    let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
+        .take(batches.iter().map(|b| b.num_rows()).sum())
+        .collect();
+
+    if !rows.is_empty() {
+        let schema = batches[0].schema();
+        let mut base = 0;
+        batches.iter().for_each(|batch| {
+            let row_count = batch.num_rows();
+            batch.columns().iter().enumerate().for_each(|(j, col)| {
+                let col_name = schema.field(j).name();
+                set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
+            });
+            base += row_count;
+        });
+    }
+
+    rows
+}
+
+/// A JSON writer
+#[derive(Debug)]
+pub struct Writer<W: Write> {
+    writer: BufWriter<W>,
+}
+
+impl<W: Write> Writer<W> {
+    pub fn new(writer: W) -> Self {
+        Self::from_buf_writer(BufWriter::new(writer))
+    }
+
+    pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
+        Self { writer }
+    }
+
+    pub fn write_row(&mut self, row: &Value) -> Result<()> {
+        self.writer.write_all(&serde_json::to_vec(row)?)?;
+        self.writer.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
+        for row in record_batches_to_json_rows(batches) {
+            self.write_row(&Value::Object(row))?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::{read_to_string, File};
+    use std::sync::Arc;
+
+    use crate::json::reader::*;
+
+    use super::*;
+
+    #[test]
+    fn write_simple_rows() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

Review comment:
       could have used `vec![Some(1), None, Some(3), Some(4), Some(5)]`, so that we also test null values?
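   
   Concretely, that could look like the following sketch (note the field would also have to be declared nullable for the batch to accept nulls):
   
   ```rust
   let schema = Schema::new(vec![
       Field::new("a", DataType::Int32, true), // nullable, so `None` is allowed
       Field::new("b", DataType::Utf8, false),
   ]);
   let a = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5)]);
   ```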

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer converts Arrow record batches into an array of JSON objects. It also
+//! provides a `Writer` struct that serializes record batches directly into line-delimited
+//! JSON bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());
+            }
+        }
+        DataType::Int8 => {
+            set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int16 => {
+            set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int32 => {
+            set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int64 => {
+            set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt8 => {
+            set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt16 => {
+            set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt32 => {
+            set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt64 => {
+            set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float32 => {
+            set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float64 => {
+            set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());

Review comment:
       Same here: `strarr.value(i)` does not take nulls into account.
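   
   As above, `strarr.iter()` (which yields `Option<&str>`) would handle them; a sketch:
   
   ```rust
   rows.iter_mut()
       .zip(strarr.iter())
       .take(row_count)
       .for_each(|(row, maybe_value)| {
           // A null slot becomes JSON null rather than an arbitrary string.
           row.insert(
               col_name.to_string(),
               maybe_value
                   .map(|v| Value::String(v.to_string()))
                   .unwrap_or(Value::Null),
           );
       });
   ```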

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer converts Arrow record batches into an array of JSON objects. It also
+//! provides a `Writer` struct that serializes record batches directly into line-delimited
+//! JSON bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());
+            }
+        }
+        DataType::Int8 => {
+            set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int16 => {
+            set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int32 => {
+            set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int64 => {
+            set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt8 => {
+            set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt16 => {
+            set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt32 => {
+            set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt64 => {
+            set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float32 => {
+            set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float64 => {
+            set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());
+            }
+        }
+        DataType::Struct(_) => {
+            let arr = as_struct_array(array);
+            let inner_col_names = arr.column_names();
+
+            let mut inner_objs = iter::repeat(JsonMap::new())
+                .take(row_count)
+                .collect::<Vec<JsonMap<String, Value>>>();
+
+            arr.columns()
+                .iter()
+                .enumerate()
+                .for_each(|(j, struct_col)| {
+                    set_column_for_json_rows(
+                        &mut inner_objs,
+                        row_count,
+                        struct_col,
+                        inner_col_names[j],
+                    );
+                });
+
+            rows.iter_mut()
+                .take(row_count)
+                .zip(inner_objs.into_iter())
+                .for_each(|(row, obj)| {
+                    row.insert(col_name.to_string(), Value::Object(obj));
+                });
+        }
+        _ => {
+            panic!(format!("Unsupported datatype: {:#?}", array.data_type()));
+        }
+    }
+}
+
+pub fn record_batches_to_json_rows(
+    batches: &[RecordBatch],
+) -> Vec<JsonMap<String, Value>> {
+    let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
+        .take(batches.iter().map(|b| b.num_rows()).sum())
+        .collect();
+
+    if !rows.is_empty() {
+        let schema = batches[0].schema();
+        let mut base = 0;
+        batches.iter().for_each(|batch| {
+            let row_count = batch.num_rows();
+            batch.columns().iter().enumerate().for_each(|(j, col)| {
+                let col_name = schema.field(j).name();
+                set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
+            });
+            base += row_count;
+        });
+    }
+
+    rows
+}
+
+/// A JSON writer
+#[derive(Debug)]
+pub struct Writer<W: Write> {
+    writer: BufWriter<W>,
+}
+
+impl<W: Write> Writer<W> {
+    pub fn new(writer: W) -> Self {
+        Self::from_buf_writer(BufWriter::new(writer))
+    }
+
+    pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
+        Self { writer }
+    }
+
+    pub fn write_row(&mut self, row: &Value) -> Result<()> {
+        self.writer.write_all(&serde_json::to_vec(row)?)?;
+        self.writer.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
+        for row in record_batches_to_json_rows(batches) {
+            self.write_row(&Value::Object(row))?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::{read_to_string, File};
+    use std::sync::Arc;
+
+    use crate::json::reader::*;
+
+    use super::*;
+
+    #[test]
+    fn write_simple_rows() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);
+
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
+                .unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = Writer::new(&mut buf);
+            writer.write_batches(&vec![batch]).unwrap();
+        }
+
+        assert_eq!(
+            String::from_utf8(buf).unwrap(),
+            r#"{"a":1,"b":"a"}
+{"a":2,"b":"b"}
+{"a":3,"b":"c"}
+{"a":4,"b":"d"}
+{"a":5,"b":"e"}
+"#
+        );
+    }
+
+    fn test_write_for_file(test_file: &str) {
+        let builder = ReaderBuilder::new()
+            .infer_schema(None)
+            .with_batch_size(1024);
+        let mut reader: Reader<File> = builder
+            .build::<File>(File::open(test_file).unwrap())
+            .unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = Writer::new(&mut buf);
+            writer.write_batches(&vec![batch]).unwrap();
+        }
+
+        let result = String::from_utf8(buf).unwrap();
+        let expected = read_to_string(test_file).unwrap();
+        for (r, e) in result.lines().zip(expected.lines()) {
+            assert_eq!(
+                serde_json::from_str::<Value>(r).unwrap(),
+                serde_json::from_str::<Value>(e).unwrap()
+            );
+        }
+    }
+
+    #[test]
+    fn write_basic_rows() {
+        test_write_for_file("test/data/basic.json");

Review comment:
       As far as I understand, this will populate a file and not delete it at the end.
   
   Would it be possible to write to a byte stream or something instead of a file? Alternatively, use `tmp` or something?
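   
   For example, the input could live in a temporary file that deletes itself (a sketch, assuming the `tempfile` crate as a dev-dependency; the braces are doubled to escape the `writeln!` format macro):
   
   ```rust
   use std::io::Write;
   use tempfile::NamedTempFile;
   
   // Write the test input to a self-deleting temporary file.
   let mut input = NamedTempFile::new().unwrap();
   writeln!(input, r#"{{"a":1,"b":"hello"}}"#).unwrap();
   writeln!(input, r#"{{"a":2,"b":"world"}}"#).unwrap();
   
   // The file is removed when `input` is dropped.
   test_write_for_file(input.path().to_str().unwrap());
   ```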




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org