You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2019/03/04 14:03:56 UTC
[arrow] branch master updated: ARROW-3838: [Rust] CSV Writer
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1099305 ARROW-3838: [Rust] CSV Writer
1099305 is described below
commit 1099305c56db299454c9f6c6bee8f22da466bc35
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Mon Mar 4 07:03:47 2019 -0700
ARROW-3838: [Rust] CSV Writer
This supersedes #3111.
Supports:
* writing using the `csv` crate
* customising delimiter, and whether or not to write headers
Limitations:
* writes to `std::fs::File`, I struggled to make it write to any `std::io::Write` interface. @paddyhoran any ideas as you did it for `Reader`?
* values are converted first to `String` before being written. It could be more performant if we could convert directly to byte slice as `csv` crate supports that. I also struggled with this
Potential Further Work:
* writing temporal arrays (after #3726 [ARROW-4386])
Author: Neville Dipale <ne...@gmail.com>
Closes #3790 from nevi-me/ARROW-3838 and squashes the following commits:
7839949 <Neville Dipale> try fix tmp file issue
d60d0ce <Neville Dipale> change writer tests to write to target folder
74db488 <Neville Dipale> cargo fmt
1693c9b <Neville Dipale> ARROW-3838: CSV Writer
---
rust/arrow/Cargo.toml | 4 +
rust/arrow/benches/csv_writer.rs | 74 ++++++++
rust/arrow/src/csv/mod.rs | 3 +
rust/arrow/src/csv/writer.rs | 357 +++++++++++++++++++++++++++++++++++++++
rust/arrow/src/error.rs | 6 +
rust/arrow/src/util/test_util.rs | 27 +++
6 files changed, 471 insertions(+)
diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml
index 1c9fa3b..c319f24 100644
--- a/rust/arrow/Cargo.toml
+++ b/rust/arrow/Cargo.toml
@@ -71,3 +71,7 @@ harness = false
[[bench]]
name = "comparison_kernels"
harness = false
+
+[[bench]]
+name = "csv_writer"
+harness = false
diff --git a/rust/arrow/benches/csv_writer.rs b/rust/arrow/benches/csv_writer.rs
new file mode 100644
index 0000000..ec3bc5a
--- /dev/null
+++ b/rust/arrow/benches/csv_writer.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate arrow;
+extern crate criterion;
+
+use criterion::*;
+
+use arrow::array::*;
+use arrow::csv;
+use arrow::datatypes::*;
+use arrow::record_batch::RecordBatch;
+use std::fs::File;
+use std::sync::Arc;
+
+fn record_batches_to_csv() {
+ let schema = Schema::new(vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::Float64, true),
+ Field::new("c3", DataType::UInt32, false),
+ Field::new("c3", DataType::Boolean, true),
+ ]);
+
+ let c1 = BinaryArray::from(vec![
+ "Lorem ipsum dolor sit amet",
+ "consectetur adipiscing elit",
+ "sed do eiusmod tempor",
+ ]);
+ let c2 = PrimitiveArray::<Float64Type>::from(vec![
+ Some(123.564532),
+ None,
+ Some(-556132.25),
+ ]);
+ let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
+ let c4 = PrimitiveArray::<BooleanType>::from(vec![Some(true), Some(false), None]);
+
+ let b = RecordBatch::new(
+ Arc::new(schema),
+ vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
+ );
+ let file = File::create("target/bench_write_csv.csv").unwrap();
+ let writer = csv::Writer::new(file);
+ criterion::black_box(
+ writer
+ .write(vec![&b, &b, &b, &b, &b, &b, &b, &b, &b, &b, &b])
+ .unwrap(),
+ );
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ c.bench(
+ "record_batches_to_csv",
+ Benchmark::new("record_batches_to_csv", move |b| {
+ b.iter(|| record_batches_to_csv())
+ }),
+ );
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/rust/arrow/src/csv/mod.rs b/rust/arrow/src/csv/mod.rs
index b3deb3f..55de4b2 100644
--- a/rust/arrow/src/csv/mod.rs
+++ b/rust/arrow/src/csv/mod.rs
@@ -18,6 +18,9 @@
//! Transfer data between the Arrow memory format and CSV (comma-separated values).
pub mod reader;
+pub mod writer;
pub use self::reader::Reader;
pub use self::reader::ReaderBuilder;
+pub use self::writer::Writer;
+pub use self::writer::WriterBuilder;
diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs
new file mode 100644
index 0000000..bf1e582
--- /dev/null
+++ b/rust/arrow/src/csv/writer.rs
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CSV Writer
+//!
+//! This CSV writer allows Arrow data (in record batches) to be written as CSV files.
+//! The writer does not support writing `ListArray` and `StructArray`.
+//!
+//! Example:
+//!
+//! ```
+//! use arrow::array::*;
+//! use arrow::csv;
+//! use arrow::datatypes::*;
+//! use arrow::record_batch::RecordBatch;
+//! use arrow::util::test_util::get_temp_file;
+//! use std::fs::File;
+//! use std::sync::Arc;
+//!
+//! let schema = Schema::new(vec![
+//! Field::new("c1", DataType::Utf8, false),
+//! Field::new("c2", DataType::Float64, true),
+//! Field::new("c3", DataType::UInt32, false),
+//! Field::new("c3", DataType::Boolean, true),
+//! ]);
+//! let c1 = BinaryArray::from(vec![
+//! "Lorem ipsum dolor sit amet",
+//! "consectetur adipiscing elit",
+//! "sed do eiusmod tempor",
+//! ]);
+//! let c2 = PrimitiveArray::<Float64Type>::from(vec![
+//! Some(123.564532),
+//! None,
+//! Some(-556132.25),
+//! ]);
+//! let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
+//! let c4 = PrimitiveArray::<BooleanType>::from(vec![Some(true), Some(false), None]);
+//!
+//! let batch = RecordBatch::new(
+//! Arc::new(schema),
+//! vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
+//! );
+//!
+//! let file = get_temp_file("out.csv", &[]);
+//!
+//! let writer = csv::Writer::new(file);
+//! writer.write(vec![&batch, &batch]).unwrap();
+//! ```
+
+use csv as csv_crate;
+
+use std::fs::File;
+
+use crate::array::*;
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+use crate::record_batch::RecordBatch;
+
+fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
+where
+ T: ArrowNumericType,
+ T::Native: ::std::string::ToString,
+{
+ let c = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
+ c.value(i).to_string()
+}
+
+/// A CSV writer
+pub struct Writer {
+ /// The file to write to
+ file: File,
+ /// Column delimiter. Defaults to `b','`
+ delimiter: u8,
+ /// Whether file should be written with headers. Defaults to `true`
+ has_headers: bool,
+}
+
+impl Writer {
+ /// Create a new CsvWriter from a file, with default options
+ pub fn new(file: File) -> Self {
+ Writer {
+ file,
+ delimiter: b',',
+ has_headers: true,
+ }
+ }
+
+ /// Write a vector of record batches to a file
+ pub fn write(&self, batches: Vec<&RecordBatch>) -> Result<()> {
+ if batches.is_empty() {
+ return Err(ArrowError::CsvError(
+ "No record batches supplied to the CSV writer".to_string(),
+ ));
+ }
+ let mut builder = csv_crate::WriterBuilder::new();
+ let mut wtr = builder.delimiter(self.delimiter).from_writer(&self.file);
+ let num_columns = batches[0].num_columns();
+ if self.has_headers {
+ let mut headers: Vec<String> = Vec::with_capacity(num_columns);
+ &batches[0]
+ .schema()
+ .fields()
+ .iter()
+ .for_each(|field| headers.push(field.name().to_string()));
+ wtr.write_record(&headers[..])?;
+ }
+
+ for batch in batches {
+ for row_index in 0..batch.num_rows() {
+ // TODO: it'd be more efficient if we could create `record: Vec<&[u8]>
+ let mut record: Vec<String> = Vec::with_capacity(batch.num_columns());
+ for col_index in 0..batch.num_columns() {
+ let col = batch.column(col_index);
+ if col.is_null(row_index) {
+ // write an empty value
+ record.push(String::from(""));
+ continue;
+ }
+ let string = match col.data_type() {
+ DataType::Float64 => {
+ write_primitive_value::<Float64Type>(col, row_index)
+ }
+ DataType::Float32 => {
+ write_primitive_value::<Float32Type>(col, row_index)
+ }
+ DataType::Int8 => {
+ write_primitive_value::<Int8Type>(col, row_index)
+ }
+ DataType::Int16 => {
+ write_primitive_value::<Int16Type>(col, row_index)
+ }
+ DataType::Int32 => {
+ write_primitive_value::<Int32Type>(col, row_index)
+ }
+ DataType::Int64 => {
+ write_primitive_value::<Int64Type>(col, row_index)
+ }
+ DataType::UInt8 => {
+ write_primitive_value::<UInt8Type>(col, row_index)
+ }
+ DataType::UInt16 => {
+ write_primitive_value::<UInt16Type>(col, row_index)
+ }
+ DataType::UInt32 => {
+ write_primitive_value::<UInt32Type>(col, row_index)
+ }
+ DataType::UInt64 => {
+ write_primitive_value::<UInt64Type>(col, row_index)
+ }
+ DataType::Boolean => {
+ let c = col.as_any().downcast_ref::<BooleanArray>().unwrap();
+ c.value(row_index).to_string()
+ }
+ DataType::Utf8 => {
+ let c = col.as_any().downcast_ref::<BinaryArray>().unwrap();
+ String::from_utf8(c.value(row_index).to_vec())?
+ }
+ t => {
+ // List and Struct arrays not supported by the writer, any
+ // other type needs to be implemented
+ return Err(ArrowError::CsvError(format!(
+ "CSV Writer does not support {:?} data type",
+ t
+ )));
+ }
+ };
+
+ record.push(string);
+ }
+ wtr.write_record(&record[..])?;
+ }
+ wtr.flush()?;
+ }
+
+ Ok(())
+ }
+}
+
+/// A CSV writer builder
+pub struct WriterBuilder {
+ /// Optional column delimiter. Defaults to `b','`
+ delimiter: Option<u8>,
+ /// Whether to write column names as file headers. Defaults to `true`
+ has_headers: bool,
+}
+
+impl Default for WriterBuilder {
+ fn default() -> Self {
+ Self {
+ has_headers: true,
+ delimiter: None,
+ }
+ }
+}
+
+impl WriterBuilder {
+ /// Create a new builder for configuring CSV writing options.
+ ///
+ /// To convert a builder into a writer, call `WriterBuilder::build`
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// extern crate arrow;
+ ///
+ /// use arrow::csv;
+ /// use std::fs::File;
+ ///
+ /// fn example() -> csv::Writer {
+ /// let file = File::create("target/out.csv").unwrap();
+ ///
+ /// // create a builder that doesn't write headers
+ /// let builder = csv::WriterBuilder::new().has_headers(false);
+ /// let writer = builder.build(file);
+ ///
+ /// writer
+ /// }
+ /// ```
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Set whether to write headers
+ pub fn has_headers(mut self, has_headers: bool) -> Self {
+ self.has_headers = has_headers;
+ self
+ }
+
+ /// Set the CSV file's column delimiter as a byte character
+ pub fn with_delimiter(mut self, delimiter: u8) -> Self {
+ self.delimiter = Some(delimiter);
+ self
+ }
+
+ /// Create a new `Writer`
+ pub fn build(self, file: File) -> Writer {
+ Writer {
+ file,
+ delimiter: self.delimiter.unwrap_or(b','),
+ has_headers: self.has_headers,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use crate::datatypes::{Field, Schema};
+ use crate::util::test_util::get_temp_file;
+ use std::io::Read;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_write_csv() {
+ let schema = Schema::new(vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::Float64, true),
+ Field::new("c3", DataType::UInt32, false),
+ Field::new("c3", DataType::Boolean, true),
+ ]);
+
+ let c1 = BinaryArray::from(vec![
+ "Lorem ipsum dolor sit amet",
+ "consectetur adipiscing elit",
+ "sed do eiusmod tempor",
+ ]);
+ let c2 = PrimitiveArray::<Float64Type>::from(vec![
+ Some(123.564532),
+ None,
+ Some(-556132.25),
+ ]);
+ let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
+ let c4 = PrimitiveArray::<BooleanType>::from(vec![Some(true), Some(false), None]);
+
+ let batch = RecordBatch::new(
+ Arc::new(schema),
+ vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
+ );
+
+ let file = get_temp_file("columns.csv", &[]);
+
+ let writer = Writer::new(file);
+ writer.write(vec![&batch, &batch]).unwrap();
+
+ // check that file was written successfully
+ let mut file = File::open("target/debug/testdata/columns.csv").unwrap();
+ let mut buffer: Vec<u8> = vec![];
+ file.read_to_end(&mut buffer).unwrap();
+
+ assert_eq!(
+ "c1,c2,c3,c3\nLorem ipsum dolor sit amet,123.564532,3,true\nconsectetur adipiscing elit,,2,false\nsed do eiusmod tempor,-556132.25,1,\nLorem ipsum dolor sit amet,123.564532,3,true\nconsectetur adipiscing elit,,2,false\nsed do eiusmod tempor,-556132.25,1,\n"
+ .to_string(),
+ String::from_utf8(buffer).unwrap()
+ );
+ }
+
+ #[test]
+ fn test_write_csv_custom_options() {
+ let schema = Schema::new(vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::Float64, true),
+ Field::new("c3", DataType::UInt32, false),
+ Field::new("c3", DataType::Boolean, true),
+ ]);
+
+ let c1 = BinaryArray::from(vec![
+ "Lorem ipsum dolor sit amet",
+ "consectetur adipiscing elit",
+ "sed do eiusmod tempor",
+ ]);
+ let c2 = PrimitiveArray::<Float64Type>::from(vec![
+ Some(123.564532),
+ None,
+ Some(-556132.25),
+ ]);
+ let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
+ let c4 = PrimitiveArray::<BooleanType>::from(vec![Some(true), Some(false), None]);
+
+ let batch = RecordBatch::new(
+ Arc::new(schema),
+ vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
+ );
+
+ let file = get_temp_file("custom_options.csv", &[]);
+
+ let builder = WriterBuilder::new().has_headers(false).with_delimiter(b'|');
+
+ let writer = builder.build(file);
+ writer.write(vec![&batch]).unwrap();
+
+ // check that file was written successfully
+ let mut file = File::open("target/debug/testdata/custom_options.csv").unwrap();
+ let mut buffer: Vec<u8> = vec![];
+ file.read_to_end(&mut buffer).unwrap();
+
+ assert_eq!(
+ "Lorem ipsum dolor sit amet|123.564532|3|true\nconsectetur adipiscing elit||2|false\nsed do eiusmod tempor|-556132.25|1|\n"
+ .to_string(),
+ String::from_utf8(buffer).unwrap()
+ );
+ }
+}
diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs
index a981478..96ed944 100644
--- a/rust/arrow/src/error.rs
+++ b/rust/arrow/src/error.rs
@@ -62,4 +62,10 @@ impl From<csv_crate::Error> for ArrowError {
}
}
+impl From<::std::string::FromUtf8Error> for ArrowError {
+ fn from(error: ::std::string::FromUtf8Error) -> Self {
+ ArrowError::ParseError(error.description().to_string())
+ }
+}
+
pub type Result<T> = ::std::result::Result<T, ArrowError>;
diff --git a/rust/arrow/src/util/test_util.rs b/rust/arrow/src/util/test_util.rs
index 5d0e7b9..9b6cff2 100644
--- a/rust/arrow/src/util/test_util.rs
+++ b/rust/arrow/src/util/test_util.rs
@@ -18,6 +18,7 @@
//! Utils to make testing easier
use rand::{thread_rng, Rng};
+use std::{env, fs, io::Write};
/// Returns a vector of size `n`, filled with randomly generated bytes.
pub fn random_bytes(n: usize) -> Vec<u8> {
@@ -28,3 +29,29 @@ pub fn random_bytes(n: usize) -> Vec<u8> {
}
result
}
+
+/// Returns file handle for a temp file in 'target' directory with a provided content
+///
+/// TODO: Originates from `parquet` utils, can be merged in [ARROW-4064]
+pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
+ // build tmp path to a file in "target/debug/testdata"
+ let mut path_buf = env::current_dir().unwrap();
+ path_buf.push("target");
+ path_buf.push("debug");
+ path_buf.push("testdata");
+ fs::create_dir_all(&path_buf).unwrap();
+ path_buf.push(file_name);
+
+ // write file content
+ let mut tmp_file = fs::File::create(path_buf.as_path()).unwrap();
+ tmp_file.write_all(content).unwrap();
+ tmp_file.sync_all().unwrap();
+
+ // return file handle for both read and write
+ let file = fs::OpenOptions::new()
+ .read(true)
+ .write(true)
+ .open(path_buf.as_path());
+ assert!(file.is_ok());
+ file.unwrap()
+}