Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/19 04:21:51 UTC

[GitHub] [arrow] houqp opened a new pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

houqp opened a new pull request #9256:
URL: https://github.com/apache/arrow/pull/9256


   Sending out PR for early review while I work on finishing up the following TODOs:
   
   - [ ] List type support
   - [ ] Dictionary type support
   - [ ] Date & Time types support


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r563499210



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer allows converting Arrow record batches into an array of JSON objects. It also
+//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
+//! objects as bytes.
+//!
+//! Serialize record batches into array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());

Review comment:
       Same here. Try using `arr.iter()`
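   
   For illustration, applying `arr.iter()` to the Boolean branch could look like this (a sketch only, assuming the surrounding types from the diff; not necessarily the code that was merged):
   
   ```rust
   DataType::Boolean => {
       let arr = as_boolean_array(array);
       rows.iter_mut()
           .zip(arr.iter())
           .take(row_count)
           .for_each(|(row, maybe_value)| {
               // maybe_value is Option<bool>; a null slot becomes JSON null.
               row.insert(
                   col_name.to_string(),
                   maybe_value.map(Value::Bool).unwrap_or(Value::Null),
               );
           });
   }
   ```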

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)

Review comment:
       This won't take nulls into account.
   
   One way to go here is to use 
   
   ```rust
   rows.iter_mut().zip(primitive_arr.iter()).take(row_count).for_each(|(row, maybe_value)| {
   ...
   })
   ```
   
   `maybe_value` will be of type `Option<T::Native>`, where `None` represents a null value.
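   
   Spelled out in full, a null-aware version of `set_column_by_primitive_type` could look like this (a sketch that follows the suggestion above, not necessarily the code that was finally merged):
   
   ```rust
   fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
       rows: &mut [JsonMap<String, Value>],
       row_count: usize,
       array: &ArrayRef,
       col_name: &str,
   ) {
       let primitive_arr = as_primitive_array::<T>(array);
       rows.iter_mut()
           .zip(primitive_arr.iter())
           .take(row_count)
           .for_each(|(row, maybe_value)| {
               // maybe_value is Option<T::Native>; None marks a null slot.
               let json_value = maybe_value
                   .and_then(|v| v.into_json_value())
                   .unwrap_or(Value::Null);
               row.insert(col_name.to_string(), json_value);
           });
   }
   ```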
   
   

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());
+            }
+        }
+        DataType::Int8 => {
+            set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int16 => {
+            set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int32 => {
+            set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int64 => {
+            set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt8 => {
+            set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt16 => {
+            set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt32 => {
+            set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt64 => {
+            set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float32 => {
+            set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float64 => {
+            set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());
+            }
+        }
+        DataType::Struct(_) => {
+            let arr = as_struct_array(array);
+            let inner_col_names = arr.column_names();
+
+            let mut inner_objs = iter::repeat(JsonMap::new())
+                .take(row_count)
+                .collect::<Vec<JsonMap<String, Value>>>();
+
+            arr.columns()
+                .iter()
+                .enumerate()
+                .for_each(|(j, struct_col)| {
+                    set_column_for_json_rows(
+                        &mut inner_objs,
+                        row_count,
+                        struct_col,
+                        inner_col_names[j],
+                    );
+                });
+
+            rows.iter_mut()
+                .take(row_count)
+                .zip(inner_objs.into_iter())
+                .for_each(|(row, obj)| {
+                    row.insert(col_name.to_string(), Value::Object(obj));
+                });
+        }
+        _ => {
+            panic!(format!("Unsupported datatype: {:#?}", array.data_type()));
+        }
+    }
+}
+
+pub fn record_batches_to_json_rows(
+    batches: &[RecordBatch],
+) -> Vec<JsonMap<String, Value>> {
+    let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
+        .take(batches.iter().map(|b| b.num_rows()).sum())
+        .collect();
+
+    if !rows.is_empty() {
+        let schema = batches[0].schema();
+        let mut base = 0;
+        batches.iter().for_each(|batch| {
+            let row_count = batch.num_rows();
+            batch.columns().iter().enumerate().for_each(|(j, col)| {
+                let col_name = schema.field(j).name();
+                set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
+            });
+            base += row_count;
+        });
+    }
+
+    rows
+}
+
+/// A JSON writer
+#[derive(Debug)]
+pub struct Writer<W: Write> {
+    writer: BufWriter<W>,
+}
+
+impl<W: Write> Writer<W> {
+    pub fn new(writer: W) -> Self {
+        Self::from_buf_writer(BufWriter::new(writer))
+    }
+
+    pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
+        Self { writer }
+    }
+
+    pub fn write_row(&mut self, row: &Value) -> Result<()> {
+        self.writer.write_all(&serde_json::to_vec(row)?)?;
+        self.writer.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
+        for row in record_batches_to_json_rows(batches) {
+            self.write_row(&Value::Object(row))?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::{read_to_string, File};
+    use std::sync::Arc;
+
+    use crate::json::reader::*;
+
+    use super::*;
+
+    #[test]
+    fn write_simple_rows() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

Review comment:
       could have used `vec![Some(1), None, Some(3), Some(4), Some(5)]`, so that we also test null values?
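   
   For example (a hypothetical variant of the test data; the field would also need to be declared nullable):
   
   ```rust
   // "a" is declared nullable (third argument = true) so the array may hold nulls.
   let schema = Schema::new(vec![
       Field::new("a", DataType::Int32, true),
       Field::new("b", DataType::Utf8, false),
   ]);
   let a = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5)]);
   ```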

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());

Review comment:
       same here.

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+#[cfg(test)]
+mod tests {
+    use std::fs::{read_to_string, File};
+    use std::sync::Arc;
+
+    use crate::json::reader::*;
+
+    use super::*;
+
+    #[test]
+    fn write_simple_rows() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);
+
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
+                .unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = Writer::new(&mut buf);
+            writer.write_batches(&vec![batch]).unwrap();
+        }
+
+        assert_eq!(
+            String::from_utf8(buf).unwrap(),
+            r#"{"a":1,"b":"a"}
+{"a":2,"b":"b"}
+{"a":3,"b":"c"}
+{"a":4,"b":"d"}
+{"a":5,"b":"e"}
+"#
+        );
+    }
+
+    fn test_write_for_file(test_file: &str) {
+        let builder = ReaderBuilder::new()
+            .infer_schema(None)
+            .with_batch_size(1024);
+        let mut reader: Reader<File> = builder
+            .build::<File>(File::open(test_file).unwrap())
+            .unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = Writer::new(&mut buf);
+            writer.write_batches(&vec![batch]).unwrap();
+        }
+
+        let result = String::from_utf8(buf).unwrap();
+        let expected = read_to_string(test_file).unwrap();
+        for (r, e) in result.lines().zip(expected.lines()) {
+            assert_eq!(
+                serde_json::from_str::<Value>(r).unwrap(),
+                serde_json::from_str::<Value>(e).unwrap()
+            );
+        }
+    }
+
+    #[test]
+    fn write_basic_rows() {
+        test_write_for_file("test/data/basic.json");

Review comment:
       As far as I understand, this will populate a file and not delete it at the end.
   
   Would it be possible to write to a byte stream or something instead of a file? Alternatively, use `tmp` or something?







[GitHub] [arrow] houqp commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568339523



##########
File path: rust/arrow/src/array/cast.rs
##########
@@ -40,12 +40,20 @@ where
         .expect("Unable to downcast to dictionary array")
 }
 
-pub fn as_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
+pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
     arr.as_any()
         .downcast_ref::<GenericListArray<S>>()
         .expect("Unable to downcast to list array")
 }
 
+pub fn as_list_array(arr: &ArrayRef) -> &ListArray {

Review comment:
       @jorgecarleitao the main reason for adding that is to keep these two cast function names consistent with the others, i.e., for a particular `FooBarArray` type, there exists an `as_foo_bar_array` cast function. However, if you have a strong opinion on this, I am happy to remove it.
   
   I have added the inline annotation.
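   
   As a usage sketch of that naming convention (assuming the casts are re-exported from `arrow::array` as in the diff; each cast panics if the array is not the expected concrete type):
   
   ```rust
   use arrow::array::{as_generic_list_array, as_list_array};
   use arrow::array::{ArrayRef, LargeListArray, ListArray};
   
   // Convenience cast for the common i32-offset ListArray.
   fn first_list_len(arr: &ArrayRef) -> usize {
       let list: &ListArray = as_list_array(arr);
       list.value(0).len()
   }
   
   // Generic cast: choose the offset width explicitly (i64 for LargeListArray).
   fn first_large_list_len(arr: &ArrayRef) -> usize {
       let list: &LargeListArray = as_generic_list_array::<i64>(arr);
       list.value(0).len()
   }
   ```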







[GitHub] [arrow] alamb closed pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #9256:
URL: https://github.com/apache/arrow/pull/9256




[GitHub] [arrow] houqp commented on a change in pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r565027825



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+    fn test_write_for_file(test_file: &str) {
+        let builder = ReaderBuilder::new()
+            .infer_schema(None)
+            .with_batch_size(1024);
+        let mut reader: Reader<File> = builder
+            .build::<File>(File::open(test_file).unwrap())
+            .unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = Writer::new(&mut buf);
+            writer.write_batches(&vec![batch]).unwrap();
+        }
+
+        let result = String::from_utf8(buf).unwrap();
+        let expected = read_to_string(test_file).unwrap();
+        for (r, e) in result.lines().zip(expected.lines()) {
+            assert_eq!(
+                serde_json::from_str::<Value>(r).unwrap(),
+                serde_json::from_str::<Value>(e).unwrap()
+            );
+        }
+    }
+
+    #[test]
+    fn write_basic_rows() {
+        test_write_for_file("test/data/basic.json");

Review comment:
       This function reads a JSON test file from the existing test data, serializes it to an in-memory byte stream, and then compares that result against the same test file read from disk. So there should be no disk write involved.







[GitHub] [arrow] alamb commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568205865



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,635 @@
+fn primitive_array_to_json<T: ArrowPrimitiveType>(array: &ArrayRef) -> Vec<Value> {
+    as_primitive_array::<T>(array)
+        .iter()
+        .map(|maybe_value| match maybe_value {
+            Some(v) => v.into_json_value().unwrap_or(Value::Null),
+            None => Value::Null,
+        })
+        .collect()
+}
+
+fn struct_array_to_jsonmap_array(
+    array: &StructArray,
+    row_count: usize,
+) -> Vec<JsonMap<String, Value>> {
+    let inner_col_names = array.column_names();
+
+    let mut inner_objs = iter::repeat(JsonMap::new())
+        .take(row_count)
+        .collect::<Vec<JsonMap<String, Value>>>();
+
+    array
+        .columns()
+        .iter()
+        .enumerate()
+        .for_each(|(j, struct_col)| {
+            set_column_for_json_rows(
+                &mut inner_objs,
+                row_count,
+                struct_col,
+                inner_col_names[j],
+            );
+        });
+
+    inner_objs
+}
+
+pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
+    match array.data_type() {
+        DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(),
+        DataType::Boolean => as_boolean_array(array)
+            .iter()
+            .map(|maybe_value| match maybe_value {
+                Some(v) => v.into(),
+                None => Value::Null,
+            })
+            .collect(),
+
+        DataType::Utf8 => as_string_array(array)
+            .iter()
+            .map(|maybe_value| match maybe_value {
+                Some(v) => v.into(),
+                None => Value::Null,
+            })
+            .collect(),
+        DataType::Int8 => primitive_array_to_json::<Int8Type>(array),
+        DataType::Int16 => primitive_array_to_json::<Int16Type>(array),
+        DataType::Int32 => primitive_array_to_json::<Int32Type>(array),
+        DataType::Int64 => primitive_array_to_json::<Int64Type>(array),
+        DataType::UInt8 => primitive_array_to_json::<UInt8Type>(array),
+        DataType::UInt16 => primitive_array_to_json::<UInt16Type>(array),
+        DataType::UInt32 => primitive_array_to_json::<UInt32Type>(array),
+        DataType::UInt64 => primitive_array_to_json::<UInt64Type>(array),
+        DataType::Float32 => primitive_array_to_json::<Float32Type>(array),
+        DataType::Float64 => primitive_array_to_json::<Float64Type>(array),
+        DataType::List(_) => as_list_array(array)
+            .iter()
+            .map(|maybe_value| match maybe_value {
+                Some(v) => Value::Array(array_to_json_array(&v)),
+                None => Value::Null,
+            })
+            .collect(),
+        DataType::Struct(_) => {
+            let jsonmaps =
+                struct_array_to_jsonmap_array(as_struct_array(array), array.len());
+            jsonmaps.into_iter().map(Value::Object).collect()
+        }
+        _ => {

Review comment:
       Yes I agree -- on both counts :)







[GitHub] [arrow] jorgecarleitao commented on pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#issuecomment-771111068


   cc @andygrove @alamb @nevi-me @sunchao 





[GitHub] [arrow] houqp commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568340019



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,635 @@
+        DataType::List(_) => as_list_array(array)
+            .iter()
+            .map(|maybe_value| match maybe_value {
+                Some(v) => Value::Array(array_to_json_array(&v)),
+                None => Value::Null,
+            })
+            .collect(),
+        DataType::Struct(_) => {
+            let jsonmaps =
+                struct_array_to_jsonmap_array(as_struct_array(array), array.len());
+            jsonmaps.into_iter().map(Value::Object).collect()
+        }
+        _ => {

Review comment:
       Yeah, I was planning to add dictionary support, but this patch set has already grown larger than I expected, so I decided to leave that to a follow-up PR for easier review.
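   
   For the record, one possible shape for that follow-up (a hypothetical sketch, not code from this PR) is to hydrate the dictionary through the existing `cast` kernel and reuse the `array_to_json_array` added in this diff, rather than walking keys by hand:
   
   ```rust
   use arrow::array::ArrayRef;
   use arrow::compute::cast;
   use arrow::datatypes::DataType;
   use arrow::json::writer::array_to_json_array;
   use serde_json::Value;
   
   // Hypothetical helper, not part of this PR.
   fn dictionary_array_to_json(array: &ArrayRef) -> Vec<Value> {
       match array.data_type() {
           DataType::Dictionary(_, value_type) => {
               // Expanding the dictionary to its value type trades memory for
               // simplicity; a follow-up could instead look values up per key.
               let hydrated = cast(array, value_type).expect("cast to value type");
               array_to_json_array(&hydrated)
           }
           _ => array_to_json_array(array),
       }
   }
   ```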







[GitHub] [arrow] alamb commented on pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#issuecomment-771974198


   I plan to review this PR later today and merge unless I hear otherwise





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568340343



##########
File path: rust/arrow/src/array/cast.rs
##########
@@ -40,12 +40,20 @@ where
         .expect("Unable to downcast to dictionary array")
 }
 
-pub fn as_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
+pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
     arr.as_any()
         .downcast_ref::<GenericListArray<S>>()
         .expect("Unable to downcast to list array")
 }
 
+pub fn as_list_array(arr: &ArrayRef) -> &ListArray {

Review comment:
       Makes sense. Let's keep it 👍







[GitHub] [arrow] houqp commented on a change in pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r567489241



##########
File path: rust/arrow/src/array/cast.rs
##########
@@ -40,12 +40,20 @@ where
         .expect("Unable to downcast to dictionary array")
 }
 
-pub fn as_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
+pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {

Review comment:
       Breaking change to make the naming more consistent with the array type names.
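   
   Concretely, call sites only change names, not shape (a hypothetical before/after under this rename):
   
   ```rust
   use arrow::array::{as_generic_list_array, Array, ArrayRef, GenericListArray};
   
   fn row_count(arr: &ArrayRef) -> usize {
       // Before this change: `as_list_array::<i64>(arr)`.
       // After: the offset-generic cast carries the `generic` name.
       let list: &GenericListArray<i64> = as_generic_list_array::<i64>(arr);
       list.len()
   }
   ```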







[GitHub] [arrow] codecov-io edited a comment on pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#issuecomment-762596751


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=h1) Report
   > Merging [#9256](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=desc) (0c68983) into [master](https://codecov.io/gh/apache/arrow/commit/1393188e1aa1b3d59993ce7d4ade7f7ac8570959?el=desc) (1393188) will **decrease** coverage by `0.01%`.
   > The diff coverage is `79.20%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9256/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9256      +/-   ##
   ==========================================
   - Coverage   81.61%   81.59%   -0.02%     
   ==========================================
     Files         215      216       +1     
     Lines       51867    51987     +120     
   ==========================================
   + Hits        42329    42421      +92     
   - Misses       9538     9566      +28     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/arrow/src/json/writer.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvanNvbi93cml0ZXIucnM=) | `72.91% <72.91%> (ø)` | |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `91.66% <100.00%> (+0.16%)` | :arrow_up: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `94.86% <0.00%> (-0.20%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=footer). Last update [69a9a1c...0c68983](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] codecov-io commented on pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#issuecomment-762596751


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=h1) Report
   > Merging [#9256](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=desc) (3c9fae3) into [master](https://codecov.io/gh/apache/arrow/commit/1393188e1aa1b3d59993ce7d4ade7f7ac8570959?el=desc) (1393188) will **decrease** coverage by `0.00%`.
   > The diff coverage is `79.20%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9256/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9256      +/-   ##
   ==========================================
   - Coverage   81.61%   81.60%   -0.01%     
   ==========================================
     Files         215      216       +1     
     Lines       51867    51987     +120     
   ==========================================
   + Hits        42329    42422      +93     
   - Misses       9538     9565      +27     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/arrow/src/json/writer.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvanNvbi93cml0ZXIucnM=) | `72.91% <72.91%> (ø)` | |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `91.66% <100.00%> (+0.16%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=footer). Last update [69a9a1c...0c68983](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] houqp commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568339523



##########
File path: rust/arrow/src/array/cast.rs
##########
@@ -40,12 +40,20 @@ where
         .expect("Unable to downcast to dictionary array")
 }
 
-pub fn as_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
+pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
     arr.as_any()
         .downcast_ref::<GenericListArray<S>>()
         .expect("Unable to downcast to list array")
 }
 
+pub fn as_list_array(arr: &ArrayRef) -> &ListArray {

Review comment:
       @jorgecarleitao The main reason for adding it is to keep these two cast function names consistent with the others, i.e. for a particular `FooBarArray` type there exists an `as_foo_bar_array` cast function. However, if you have a strong opinion on this, I am happy to remove it.
   
   I have added the inline annotation.
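   
   To illustrate the convention with the pair in question (a usage sketch; it assumes these casts are re-exported from `arrow::array` like the other `as_*_array` helpers):
   
   ```rust
   use std::sync::Arc;
   use arrow::array::{
       as_generic_list_array, as_list_array, Array, ArrayRef, Int32Builder, ListBuilder,
   };
   
   fn main() {
       // Build the ListArray [[1, 2], [3]].
       let mut builder = ListBuilder::new(Int32Builder::new(3));
       builder.values().append_value(1).unwrap();
       builder.values().append_value(2).unwrap();
       builder.append(true).unwrap();
       builder.values().append_value(3).unwrap();
       builder.append(true).unwrap();
       let array: ArrayRef = Arc::new(builder.finish());
   
       // `ListArray` gets `as_list_array`; the offset-generic cast keeps the
       // `generic` prefix, mirroring `GenericListArray<S>`.
       let list = as_list_array(&array);
       let generic = as_generic_list_array::<i32>(&array);
       assert_eq!(list.len(), generic.len());
   }
   ```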

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,635 @@
+        DataType::Struct(_) => {
+            let jsonmaps =
+                struct_array_to_jsonmap_array(as_struct_array(array), array.len());
+            jsonmaps.into_iter().map(Value::Object).collect()
+        }
+        _ => {

Review comment:
       Yeah, I was planning to add dictionary support, but this patch set has already grown larger than I expected, so I decided to leave that to a follow-up PR for easier review.




[GitHub] [arrow] github-actions[bot] commented on pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#issuecomment-762592205


   https://issues.apache.org/jira/browse/ARROW-11310





[GitHub] [arrow] nevi-me commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568118067



##########
File path: rust/arrow/src/array/cast.rs
##########
@@ -40,12 +40,20 @@ where
         .expect("Unable to downcast to dictionary array")
 }
 
-pub fn as_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
+pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
     arr.as_any()
         .downcast_ref::<GenericListArray<S>>()
         .expect("Unable to downcast to list array")
 }
 
+pub fn as_list_array(arr: &ArrayRef) -> &ListArray {

Review comment:
       We should probably inline them. I'm indifferent on whether we keep just `as_generic_list_array` or also the two more specific ones.
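   
   For reference, the inlined wrappers would look roughly like this (a sketch of the suggestion, mirroring the diff above):
   
   ```rust
   use arrow::array::{Array, ArrayRef, GenericListArray, LargeListArray, ListArray, OffsetSizeTrait};
   
   pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
       arr.as_any()
           .downcast_ref::<GenericListArray<S>>()
           .expect("Unable to downcast to list array")
   }
   
   /// Thin wrapper for the common `i32`-offset case.
   #[inline]
   pub fn as_list_array(arr: &ArrayRef) -> &ListArray {
       as_generic_list_array::<i32>(arr)
   }
   
   /// Thin wrapper for the `i64`-offset case.
   #[inline]
   pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray {
       as_generic_list_array::<i64>(arr)
   }
   ```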

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,635 @@
+        DataType::Struct(_) => {
+            let jsonmaps =
+                struct_array_to_jsonmap_array(as_struct_array(array), array.len());
+            jsonmaps.into_iter().map(Value::Object).collect()
+        }
+        _ => {

Review comment:
       I think there'd be demand for dictionary support, but we can add that separately.







[GitHub] [arrow] alamb commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568205865



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,635 @@
+        DataType::Struct(_) => {
+            let jsonmaps =
+                struct_array_to_jsonmap_array(as_struct_array(array), array.len());
+            jsonmaps.into_iter().map(Value::Object).collect()
+        }
+        _ => {

Review comment:
       Yes I agree -- on both counts :)







[GitHub] [arrow] houqp commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568340019



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,635 @@
+        DataType::Struct(_) => {
+            let jsonmaps =
+                struct_array_to_jsonmap_array(as_struct_array(array), array.len());
+            jsonmaps.into_iter().map(Value::Object).collect()
+        }
+        _ => {

Review comment:
       Yeah, I was planning to add dictionary support, but this patch set grew larger than I expected, so I decided to leave that to a follow-up PR for easier review.







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568090293



##########
File path: rust/arrow/src/array/cast.rs
##########
@@ -40,12 +40,20 @@ where
         .expect("Unable to downcast to dictionary array")
 }
 
-pub fn as_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
+pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
     arr.as_any()
         .downcast_ref::<GenericListArray<S>>()
         .expect("Unable to downcast to list array")
 }
 
+pub fn as_list_array(arr: &ArrayRef) -> &ListArray {

Review comment:
       Not sure whether we need an extra public function for a one-liner.

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,635 @@
+        DataType::Float32 => primitive_array_to_json::<Float32Type>(array),
+        DataType::Float64 => primitive_array_to_json::<Float64Type>(array),
+        DataType::List(_) => as_list_array(array)

Review comment:
       ```rust
           DataType::LargeList(_) => as_large_list_array(array)
               .iter()
               .map(|maybe_value| match maybe_value {
                   Some(v) => Value::Array(array_to_json_array(&v)),
                   None => Value::Null,
               })
               .collect(),
   ```
   
   for completeness?
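   
   If that arm is added, a quick check could look like this (a hypothetical test; it assumes the suggested `DataType::LargeList` arm is in place, since today a large list would still fall through to the catch-all arm):
   
   ```rust
   use std::sync::Arc;
   use arrow::array::{ArrayRef, Int32Builder, LargeListBuilder};
   use arrow::json::writer::array_to_json_array;
   use serde_json::json;
   
   fn main() {
       // Build the LargeListArray [[1, 2]] with i64 offsets.
       let mut builder = LargeListBuilder::new(Int32Builder::new(2));
       builder.values().append_value(1).unwrap();
       builder.values().append_value(2).unwrap();
       builder.append(true).unwrap();
       let array: ArrayRef = Arc::new(builder.finish());
   
       // With the suggested arm, large lists serialize like regular lists.
       assert_eq!(array_to_json_array(&array), vec![json!([1, 2])]);
   }
   ```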







[GitHub] [arrow] houqp commented on a change in pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r564283209



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer allows converting Arrow record batches into array of JSON objects. It also
+//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
+//! objects as bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)

Review comment:
       Good catch, will update the code and add test cases.
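
    A test along these lines might look like the sketch below (hypothetical: the schema, values, and `true` nullability flag are illustrative, and the exact JSON rendering of the null row depends on the updated writer):
    
    ```rust
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let a = Int32Array::from(vec![Some(1), None, Some(3)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
    
    let rows = record_batches_to_json_rows(&[batch]);
    // The non-null rows are unambiguous regardless of how nulls are rendered.
    assert_eq!(serde_json::Value::Object(rows[0].clone()), serde_json::json!({"a": 1}));
    ```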




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] codecov-io edited a comment on pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#issuecomment-762596751


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=h1) Report
   > Merging [#9256](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=desc) (f200959) into [master](https://codecov.io/gh/apache/arrow/commit/3e47777441795c1970a22f6bad103da3e867dc98?el=desc) (3e47777) will **decrease** coverage by `0.01%`.
   > The diff coverage is `84.58%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9256/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9256      +/-   ##
   ==========================================
   - Coverage   81.89%   81.87%   -0.02%     
   ==========================================
     Files         215      216       +1     
     Lines       52988    53097     +109     
   ==========================================
   + Hits        43392    43474      +82     
   - Misses       9596     9623      +27     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/arrow/src/array/array\_struct.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvYXJyYXlfc3RydWN0LnJz) | `88.43% <ø> (ø)` | |
   | [rust/arrow/src/array/equal/utils.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvZXF1YWwvdXRpbHMucnM=) | `75.49% <0.00%> (ø)` | |
   | [rust/arrow/src/bytes.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYnl0ZXMucnM=) | `53.12% <ø> (ø)` | |
   | [rust/arrow/src/compute/kernels/aggregate.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvY29tcHV0ZS9rZXJuZWxzL2FnZ3JlZ2F0ZS5ycw==) | `74.93% <ø> (ø)` | |
   | [rust/arrow/src/ipc/gen/SparseTensor.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvaXBjL2dlbi9TcGFyc2VUZW5zb3IucnM=) | `0.00% <0.00%> (ø)` | |
   | [rust/arrow/src/ipc/gen/Tensor.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvaXBjL2dlbi9UZW5zb3IucnM=) | `0.00% <0.00%> (ø)` | |
   | [rust/benchmarks/src/bin/nyctaxi.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9iZW5jaG1hcmtzL3NyYy9iaW4vbnljdGF4aS5ycw==) | `0.00% <ø> (ø)` | |
   | [rust/benchmarks/src/bin/tpch.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9iZW5jaG1hcmtzL3NyYy9iaW4vdHBjaC5ycw==) | `6.97% <0.00%> (ø)` | |
   | [rust/datafusion/src/datasource/memory.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL21lbW9yeS5ycw==) | `79.75% <0.00%> (ø)` | |
   | [rust/datafusion/src/logical\_plan/operators.rs](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9sb2dpY2FsX3BsYW4vb3BlcmF0b3Jz) | `75.00% <ø> (ø)` | |
   | ... and [136 more](https://codecov.io/gh/apache/arrow/pull/9256/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=footer). Last update [5665a0c...f200959](https://codecov.io/gh/apache/arrow/pull/9256?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb closed pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #9256:
URL: https://github.com/apache/arrow/pull/9256


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9256: WIP: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r563499210



##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer allows converting Arrow record batches into an array of JSON objects. It also
+//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
+//! objects as bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());

Review comment:
       Same here. Try using `arr.iter()`
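
    For the boolean branch that could look roughly like this (a sketch; `arr.iter()` yields `Option<bool>`, so nulls map to JSON null):
    
    ```rust
    DataType::Boolean => {
        let arr = as_boolean_array(array);
        rows.iter_mut()
            .zip(arr.iter())
            .take(row_count)
            .for_each(|(row, maybe_value)| {
                // A null slot becomes Value::Null instead of the raw slot value.
                row.insert(
                    col_name.to_string(),
                    maybe_value.map(Value::Bool).unwrap_or(Value::Null),
                );
            });
    }
    ```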

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer allows converting Arrow record batches into an array of JSON objects. It also
+//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
+//! objects as bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)

Review comment:
       This won't take nulls into account.
   
   One way to go here is to use 
   
   ```rust
   rows.iter_mut().zip(primitive_arr.iter()).take(row_count).for_each(|(row, maybe_value)| {
   ...
   })
   ```
   
   `maybe_value` will be of type `Option<T::Native>`, where `None` represents a null value.
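
    Spelled out fully, that approach might look like this (a sketch building on the suggestion above, reusing the PR's `into_json_value` conversion):
    
    ```rust
    fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
        rows: &mut [JsonMap<String, Value>],
        row_count: usize,
        array: &ArrayRef,
        col_name: &str,
    ) {
        let primitive_arr = as_primitive_array::<T>(array);
        rows.iter_mut()
            .zip(primitive_arr.iter())
            .take(row_count)
            .for_each(|(row, maybe_value)| {
                // None (a null slot) becomes JSON null; Some(v) goes through
                // into_json_value(), which may itself yield None (e.g. NaN).
                row.insert(
                    col_name.to_string(),
                    maybe_value
                        .and_then(|v| v.into_json_value())
                        .unwrap_or(Value::Null),
                );
            });
    }
    ```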
   
   

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer allows converting Arrow record batches into an array of JSON objects. It also
+//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
+//! objects as bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());
+            }
+        }
+        DataType::Int8 => {
+            set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int16 => {
+            set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int32 => {
+            set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int64 => {
+            set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt8 => {
+            set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt16 => {
+            set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt32 => {
+            set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt64 => {
+            set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float32 => {
+            set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float64 => {
+            set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());
+            }
+        }
+        DataType::Struct(_) => {
+            let arr = as_struct_array(array);
+            let inner_col_names = arr.column_names();
+
+            let mut inner_objs = iter::repeat(JsonMap::new())
+                .take(row_count)
+                .collect::<Vec<JsonMap<String, Value>>>();
+
+            arr.columns()
+                .iter()
+                .enumerate()
+                .for_each(|(j, struct_col)| {
+                    set_column_for_json_rows(
+                        &mut inner_objs,
+                        row_count,
+                        struct_col,
+                        inner_col_names[j],
+                    );
+                });
+
+            rows.iter_mut()
+                .take(row_count)
+                .zip(inner_objs.into_iter())
+                .for_each(|(row, obj)| {
+                    row.insert(col_name.to_string(), Value::Object(obj));
+                });
+        }
+        _ => {
+            panic!(format!("Unsupported datatype: {:#?}", array.data_type()));
+        }
+    }
+}
+
+pub fn record_batches_to_json_rows(
+    batches: &[RecordBatch],
+) -> Vec<JsonMap<String, Value>> {
+    let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
+        .take(batches.iter().map(|b| b.num_rows()).sum())
+        .collect();
+
+    if !rows.is_empty() {
+        let schema = batches[0].schema();
+        let mut base = 0;
+        batches.iter().for_each(|batch| {
+            let row_count = batch.num_rows();
+            batch.columns().iter().enumerate().for_each(|(j, col)| {
+                let col_name = schema.field(j).name();
+                set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
+            });
+            base += row_count;
+        });
+    }
+
+    rows
+}
+
+/// A JSON writer
+#[derive(Debug)]
+pub struct Writer<W: Write> {
+    writer: BufWriter<W>,
+}
+
+impl<W: Write> Writer<W> {
+    pub fn new(writer: W) -> Self {
+        Self::from_buf_writer(BufWriter::new(writer))
+    }
+
+    pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
+        Self { writer }
+    }
+
+    pub fn write_row(&mut self, row: &Value) -> Result<()> {
+        self.writer.write_all(&serde_json::to_vec(row)?)?;
+        self.writer.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
+        for row in record_batches_to_json_rows(batches) {
+            self.write_row(&Value::Object(row))?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::{read_to_string, File};
+    use std::sync::Arc;
+
+    use crate::json::reader::*;
+
+    use super::*;
+
+    #[test]
+    fn write_simple_rows() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

Review comment:
       could have used `vec![Some(1), None, Some(3), Some(4), Some(5)]`, so that we also test null values?

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer allows converting Arrow record batches into an array of JSON objects. It also
+//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
+//! objects as bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());
+            }
+        }
+        DataType::Int8 => {
+            set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int16 => {
+            set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int32 => {
+            set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int64 => {
+            set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt8 => {
+            set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt16 => {
+            set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt32 => {
+            set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt64 => {
+            set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float32 => {
+            set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float64 => {
+            set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());

Review comment:
       Same here.
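
    i.e. the same pattern with `strarr.iter()`, which yields `Option<&str>` (a sketch):
    
    ```rust
    DataType::Utf8 => {
        let strarr = as_string_array(array);
        rows.iter_mut()
            .zip(strarr.iter())
            .take(row_count)
            .for_each(|(row, maybe_value)| {
                // From<&str> for Value handles the Some case; null becomes Value::Null.
                row.insert(
                    col_name.to_string(),
                    maybe_value.map(|v| v.into()).unwrap_or(Value::Null),
                );
            });
    }
    ```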

##########
File path: rust/arrow/src/json/writer.rs
##########
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Writer
+//!
+//! This JSON writer allows converting Arrow record batches into an array of JSON objects. It also
+//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
+//! objects as bytes.
+//!
+//! Serialize record batches into an array of JSON objects:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
+//! assert_eq!(
+//!     serde_json::Value::Object(json_rows[1].clone()),
+//!     serde_json::json!({"a": 2}),
+//! );
+//! ```
+//!
+//! Serialize record batches into line-delimited JSON bytes:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! let buf = Vec::new();
+//! let mut writer = json::Writer::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! ```
+
+use std::io::{BufWriter, Write};
+use std::iter;
+
+use serde_json::map::Map as JsonMap;
+use serde_json::Value;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::Result;
+use crate::record_batch::RecordBatch;
+
+fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    let primitive_arr = as_primitive_array::<T>(array);
+    for (i, row) in rows.iter_mut().enumerate().take(row_count) {
+        row.insert(
+            col_name.to_string(),
+            primitive_arr
+                .value(i)
+                .into_json_value()
+                .unwrap_or(Value::Null),
+        );
+    }
+}
+
+fn set_column_for_json_rows(
+    rows: &mut [JsonMap<String, Value>],
+    row_count: usize,
+    array: &ArrayRef,
+    col_name: &str,
+) {
+    match array.data_type() {
+        DataType::Null => {
+            for row in rows.iter_mut().take(row_count) {
+                row.insert(col_name.to_string(), Value::Null);
+            }
+        }
+        DataType::Boolean => {
+            let arr = as_boolean_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), arr.value(i).into());
+            }
+        }
+        DataType::Int8 => {
+            set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int16 => {
+            set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int32 => {
+            set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Int64 => {
+            set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt8 => {
+            set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt16 => {
+            set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt32 => {
+            set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
+        }
+        DataType::UInt64 => {
+            set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float32 => {
+            set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
+        }
+        DataType::Float64 => {
+            set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
+        }
+        DataType::Utf8 => {
+            let strarr = as_string_array(array);
+            for (i, row) in rows.iter_mut().take(row_count).enumerate() {
+                row.insert(col_name.to_string(), strarr.value(i).into());
+            }
+        }
+        DataType::Struct(_) => {
+            let arr = as_struct_array(array);
+            let inner_col_names = arr.column_names();
+
+            let mut inner_objs = iter::repeat(JsonMap::new())
+                .take(row_count)
+                .collect::<Vec<JsonMap<String, Value>>>();
+
+            arr.columns()
+                .iter()
+                .enumerate()
+                .for_each(|(j, struct_col)| {
+                    set_column_for_json_rows(
+                        &mut inner_objs,
+                        row_count,
+                        struct_col,
+                        inner_col_names[j],
+                    );
+                });
+
+            rows.iter_mut()
+                .take(row_count)
+                .zip(inner_objs.into_iter())
+                .for_each(|(row, obj)| {
+                    row.insert(col_name.to_string(), Value::Object(obj));
+                });
+        }
+        _ => {
+            panic!(format!("Unsupported datatype: {:#?}", array.data_type()));
+        }
+    }
+}
+
+pub fn record_batches_to_json_rows(
+    batches: &[RecordBatch],
+) -> Vec<JsonMap<String, Value>> {
+    let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
+        .take(batches.iter().map(|b| b.num_rows()).sum())
+        .collect();
+
+    if !rows.is_empty() {
+        let schema = batches[0].schema();
+        let mut base = 0;
+        batches.iter().for_each(|batch| {
+            let row_count = batch.num_rows();
+            batch.columns().iter().enumerate().for_each(|(j, col)| {
+                let col_name = schema.field(j).name();
+                set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
+            });
+            base += row_count;
+        });
+    }
+
+    rows
+}
+
+/// A JSON writer
+#[derive(Debug)]
+pub struct Writer<W: Write> {
+    writer: BufWriter<W>,
+}
+
+impl<W: Write> Writer<W> {
+    pub fn new(writer: W) -> Self {
+        Self::from_buf_writer(BufWriter::new(writer))
+    }
+
+    pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
+        Self { writer }
+    }
+
+    pub fn write_row(&mut self, row: &Value) -> Result<()> {
+        self.writer.write_all(&serde_json::to_vec(row)?)?;
+        self.writer.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
+        for row in record_batches_to_json_rows(batches) {
+            self.write_row(&Value::Object(row))?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::{read_to_string, File};
+    use std::sync::Arc;
+
+    use crate::json::reader::*;
+
+    use super::*;
+
+    #[test]
+    fn write_simple_rows() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);
+
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
+                .unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = Writer::new(&mut buf);
+            writer.write_batches(&vec![batch]).unwrap();
+        }
+
+        assert_eq!(
+            String::from_utf8(buf).unwrap(),
+            r#"{"a":1,"b":"a"}
+{"a":2,"b":"b"}
+{"a":3,"b":"c"}
+{"a":4,"b":"d"}
+{"a":5,"b":"e"}
+"#
+        );
+    }
+
+    fn test_write_for_file(test_file: &str) {
+        let builder = ReaderBuilder::new()
+            .infer_schema(None)
+            .with_batch_size(1024);
+        let mut reader: Reader<File> = builder
+            .build::<File>(File::open(test_file).unwrap())
+            .unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = Writer::new(&mut buf);
+            writer.write_batches(&vec![batch]).unwrap();
+        }
+
+        let result = String::from_utf8(buf).unwrap();
+        let expected = read_to_string(test_file).unwrap();
+        for (r, e) in result.lines().zip(expected.lines()) {
+            assert_eq!(
+                serde_json::from_str::<Value>(r).unwrap(),
+                serde_json::from_str::<Value>(e).unwrap()
+            );
+        }
+    }
+
+    #[test]
+    fn write_basic_rows() {
+        test_write_for_file("test/data/basic.json");

Review comment:
       As far as I understand, this will populate a file and not delete it at the end.
   
   Would it be possible to write to a byte stream or something instead of a file? Alternatively, use `tmp` or something?
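
    For reference, a temp-file variant might look roughly like this (a sketch assuming the `tempfile` crate as a dev-dependency; `test_write_for_file` is the helper above):
    
    ```rust
    use std::io::Write as _;
    
    // Write some line-delimited JSON to a file that is cleaned up on drop.
    let mut tmp = tempfile::NamedTempFile::new().unwrap();
    writeln!(tmp, r#"{{"a":1,"b":"x"}}"#).unwrap();
    writeln!(tmp, r#"{{"a":2,"b":"y"}}"#).unwrap();
    tmp.flush().unwrap();
    test_write_for_file(tmp.path().to_str().unwrap());
    // The file is removed automatically when `tmp` goes out of scope.
    ```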




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] houqp commented on pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#issuecomment-770626148


   @jorgecarleitao @nevi-me ready for review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9256: ARROW-11310: [Rust] implement JSON writer

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9256:
URL: https://github.com/apache/arrow/pull/9256#discussion_r568340343



##########
File path: rust/arrow/src/array/cast.rs
##########
@@ -40,12 +40,20 @@ where
         .expect("Unable to downcast to dictionary array")
 }
 
-pub fn as_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
+pub fn as_generic_list_array<S: OffsetSizeTrait>(arr: &ArrayRef) -> &GenericListArray<S> {
     arr.as_any()
         .downcast_ref::<GenericListArray<S>>()
         .expect("Unable to downcast to list array")
 }
 
+pub fn as_list_array(arr: &ArrayRef) -> &ListArray {

Review comment:
       Makes sense. Let's keep it πŸ‘
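
    (For context, the wrapper only pins the offset type; a sketch of the difference at call sites, with `array: ArrayRef` assumed:)
    
    ```rust
    // Equivalent downcasts — `ListArray` is `GenericListArray<i32>`;
    // the wrapper just saves the turbofish at call sites.
    let list: &ListArray = as_list_array(&array);
    let same: &GenericListArray<i32> = as_generic_list_array::<i32>(&array);
    ```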




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org