You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/08 20:17:19 UTC
[arrow-rs] branch master updated: Split out arrow-csv (#2594) (#3044)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fe3318bba Split out arrow-csv (#2594) (#3044)
fe3318bba is described below
commit fe3318bba24abfe572fa037a0b8805a15bdf5c45
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Nov 9 09:17:14 2022 +1300
Split out arrow-csv (#2594) (#3044)
* Split out arrow-csv (#2594)
* Fix doc
* Update arrow-csv/Cargo.toml
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.github/workflows/arrow.yml | 5 +
.github/workflows/arrow_flight.yml | 1 +
.github/workflows/dev_pr/labeler.yml | 1 +
.github/workflows/integration.yml | 1 +
.github/workflows/miri.yaml | 1 +
.github/workflows/parquet.yml | 1 +
Cargo.toml | 1 +
arrow-csv/Cargo.toml | 53 +++
arrow/src/csv/mod.rs => arrow-csv/src/lib.rs | 8 +-
{arrow/src/csv => arrow-csv/src}/reader.rs | 546 +++------------------------
{arrow/src/csv => arrow-csv/src}/writer.rs | 172 ++-------
arrow/Cargo.toml | 11 +-
arrow/src/lib.rs | 2 +-
arrow/tests/csv.rs | 486 ++++++++++++++++++++++++
arrow/tests/{ipc_integration.rs => ipc.rs} | 0
15 files changed, 656 insertions(+), 633 deletions(-)
diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index d930086ef..461e7e87e 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -34,6 +34,7 @@ on:
- arrow-select/**
- arrow-integration-test/**
- arrow-ipc/**
+ - arrow-csv/**
- .github/**
jobs:
@@ -64,6 +65,8 @@ jobs:
run: cargo test -p arrow-cast --all-features
- name: Test arrow-ipc with all features
run: cargo test -p arrow-ipc --all-features
+ - name: Test arrow-csv with all features
+ run: cargo test -p arrow-csv --all-features
- name: Test arrow-integration-test with all features
run: cargo test -p arrow-integration-test --all-features
- name: Test arrow with default features
@@ -174,5 +177,7 @@ jobs:
run: cargo clippy -p arrow-cast --all-targets --all-features -- -D warnings
- name: Clippy arrow-ipc with all features
run: cargo clippy -p arrow-ipc --all-targets --all-features -- -D warnings
+ - name: Clippy arrow-csv with all features
+ run: cargo clippy -p arrow-csv --all-targets --all-features -- -D warnings
- name: Clippy arrow
run: cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression,dyn_cmp_dict,dyn_arith_dict,chrono-tz --all-targets -- -D warnings
diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml
index ded4f5a67..1f830ccf2 100644
--- a/.github/workflows/arrow_flight.yml
+++ b/.github/workflows/arrow_flight.yml
@@ -36,6 +36,7 @@ on:
- arrow-select/**
- arrow-flight/**
- arrow-ipc/**
+ - arrow-csv/**
- .github/**
jobs:
diff --git a/.github/workflows/dev_pr/labeler.yml b/.github/workflows/dev_pr/labeler.yml
index 17ebf54de..04c7c080e 100644
--- a/.github/workflows/dev_pr/labeler.yml
+++ b/.github/workflows/dev_pr/labeler.yml
@@ -24,6 +24,7 @@ arrow:
- arrow-schema/**/*
- arrow-select/**/*
- arrow-ipc/**/*
+ - arrow-csv/**/*
arrow-flight:
- arrow-flight/**/*
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 8566230ea..9418b9042 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -32,6 +32,7 @@ on:
- arrow-schema/**
- arrow-select/**
- arrow-ipc/**
+ - arrow-csv/**
- arrow-pyarrow-integration-testing/**
- arrow-integration-test/**
- arrow-integration-testing/**
diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml
index 2e85c9dd9..e58ebdb35 100644
--- a/.github/workflows/miri.yaml
+++ b/.github/workflows/miri.yaml
@@ -32,6 +32,7 @@ on:
- arrow-schema/**
- arrow-select/**
- arrow-ipc/**
+ - arrow-csv/**
- .github/**
jobs:
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index b369ef69b..4f3cf5f80 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -35,6 +35,7 @@ on:
- arrow-schema/**
- arrow-select/**
- arrow-ipc/**
+ - arrow-csv/**
- parquet/**
- .github/**
diff --git a/Cargo.toml b/Cargo.toml
index 0ab4853c6..18497d043 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
+ "arrow-csv",
"arrow-data",
"arrow-flight",
"arrow-integration-test",
diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml
new file mode 100644
index 000000000..d40cef0db
--- /dev/null
+++ b/arrow-csv/Cargo.toml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "arrow-csv"
+version = "26.0.0"
+description = "Support for parsing CSV format into the Arrow format"
+homepage = "https://github.com/apache/arrow-rs"
+repository = "https://github.com/apache/arrow-rs"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = ["arrow"]
+include = [
+ "benches/*.rs",
+ "src/**/*.rs",
+ "Cargo.toml",
+]
+edition = "2021"
+rust-version = "1.62"
+
+[lib]
+name = "arrow_csv"
+path = "src/lib.rs"
+bench = false
+
+[dependencies]
+arrow-array = { version = "26.0.0", path = "../arrow-array" }
+arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
+arrow-cast = { version = "26.0.0", path = "../arrow-cast" }
+arrow-data = { version = "26.0.0", path = "../arrow-data" }
+arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+csv = { version = "1.1", default-features = false }
+lazy_static = { version = "1.4", default-features = false }
+lexical-core = { version = "^0.8", default-features = false }
+regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
+
+[dev-dependencies]
+tempfile = "3.3"
diff --git a/arrow/src/csv/mod.rs b/arrow-csv/src/lib.rs
similarity index 85%
rename from arrow/src/csv/mod.rs
rename to arrow-csv/src/lib.rs
index 46ba7d71e..a45cf082d 100644
--- a/arrow/src/csv/mod.rs
+++ b/arrow-csv/src/lib.rs
@@ -27,14 +27,14 @@ pub use self::writer::Writer;
pub use self::writer::WriterBuilder;
use arrow_schema::ArrowError;
-fn map_csv_error(error: csv_crate::Error) -> ArrowError {
+fn map_csv_error(error: csv::Error) -> ArrowError {
match error.kind() {
- csv_crate::ErrorKind::Io(error) => ArrowError::CsvError(error.to_string()),
- csv_crate::ErrorKind::Utf8 { pos: _, err } => ArrowError::CsvError(format!(
+ csv::ErrorKind::Io(error) => ArrowError::CsvError(error.to_string()),
+ csv::ErrorKind::Utf8 { pos: _, err } => ArrowError::CsvError(format!(
"Encountered UTF-8 error while reading CSV file: {}",
err
)),
- csv_crate::ErrorKind::UnequalLengths {
+ csv::ErrorKind::UnequalLengths {
expected_len, len, ..
} => ArrowError::CsvError(format!(
"Encountered unequal lengths between records on CSV file. Expected {} \
diff --git a/arrow/src/csv/reader.rs b/arrow-csv/src/reader.rs
similarity index 75%
rename from arrow/src/csv/reader.rs
rename to arrow-csv/src/reader.rs
index 404f37e93..459c23ad2 100644
--- a/arrow/src/csv/reader.rs
+++ b/arrow-csv/src/reader.rs
@@ -22,11 +22,11 @@
//!
//! Example:
//!
-//! ```
-//! use arrow::csv;
-//! use arrow::datatypes::{DataType, Field, Schema};
-//! use std::fs::File;
-//! use std::sync::Arc;
+//! ```no_run
+//! # use arrow_schema::*;
+//! # use arrow_csv::Reader;
+//! # use std::fs::File;
+//! # use std::sync::Arc;
//!
//! let schema = Schema::new(vec![
//! Field::new("city", DataType::Utf8, false),
@@ -36,7 +36,7 @@
//!
//! let file = File::open("test/data/uk_cities.csv").unwrap();
//!
-//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+//! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
//! let batch = csv.next().unwrap().unwrap();
//! ```
@@ -49,17 +49,15 @@ use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
-use crate::array::{
- ArrayRef, BooleanArray, Decimal128Builder, DictionaryArray, PrimitiveArray,
- StringArray,
-};
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::record_batch::{RecordBatch, RecordBatchOptions};
+use arrow_array::builder::Decimal128Builder;
+use arrow_array::types::*;
+use arrow_array::*;
use arrow_cast::parse::Parser;
+use arrow_schema::*;
-use crate::csv::map_csv_error;
-use csv_crate::{ByteRecord, StringRecord};
+use crate::map_csv_error;
+use arrow_data::decimal::validate_decimal_precision;
+use csv::{ByteRecord, StringRecord};
use std::ops::Neg;
lazy_static! {
@@ -128,7 +126,7 @@ pub fn infer_file_schema<R: Read + Seek>(
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
let roptions = ReaderOptions {
delimiter: Some(delimiter),
max_read_records,
@@ -142,7 +140,7 @@ pub fn infer_file_schema<R: Read + Seek>(
fn infer_file_schema_with_csv_options<R: Read + Seek>(
mut reader: R,
roptions: ReaderOptions,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
let saved_offset = reader.seek(SeekFrom::Current(0))?;
let (schema, records_count) =
@@ -164,7 +162,7 @@ pub fn infer_reader_schema<R: Read>(
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
let roptions = ReaderOptions {
delimiter: Some(delimiter),
max_read_records,
@@ -177,7 +175,7 @@ pub fn infer_reader_schema<R: Read>(
fn infer_reader_schema_with_csv_options<R: Read>(
reader: R,
roptions: ReaderOptions,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
let mut csv_reader = Reader::build_csv_reader(
reader,
roptions.has_header,
@@ -268,7 +266,7 @@ pub fn infer_schema_from_files(
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
-) -> Result<Schema> {
+) -> Result<Schema, ArrowError> {
let mut schemas = vec![];
let mut records_to_read = max_read_records.unwrap_or(usize::MAX);
@@ -302,7 +300,7 @@ pub struct Reader<R: Read> {
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// File reader
- reader: csv_crate::Reader<R>,
+ reader: csv::Reader<R>,
/// Current line number
line_number: usize,
/// Maximum number of rows to read
@@ -410,8 +408,8 @@ impl<R: Read> Reader<R> {
escape: Option<u8>,
quote: Option<u8>,
terminator: Option<u8>,
- ) -> csv_crate::Reader<R> {
- let mut reader_builder = csv_crate::ReaderBuilder::new();
+ ) -> csv::Reader<R> {
+ let mut reader_builder = csv::ReaderBuilder::new();
reader_builder.has_headers(has_header);
if let Some(c) = delimiter {
@@ -422,13 +420,13 @@ impl<R: Read> Reader<R> {
reader_builder.quote(c);
}
if let Some(t) = terminator {
- reader_builder.terminator(csv_crate::Terminator::Any(t));
+ reader_builder.terminator(csv::Terminator::Any(t));
}
reader_builder.from_reader(reader)
}
fn from_csv_reader(
- mut csv_reader: csv_crate::Reader<R>,
+ mut csv_reader: csv::Reader<R>,
schema: SchemaRef,
has_header: bool,
batch_size: usize,
@@ -474,7 +472,7 @@ impl<R: Read> Reader<R> {
}
impl<R: Read> Iterator for Reader<R> {
- type Item = Result<RecordBatch>;
+ type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
let remaining = self.end - self.line_number;
@@ -522,8 +520,8 @@ impl<R: Read> Iterator for Reader<R> {
}
}
-/// parses a slice of [csv_crate::StringRecord] into a
-/// [RecordBatch](crate::record_batch::RecordBatch).
+/// parses a slice of [csv::StringRecord] into a
+/// [RecordBatch]
fn parse(
rows: &[StringRecord],
fields: &[Field],
@@ -531,13 +529,13 @@ fn parse(
projection: Option<&Vec<usize>>,
line_number: usize,
datetime_format: Option<&str>,
-) -> Result<RecordBatch> {
+) -> Result<RecordBatch, ArrowError> {
let projection: Vec<usize> = match projection {
Some(v) => v.clone(),
None => fields.iter().enumerate().map(|(i, _)| i).collect(),
};
- let arrays: Result<Vec<ArrayRef>> = projection
+ let arrays: Result<Vec<ArrayRef>, _> = projection
.iter()
.map(|i| {
let i = *i;
@@ -706,7 +704,7 @@ fn build_decimal_array(
col_idx: usize,
precision: u8,
scale: u8,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
let mut decimal_builder = Decimal128Builder::with_capacity(rows.len());
for row in rows {
let col_s = row.get(col_idx);
@@ -720,7 +718,7 @@ fn build_decimal_array(
// append null
decimal_builder.append_null();
} else {
- let decimal_value: Result<i128> =
+ let decimal_value: Result<i128, _> =
parse_decimal_with_parameter(s, precision, scale);
match decimal_value {
Ok(v) => {
@@ -743,7 +741,11 @@ fn build_decimal_array(
// Parse the string format decimal value to i128 format and checking the precision and scale.
// The result i128 value can't be out of bounds.
-fn parse_decimal_with_parameter(s: &str, precision: u8, scale: u8) -> Result<i128> {
+fn parse_decimal_with_parameter(
+ s: &str,
+ precision: u8,
+ scale: u8,
+) -> Result<i128, ArrowError> {
if PARSE_DECIMAL_RE.is_match(s) {
let mut offset = s.len();
let len = s.len();
@@ -808,7 +810,7 @@ fn parse_decimal_with_parameter(s: &str, precision: u8, scale: u8) -> Result<i12
// Parse the string format decimal value to i128 format without checking the precision and scale.
// Like "125.12" to 12512_i128.
#[cfg(test)]
-fn parse_decimal(s: &str) -> Result<i128> {
+fn parse_decimal(s: &str) -> Result<i128, ArrowError> {
if PARSE_DECIMAL_RE.is_match(s) {
let mut offset = s.len();
// each byte is digit、'-' or '.'
@@ -856,7 +858,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
rows: &[StringRecord],
col_idx: usize,
format: Option<&str>,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
rows.iter()
.enumerate()
.map(|(row_index, row)| {
@@ -884,7 +886,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
None => Ok(None),
}
})
- .collect::<Result<PrimitiveArray<T>>>()
+ .collect::<Result<PrimitiveArray<T>, ArrowError>>()
.map(|e| Arc::new(e) as ArrayRef)
}
@@ -893,7 +895,7 @@ fn build_boolean_array(
line_number: usize,
rows: &[StringRecord],
col_idx: usize,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
rows.iter()
.enumerate()
.map(|(row_index, row)| {
@@ -918,7 +920,7 @@ fn build_boolean_array(
None => Ok(None),
}
})
- .collect::<Result<BooleanArray>>()
+ .collect::<Result<BooleanArray, _>>()
.map(|e| Arc::new(e) as ArrayRef)
}
@@ -988,16 +990,14 @@ impl ReaderBuilder {
/// # Example
///
/// ```
- /// extern crate arrow;
- ///
- /// use arrow::csv;
+ /// use arrow_csv::{Reader, ReaderBuilder};
/// use std::fs::File;
///
- /// fn example() -> csv::Reader<File> {
+ /// fn example() -> Reader<File> {
/// let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
///
/// // create a builder, inferring the schema with the first 100 records
- /// let builder = csv::ReaderBuilder::new().infer_schema(Some(100));
+ /// let builder = ReaderBuilder::new().infer_schema(Some(100));
///
/// let reader = builder.build(file).unwrap();
///
@@ -1086,7 +1086,7 @@ impl ReaderBuilder {
}
/// Create a new `Reader` from the `ReaderBuilder`
- pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> {
+ pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
// check if schema should be inferred
let delimiter = self.delimiter.unwrap_or(b',');
let schema = match self.schema {
@@ -1131,436 +1131,11 @@ impl ReaderBuilder {
mod tests {
use super::*;
- use std::fs::File;
- use std::io::{Cursor, Write};
+ use std::io::Write;
use tempfile::NamedTempFile;
- use crate::array::*;
- use crate::compute::cast;
- use crate::datatypes::Field;
use chrono::prelude::*;
- #[test]
- fn test_csv() {
- let _: Vec<()> = vec![None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())]
- .into_iter()
- .map(|format| {
- let schema = Schema::new(vec![
- Field::new("city", DataType::Utf8, false),
- Field::new("lat", DataType::Float64, false),
- Field::new("lng", DataType::Float64, false),
- ]);
-
- let file = File::open("test/data/uk_cities.csv").unwrap();
- let mut csv = Reader::new(
- file,
- Arc::new(schema.clone()),
- false,
- None,
- 1024,
- None,
- None,
- format,
- );
- assert_eq!(Arc::new(schema), csv.schema());
- let batch = csv.next().unwrap().unwrap();
- assert_eq!(37, batch.num_rows());
- assert_eq!(3, batch.num_columns());
-
- // access data from a primitive array
- let lat = batch
- .column(1)
- .as_any()
- .downcast_ref::<Float64Array>()
- .unwrap();
- assert_eq!(57.653484, lat.value(0));
-
- // access data from a string array (ListArray<u8>)
- let city = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
-
- assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
- })
- .collect();
- }
-
- #[test]
- fn test_csv_schema_metadata() {
- let mut metadata = std::collections::HashMap::new();
- metadata.insert("foo".to_owned(), "bar".to_owned());
- let schema = Schema::new_with_metadata(
- vec![
- Field::new("city", DataType::Utf8, false),
- Field::new("lat", DataType::Float64, false),
- Field::new("lng", DataType::Float64, false),
- ],
- metadata.clone(),
- );
-
- let file = File::open("test/data/uk_cities.csv").unwrap();
-
- let mut csv = Reader::new(
- file,
- Arc::new(schema.clone()),
- false,
- None,
- 1024,
- None,
- None,
- None,
- );
- assert_eq!(Arc::new(schema), csv.schema());
- let batch = csv.next().unwrap().unwrap();
- assert_eq!(37, batch.num_rows());
- assert_eq!(3, batch.num_columns());
-
- assert_eq!(&metadata, batch.schema().metadata());
- }
-
- #[test]
- fn test_csv_reader_with_decimal() {
- let schema = Schema::new(vec![
- Field::new("city", DataType::Utf8, false),
- Field::new("lat", DataType::Decimal128(38, 6), false),
- Field::new("lng", DataType::Decimal128(38, 6), false),
- ]);
-
- let file = File::open("test/data/decimal_test.csv").unwrap();
-
- let mut csv =
- Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
- let batch = csv.next().unwrap().unwrap();
- // access data from a primitive array
- let lat = batch
- .column(1)
- .as_any()
- .downcast_ref::<Decimal128Array>()
- .unwrap();
-
- assert_eq!("57.653484", lat.value_as_string(0));
- assert_eq!("53.002666", lat.value_as_string(1));
- assert_eq!("52.412811", lat.value_as_string(2));
- assert_eq!("51.481583", lat.value_as_string(3));
- assert_eq!("12.123456", lat.value_as_string(4));
- assert_eq!("50.760000", lat.value_as_string(5));
- assert_eq!("0.123000", lat.value_as_string(6));
- assert_eq!("123.000000", lat.value_as_string(7));
- assert_eq!("123.000000", lat.value_as_string(8));
- assert_eq!("-50.760000", lat.value_as_string(9));
- }
-
- #[test]
- fn test_csv_from_buf_reader() {
- let schema = Schema::new(vec![
- Field::new("city", DataType::Utf8, false),
- Field::new("lat", DataType::Float64, false),
- Field::new("lng", DataType::Float64, false),
- ]);
-
- let file_with_headers =
- File::open("test/data/uk_cities_with_headers.csv").unwrap();
- let file_without_headers = File::open("test/data/uk_cities.csv").unwrap();
- let both_files = file_with_headers
- .chain(Cursor::new("\n".to_string()))
- .chain(file_without_headers);
- let mut csv = Reader::from_reader(
- both_files,
- Arc::new(schema),
- true,
- None,
- 1024,
- None,
- None,
- None,
- );
- let batch = csv.next().unwrap().unwrap();
- assert_eq!(74, batch.num_rows());
- assert_eq!(3, batch.num_columns());
- }
-
- #[test]
- fn test_csv_with_schema_inference() {
- let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
-
- let builder = ReaderBuilder::new().has_header(true).infer_schema(None);
-
- let mut csv = builder.build(file).unwrap();
- let expected_schema = Schema::new(vec![
- Field::new("city", DataType::Utf8, true),
- Field::new("lat", DataType::Float64, true),
- Field::new("lng", DataType::Float64, true),
- ]);
- assert_eq!(Arc::new(expected_schema), csv.schema());
- let batch = csv.next().unwrap().unwrap();
- assert_eq!(37, batch.num_rows());
- assert_eq!(3, batch.num_columns());
-
- // access data from a primitive array
- let lat = batch
- .column(1)
- .as_any()
- .downcast_ref::<Float64Array>()
- .unwrap();
- assert_eq!(57.653484, lat.value(0));
-
- // access data from a string array (ListArray<u8>)
- let city = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
-
- assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
- }
-
- #[test]
- fn test_csv_with_schema_inference_no_headers() {
- let file = File::open("test/data/uk_cities.csv").unwrap();
-
- let builder = ReaderBuilder::new().infer_schema(None);
-
- let mut csv = builder.build(file).unwrap();
-
- // csv field names should be 'column_{number}'
- let schema = csv.schema();
- assert_eq!("column_1", schema.field(0).name());
- assert_eq!("column_2", schema.field(1).name());
- assert_eq!("column_3", schema.field(2).name());
- let batch = csv.next().unwrap().unwrap();
- let batch_schema = batch.schema();
-
- assert_eq!(schema, batch_schema);
- assert_eq!(37, batch.num_rows());
- assert_eq!(3, batch.num_columns());
-
- // access data from a primitive array
- let lat = batch
- .column(1)
- .as_any()
- .downcast_ref::<Float64Array>()
- .unwrap();
- assert_eq!(57.653484, lat.value(0));
-
- // access data from a string array (ListArray<u8>)
- let city = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
-
- assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
- }
-
- #[test]
- fn test_csv_builder_with_bounds() {
- let file = File::open("test/data/uk_cities.csv").unwrap();
-
- // Set the bounds to the lines 0, 1 and 2.
- let mut csv = ReaderBuilder::new().with_bounds(0, 2).build(file).unwrap();
- let batch = csv.next().unwrap().unwrap();
-
- // access data from a string array (ListArray<u8>)
- let city = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
-
- // The value on line 0 is within the bounds
- assert_eq!("Elgin, Scotland, the UK", city.value(0));
-
- // The value on line 13 is outside of the bounds. Therefore
- // the call to .value() will panic.
- let result = std::panic::catch_unwind(|| city.value(13));
- assert!(result.is_err());
- }
-
- #[test]
- fn test_csv_with_projection() {
- let schema = Schema::new(vec![
- Field::new("city", DataType::Utf8, false),
- Field::new("lat", DataType::Float64, false),
- Field::new("lng", DataType::Float64, false),
- ]);
-
- let file = File::open("test/data/uk_cities.csv").unwrap();
-
- let mut csv = Reader::new(
- file,
- Arc::new(schema),
- false,
- None,
- 1024,
- None,
- Some(vec![0, 1]),
- None,
- );
- let projected_schema = Arc::new(Schema::new(vec![
- Field::new("city", DataType::Utf8, false),
- Field::new("lat", DataType::Float64, false),
- ]));
- assert_eq!(projected_schema, csv.schema());
- let batch = csv.next().unwrap().unwrap();
- assert_eq!(projected_schema, batch.schema());
- assert_eq!(37, batch.num_rows());
- assert_eq!(2, batch.num_columns());
- }
-
- #[test]
- fn test_csv_with_dictionary() {
- let schema = Schema::new(vec![
- Field::new(
- "city",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- false,
- ),
- Field::new("lat", DataType::Float64, false),
- Field::new("lng", DataType::Float64, false),
- ]);
-
- let file = File::open("test/data/uk_cities.csv").unwrap();
-
- let mut csv = Reader::new(
- file,
- Arc::new(schema),
- false,
- None,
- 1024,
- None,
- Some(vec![0, 1]),
- None,
- );
- let projected_schema = Arc::new(Schema::new(vec![
- Field::new(
- "city",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- false,
- ),
- Field::new("lat", DataType::Float64, false),
- ]));
- assert_eq!(projected_schema, csv.schema());
- let batch = csv.next().unwrap().unwrap();
- assert_eq!(projected_schema, batch.schema());
- assert_eq!(37, batch.num_rows());
- assert_eq!(2, batch.num_columns());
-
- let strings = cast(batch.column(0), &DataType::Utf8).unwrap();
- let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
-
- assert_eq!(strings.value(0), "Elgin, Scotland, the UK");
- assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK");
- assert_eq!(strings.value(29), "Uckfield, East Sussex, UK");
- }
-
- #[test]
- fn test_nulls() {
- let schema = Schema::new(vec![
- Field::new("c_int", DataType::UInt64, false),
- Field::new("c_float", DataType::Float32, true),
- Field::new("c_string", DataType::Utf8, false),
- ]);
-
- let file = File::open("test/data/null_test.csv").unwrap();
-
- let mut csv =
- Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None);
- let batch = csv.next().unwrap().unwrap();
-
- assert!(!batch.column(1).is_null(0));
- assert!(!batch.column(1).is_null(1));
- assert!(batch.column(1).is_null(2));
- assert!(!batch.column(1).is_null(3));
- assert!(!batch.column(1).is_null(4));
- }
-
- #[test]
- fn test_nulls_with_inference() {
- let file = File::open("test/data/various_types.csv").unwrap();
-
- let builder = ReaderBuilder::new()
- .infer_schema(None)
- .has_header(true)
- .with_delimiter(b'|')
- .with_batch_size(512)
- .with_projection(vec![0, 1, 2, 3, 4, 5]);
-
- let mut csv = builder.build(file).unwrap();
- let batch = csv.next().unwrap().unwrap();
-
- assert_eq!(7, batch.num_rows());
- assert_eq!(6, batch.num_columns());
-
- let schema = batch.schema();
-
- assert_eq!(&DataType::Int64, schema.field(0).data_type());
- assert_eq!(&DataType::Float64, schema.field(1).data_type());
- assert_eq!(&DataType::Float64, schema.field(2).data_type());
- assert_eq!(&DataType::Boolean, schema.field(3).data_type());
- assert_eq!(&DataType::Date32, schema.field(4).data_type());
- assert_eq!(&DataType::Date64, schema.field(5).data_type());
-
- let names: Vec<&str> =
- schema.fields().iter().map(|x| x.name().as_str()).collect();
- assert_eq!(
- names,
- vec![
- "c_int",
- "c_float",
- "c_string",
- "c_bool",
- "c_date",
- "c_datetime"
- ]
- );
-
- assert!(schema.field(0).is_nullable());
- assert!(schema.field(1).is_nullable());
- assert!(schema.field(2).is_nullable());
- assert!(schema.field(3).is_nullable());
- assert!(schema.field(4).is_nullable());
- assert!(schema.field(5).is_nullable());
-
- assert!(!batch.column(1).is_null(0));
- assert!(!batch.column(1).is_null(1));
- assert!(batch.column(1).is_null(2));
- assert!(!batch.column(1).is_null(3));
- assert!(!batch.column(1).is_null(4));
- }
-
- #[test]
- fn test_parse_invalid_csv() {
- let file = File::open("test/data/various_types_invalid.csv").unwrap();
-
- let schema = Schema::new(vec![
- Field::new("c_int", DataType::UInt64, false),
- Field::new("c_float", DataType::Float32, false),
- Field::new("c_string", DataType::Utf8, false),
- Field::new("c_bool", DataType::Boolean, false),
- ]);
-
- let builder = ReaderBuilder::new()
- .with_schema(Arc::new(schema))
- .has_header(true)
- .with_delimiter(b'|')
- .with_batch_size(512)
- .with_projection(vec![0, 1, 2, 3]);
-
- let mut csv = builder.build(file).unwrap();
- match csv.next() {
- Some(e) => match e {
- Err(e) => assert_eq!(
- "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")",
- format!("{:?}", e)
- ),
- Ok(_) => panic!("should have failed"),
- },
- None => panic!("should have failed"),
- }
- }
-
#[test]
fn test_infer_field_schema() {
assert_eq!(infer_field_schema("A", None), DataType::Utf8);
@@ -1771,21 +1346,21 @@ mod tests {
}
#[test]
- fn test_infer_schema_from_multiple_files() -> Result<()> {
- let mut csv1 = NamedTempFile::new()?;
- let mut csv2 = NamedTempFile::new()?;
- let csv3 = NamedTempFile::new()?; // empty csv file should be skipped
- let mut csv4 = NamedTempFile::new()?;
- writeln!(csv1, "c1,c2,c3")?;
- writeln!(csv1, "1,\"foo\",0.5")?;
- writeln!(csv1, "3,\"bar\",1")?;
- writeln!(csv1, "3,\"bar\",2e-06")?;
+ fn test_infer_schema_from_multiple_files() {
+ let mut csv1 = NamedTempFile::new().unwrap();
+ let mut csv2 = NamedTempFile::new().unwrap();
+ let csv3 = NamedTempFile::new().unwrap(); // empty csv file should be skipped
+ let mut csv4 = NamedTempFile::new().unwrap();
+ writeln!(csv1, "c1,c2,c3").unwrap();
+ writeln!(csv1, "1,\"foo\",0.5").unwrap();
+ writeln!(csv1, "3,\"bar\",1").unwrap();
+ writeln!(csv1, "3,\"bar\",2e-06").unwrap();
// reading csv2 will set c2 to optional
- writeln!(csv2, "c1,c2,c3,c4")?;
- writeln!(csv2, "10,,3.14,true")?;
+ writeln!(csv2, "c1,c2,c3,c4").unwrap();
+ writeln!(csv2, "10,,3.14,true").unwrap();
// reading csv4 will set c3 to optional
- writeln!(csv4, "c1,c2,c3")?;
- writeln!(csv4, "10,\"foo\",")?;
+ writeln!(csv4, "c1,c2,c3").unwrap();
+ writeln!(csv4, "10,\"foo\",").unwrap();
let schema = infer_schema_from_files(
&[
@@ -1797,7 +1372,8 @@ mod tests {
b',',
Some(4), // only csv1 and csv2 should be read
true,
- )?;
+ )
+ .unwrap();
assert_eq!(schema.fields().len(), 4);
assert!(schema.field(0).is_nullable());
@@ -1809,8 +1385,6 @@ mod tests {
assert_eq!(&DataType::Utf8, schema.field(1).data_type());
assert_eq!(&DataType::Float64, schema.field(2).data_type());
assert_eq!(&DataType::Boolean, schema.field(3).data_type());
-
- Ok(())
}
#[test]
diff --git a/arrow/src/csv/writer.rs b/arrow-csv/src/writer.rs
similarity index 81%
rename from arrow/src/csv/writer.rs
rename to arrow-csv/src/writer.rs
index b2d02fe84..674b33369 100644
--- a/arrow/src/csv/writer.rs
+++ b/arrow-csv/src/writer.rs
@@ -23,11 +23,11 @@
//! Example:
//!
//! ```
-//! use arrow::array::*;
-//! use arrow::csv;
-//! use arrow::datatypes::*;
-//! use arrow::record_batch::RecordBatch;
-//! use std::sync::Arc;
+//! # use arrow_array::*;
+//! # use arrow_array::types::*;
+//! # use arrow_csv::Writer;
+//! # use arrow_schema::*;
+//! # use std::sync::Arc;
//!
//! let schema = Schema::new(vec![
//! Field::new("c1", DataType::Utf8, false),
@@ -56,7 +56,7 @@
//!
//! let mut output = Vec::with_capacity(1024);
//!
-//! let mut writer = csv::Writer::new(&mut output);
+//! let mut writer = Writer::new(&mut output);
//! let batches = vec![&batch, &batch];
//! for batch in batches {
//! writer.write(batch).unwrap();
@@ -64,15 +64,14 @@
//! ```
use arrow_array::timezone::Tz;
+use arrow_array::types::*;
+use arrow_array::*;
+use arrow_cast::display::{lexical_to_string, make_string_from_decimal};
+use arrow_schema::*;
use chrono::{DateTime, Utc};
use std::io::Write;
-use crate::array::*;
-use crate::csv::map_csv_error;
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::record_batch::RecordBatch;
-use crate::util::display::{lexical_to_string, make_string_from_decimal};
+use crate::map_csv_error;
const DEFAULT_DATE_FORMAT: &str = "%F";
const DEFAULT_TIME_FORMAT: &str = "%T";
@@ -81,7 +80,7 @@ const DEFAULT_TIMESTAMP_TZ_FORMAT: &str = "%FT%H:%M:%S.%9f%:z";
fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
where
- T: ArrowNumericType,
+ T: ArrowPrimitiveType,
T::Native: lexical_core::ToLexical,
{
let c = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
@@ -92,7 +91,7 @@ where
#[derive(Debug)]
pub struct Writer<W: Write> {
/// The object to write to
- writer: csv_crate::Writer<W>,
+ writer: csv::Writer<W>,
/// Whether file should be written with headers. Defaults to `true`
has_headers: bool,
/// The date format for date arrays
@@ -115,7 +114,7 @@ impl<W: Write> Writer<W> {
/// Create a new CsvWriter from a writable object, with default options
pub fn new(writer: W) -> Self {
let delimiter = b',';
- let mut builder = csv_crate::WriterBuilder::new();
+ let mut builder = csv::WriterBuilder::new();
let writer = builder.delimiter(delimiter).from_writer(writer);
Writer {
writer,
@@ -135,7 +134,7 @@ impl<W: Write> Writer<W> {
batch: &[ArrayRef],
row_index: usize,
buffer: &mut [String],
- ) -> Result<()> {
+ ) -> Result<(), ArrowError> {
// TODO: it'd be more efficient if we could create `record: Vec<&[u8]>
for (col_index, item) in buffer.iter_mut().enumerate() {
let col = &batch[col_index];
@@ -242,7 +241,7 @@ impl<W: Write> Writer<W> {
time_zone: Option<&String>,
row_index: usize,
col: &ArrayRef,
- ) -> Result<String> {
+ ) -> Result<String, ArrowError> {
use TimeUnit::*;
let datetime = match time_unit {
Second => col
@@ -283,7 +282,7 @@ impl<W: Write> Writer<W> {
}
/// Write a vector of record batches to a writable object
- pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
let num_columns = batch.num_columns();
if self.beginning {
if self.has_headers {
@@ -305,7 +304,7 @@ impl<W: Write> Writer<W> {
.iter()
.map(|array| match array.data_type() {
DataType::Dictionary(_, value_type) => {
- crate::compute::kernels::cast::cast(array, value_type)
+ arrow_cast::cast(array, value_type)
.expect("cannot cast dictionary to underlying values")
}
_ => array.clone(),
@@ -365,16 +364,14 @@ impl WriterBuilder {
/// # Example
///
/// ```
- /// extern crate arrow;
+ /// # use arrow_csv::{Writer, WriterBuilder};
+ /// # use std::fs::File;
///
- /// use arrow::csv;
- /// use std::fs::File;
- ///
- /// fn example() -> csv::Writer<File> {
+ /// fn example() -> Writer<File> {
/// let file = File::create("target/out.csv").unwrap();
///
/// // create a builder that doesn't write headers
- /// let builder = csv::WriterBuilder::new().has_headers(false);
+ /// let builder = WriterBuilder::new().has_headers(false);
/// let writer = builder.build(file);
///
/// writer
@@ -423,7 +420,7 @@ impl WriterBuilder {
/// Create a new `Writer`
pub fn build<W: Write>(self, writer: W) -> Writer<W> {
let delimiter = self.delimiter.unwrap_or(b',');
- let mut builder = csv_crate::WriterBuilder::new();
+ let mut builder = csv::WriterBuilder::new();
let writer = builder.delimiter(delimiter).from_writer(writer);
Writer {
writer,
@@ -452,13 +449,8 @@ impl WriterBuilder {
mod tests {
use super::*;
- use crate::csv::Reader;
- use crate::datatypes::{Field, Schema};
- #[cfg(feature = "chrono-tz")]
- use crate::util::string_writer::StringWriter;
- use crate::util::test_util::get_temp_file;
- use std::fs::File;
- use std::io::{Cursor, Read};
+ use crate::Reader;
+ use std::io::{Cursor, Read, Seek};
use std::sync::Arc;
#[test]
@@ -512,15 +504,17 @@ mod tests {
)
.unwrap();
- let file = get_temp_file("columns.csv", &[]);
+ let mut file = tempfile::tempfile().unwrap();
- let mut writer = Writer::new(file);
+ let mut writer = Writer::new(&mut file);
let batches = vec![&batch, &batch];
for batch in batches {
writer.write(batch).unwrap();
}
+ drop(writer);
+
// check that file was written successfully
- let mut file = File::open("target/debug/testdata/columns.csv").unwrap();
+ file.rewind().unwrap();
let mut buffer: Vec<u8> = vec![];
file.read_to_end(&mut buffer).unwrap();
@@ -571,20 +565,21 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
)
.unwrap();
- let file = get_temp_file("custom_options.csv", &[]);
+ let mut file = tempfile::tempfile().unwrap();
let builder = WriterBuilder::new()
.has_headers(false)
.with_delimiter(b'|')
.with_time_format("%r".to_string());
- let mut writer = builder.build(file);
+ let mut writer = builder.build(&mut file);
let batches = vec![&batch];
for batch in batches {
writer.write(batch).unwrap();
}
+ drop(writer);
// check that file was written successfully
- let mut file = File::open("target/debug/testdata/custom_options.csv").unwrap();
+ file.rewind().unwrap();
let mut buffer: Vec<u8> = vec![];
file.read_to_end(&mut buffer).unwrap();
@@ -595,105 +590,6 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
);
}
- #[cfg(feature = "chrono-tz")]
- #[test]
- fn test_export_csv_timestamps() {
- let schema = Schema::new(vec![
- Field::new(
- "c1",
- DataType::Timestamp(
- TimeUnit::Millisecond,
- Some("Australia/Sydney".to_string()),
- ),
- true,
- ),
- Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
- ]);
-
- let c1 = TimestampMillisecondArray::from(
- // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST).
- // The offset (difference to UTC) is +10:00.
- // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT)
- // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30.
- //
- vec![Some(1555584887378), Some(1635577147000)],
- )
- .with_timezone("Australia/Sydney".to_string());
- let c2 = TimestampMillisecondArray::from(vec![
- Some(1555584887378),
- Some(1635577147000),
- ]);
- let batch =
- RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)])
- .unwrap();
-
- let sw = StringWriter::new();
- let mut writer = Writer::new(sw);
- let batches = vec![&batch];
- for batch in batches {
- writer.write(batch).unwrap();
- }
-
- let left = "c1,c2
-2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000
-2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000\n";
- let right = writer.writer.into_inner().map(|s| s.to_string());
- assert_eq!(Some(left.to_string()), right.ok());
- }
-
- #[cfg(not(feature = "chrono-tz"))]
- #[test]
- fn test_conversion_consistency() {
- // test if we can serialize and deserialize whilst retaining the same type information/ precision
-
- let schema = Schema::new(vec![
- Field::new("c1", DataType::Date32, false),
- Field::new("c2", DataType::Date64, false),
- ]);
-
- let c1 = Date32Array::from(vec![3, 2, 1]);
- let c2 = Date64Array::from(vec![3, 2, 1]);
-
- let batch = RecordBatch::try_new(
- Arc::new(schema.clone()),
- vec![Arc::new(c1), Arc::new(c2)],
- )
- .unwrap();
-
- let builder = WriterBuilder::new().has_headers(false);
-
- let mut buf: Cursor<Vec<u8>> = Default::default();
- // drop the writer early to release the borrow.
- {
- let mut writer = builder.build(&mut buf);
- writer.write(&batch).unwrap();
- }
- buf.set_position(0);
-
- let mut reader = Reader::new(
- buf,
- Arc::new(schema),
- false,
- None,
- 3,
- // starting at row 2 and up to row 6.
- None,
- None,
- None,
- );
- let rb = reader.next().unwrap().unwrap();
- let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
- let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
-
- let actual = c1.into_iter().collect::<Vec<_>>();
- let expected = vec![Some(3), Some(2), Some(1)];
- assert_eq!(actual, expected);
- let actual = c2.into_iter().collect::<Vec<_>>();
- let expected = vec![Some(3), Some(2), Some(1)];
- assert_eq!(actual, expected);
- }
-
- #[cfg(feature = "chrono-tz")]
#[test]
fn test_conversion_consistency() {
// test if we can serialize and deserialize whilst retaining the same type information/ precision
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 6c30df6bd..cc9421de7 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -46,6 +46,7 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"]
[dependencies]
arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
arrow-cast = { version = "26.0.0", path = "../arrow-cast" }
+arrow-csv = { version = "26.0.0", path = "../arrow-csv", optional = true }
arrow-data = { version = "26.0.0", path = "../arrow-data" }
arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
arrow-array = { version = "26.0.0", path = "../arrow-array" }
@@ -57,10 +58,8 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
num = { version = "0.4", default-features = false, features = ["std"] }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.12", default-features = false }
-csv_crate = { version = "1.1", default-features = false, optional = true, package = "csv" }
regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] }
-lazy_static = { version = "1.4", default-features = false }
packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
comfy-table = { version = "6.0", optional = true, default-features = false }
@@ -75,7 +74,7 @@ features = ["prettyprint", "ipc_compression", "dyn_cmp_dict", "ffi", "pyarrow"]
[features]
default = ["csv", "ipc", "json"]
ipc_compression = ["ipc", "arrow-ipc/lz4", "arrow-ipc/zstd"]
-csv = ["csv_crate"]
+csv = ["arrow-csv"]
ipc = ["arrow-ipc"]
json = ["serde_json"]
simd = ["packed_simd"]
@@ -265,5 +264,9 @@ harness = false
required-features = ["test_utils"]
[[test]]
-name = "ipc_integration"
+name = "ipc"
required-features = ["test_utils", "ipc"]
+
+[[test]]
+name = "csv"
+required-features = ["csv"]
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index b2fa30d26..d1e009584 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -306,7 +306,7 @@ pub mod bitmap {
pub mod array;
pub mod compute;
#[cfg(feature = "csv")]
-pub mod csv;
+pub use arrow_csv as csv;
pub mod datatypes;
pub mod error;
#[cfg(feature = "ffi")]
diff --git a/arrow/tests/csv.rs b/arrow/tests/csv.rs
new file mode 100644
index 000000000..11e1b30e1
--- /dev/null
+++ b/arrow/tests/csv.rs
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs::File;
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+
+use arrow_array::*;
+use arrow_csv::{Reader, ReaderBuilder};
+use arrow_schema::*;
+
+#[test]
+#[cfg(feature = "chrono-tz")]
+fn test_export_csv_timestamps() {
+ let schema = Schema::new(vec![
+ Field::new(
+ "c1",
+ DataType::Timestamp(
+ TimeUnit::Millisecond,
+ Some("Australia/Sydney".to_string()),
+ ),
+ true,
+ ),
+ Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
+ ]);
+
+ let c1 = TimestampMillisecondArray::from(
+ // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST).
+ // The offset (difference to UTC) is +10:00.
+ // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT)
+ // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30.
+ //
+ vec![Some(1555584887378), Some(1635577147000)],
+ )
+ .with_timezone("Australia/Sydney".to_string());
+ let c2 =
+ TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]);
+ let batch =
+ RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
+
+ let mut sw = Vec::new();
+ let mut writer = arrow_csv::Writer::new(&mut sw);
+ let batches = vec![&batch];
+ for batch in batches {
+ writer.write(batch).unwrap();
+ }
+ drop(writer);
+
+ let left = "c1,c2
+2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000
+2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000\n";
+ let right = String::from_utf8(sw).unwrap();
+ assert_eq!(left, right);
+}
+
+#[test]
+fn test_csv() {
+ let _: Vec<()> = vec![None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())]
+ .into_iter()
+ .map(|format| {
+ let schema = Schema::new(vec![
+ Field::new("city", DataType::Utf8, false),
+ Field::new("lat", DataType::Float64, false),
+ Field::new("lng", DataType::Float64, false),
+ ]);
+
+ let file = File::open("test/data/uk_cities.csv").unwrap();
+ let mut csv = Reader::new(
+ file,
+ Arc::new(schema.clone()),
+ false,
+ None,
+ 1024,
+ None,
+ None,
+ format,
+ );
+ assert_eq!(Arc::new(schema), csv.schema());
+ let batch = csv.next().unwrap().unwrap();
+ assert_eq!(37, batch.num_rows());
+ assert_eq!(3, batch.num_columns());
+
+ // access data from a primitive array
+ let lat = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ assert_eq!(57.653484, lat.value(0));
+
+ // access data from a string array (ListArray<u8>)
+ let city = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+
+ assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
+ })
+ .collect();
+}
+
+#[test]
+fn test_csv_schema_metadata() {
+ let mut metadata = std::collections::HashMap::new();
+ metadata.insert("foo".to_owned(), "bar".to_owned());
+ let schema = Schema::new_with_metadata(
+ vec![
+ Field::new("city", DataType::Utf8, false),
+ Field::new("lat", DataType::Float64, false),
+ Field::new("lng", DataType::Float64, false),
+ ],
+ metadata.clone(),
+ );
+
+ let file = File::open("test/data/uk_cities.csv").unwrap();
+
+ let mut csv = Reader::new(
+ file,
+ Arc::new(schema.clone()),
+ false,
+ None,
+ 1024,
+ None,
+ None,
+ None,
+ );
+ assert_eq!(Arc::new(schema), csv.schema());
+ let batch = csv.next().unwrap().unwrap();
+ assert_eq!(37, batch.num_rows());
+ assert_eq!(3, batch.num_columns());
+
+ assert_eq!(&metadata, batch.schema().metadata());
+}
+
+#[test]
+fn test_csv_reader_with_decimal() {
+ let schema = Schema::new(vec![
+ Field::new("city", DataType::Utf8, false),
+ Field::new("lat", DataType::Decimal128(38, 6), false),
+ Field::new("lng", DataType::Decimal128(38, 6), false),
+ ]);
+
+ let file = File::open("test/data/decimal_test.csv").unwrap();
+
+ let mut csv =
+ Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+ let batch = csv.next().unwrap().unwrap();
+ // access data from a primitive array
+ let lat = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Decimal128Array>()
+ .unwrap();
+
+ assert_eq!("57.653484", lat.value_as_string(0));
+ assert_eq!("53.002666", lat.value_as_string(1));
+ assert_eq!("52.412811", lat.value_as_string(2));
+ assert_eq!("51.481583", lat.value_as_string(3));
+ assert_eq!("12.123456", lat.value_as_string(4));
+ assert_eq!("50.760000", lat.value_as_string(5));
+ assert_eq!("0.123000", lat.value_as_string(6));
+ assert_eq!("123.000000", lat.value_as_string(7));
+ assert_eq!("123.000000", lat.value_as_string(8));
+ assert_eq!("-50.760000", lat.value_as_string(9));
+}
+
+#[test]
+fn test_csv_from_buf_reader() {
+ let schema = Schema::new(vec![
+ Field::new("city", DataType::Utf8, false),
+ Field::new("lat", DataType::Float64, false),
+ Field::new("lng", DataType::Float64, false),
+ ]);
+
+ let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap();
+ let file_without_headers = File::open("test/data/uk_cities.csv").unwrap();
+ let both_files = file_with_headers
+ .chain(Cursor::new("\n".to_string()))
+ .chain(file_without_headers);
+ let mut csv = Reader::from_reader(
+ both_files,
+ Arc::new(schema),
+ true,
+ None,
+ 1024,
+ None,
+ None,
+ None,
+ );
+ let batch = csv.next().unwrap().unwrap();
+ assert_eq!(74, batch.num_rows());
+ assert_eq!(3, batch.num_columns());
+}
+
+#[test]
+fn test_csv_with_schema_inference() {
+ let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
+
+ let builder = ReaderBuilder::new().has_header(true).infer_schema(None);
+
+ let mut csv = builder.build(file).unwrap();
+ let expected_schema = Schema::new(vec![
+ Field::new("city", DataType::Utf8, true),
+ Field::new("lat", DataType::Float64, true),
+ Field::new("lng", DataType::Float64, true),
+ ]);
+ assert_eq!(Arc::new(expected_schema), csv.schema());
+ let batch = csv.next().unwrap().unwrap();
+ assert_eq!(37, batch.num_rows());
+ assert_eq!(3, batch.num_columns());
+
+ // access data from a primitive array
+ let lat = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ assert_eq!(57.653484, lat.value(0));
+
+ // access data from a string array (ListArray<u8>)
+ let city = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+
+ assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
+}
+
+#[test]
+fn test_csv_with_schema_inference_no_headers() {
+ let file = File::open("test/data/uk_cities.csv").unwrap();
+
+ let builder = ReaderBuilder::new().infer_schema(None);
+
+ let mut csv = builder.build(file).unwrap();
+
+ // csv field names should be 'column_{number}'
+ let schema = csv.schema();
+ assert_eq!("column_1", schema.field(0).name());
+ assert_eq!("column_2", schema.field(1).name());
+ assert_eq!("column_3", schema.field(2).name());
+ let batch = csv.next().unwrap().unwrap();
+ let batch_schema = batch.schema();
+
+ assert_eq!(schema, batch_schema);
+ assert_eq!(37, batch.num_rows());
+ assert_eq!(3, batch.num_columns());
+
+ // access data from a primitive array
+ let lat = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ assert_eq!(57.653484, lat.value(0));
+
+ // access data from a string array (ListArray<u8>)
+ let city = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+
+ assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
+}
+
+#[test]
+fn test_csv_builder_with_bounds() {
+ let file = File::open("test/data/uk_cities.csv").unwrap();
+
+ // Set the bounds to the lines 0, 1 and 2.
+ let mut csv = ReaderBuilder::new().with_bounds(0, 2).build(file).unwrap();
+ let batch = csv.next().unwrap().unwrap();
+
+ // access data from a string array (ListArray<u8>)
+ let city = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+
+ // The value on line 0 is within the bounds
+ assert_eq!("Elgin, Scotland, the UK", city.value(0));
+
+ // The value on line 13 is outside of the bounds. Therefore
+ // the call to .value() will panic.
+ let result = std::panic::catch_unwind(|| city.value(13));
+ assert!(result.is_err());
+}
+
+#[test]
+fn test_csv_with_projection() {
+ let schema = Schema::new(vec![
+ Field::new("city", DataType::Utf8, false),
+ Field::new("lat", DataType::Float64, false),
+ Field::new("lng", DataType::Float64, false),
+ ]);
+
+ let file = File::open("test/data/uk_cities.csv").unwrap();
+
+ let mut csv = Reader::new(
+ file,
+ Arc::new(schema),
+ false,
+ None,
+ 1024,
+ None,
+ Some(vec![0, 1]),
+ None,
+ );
+ let projected_schema = Arc::new(Schema::new(vec![
+ Field::new("city", DataType::Utf8, false),
+ Field::new("lat", DataType::Float64, false),
+ ]));
+ assert_eq!(projected_schema, csv.schema());
+ let batch = csv.next().unwrap().unwrap();
+ assert_eq!(projected_schema, batch.schema());
+ assert_eq!(37, batch.num_rows());
+ assert_eq!(2, batch.num_columns());
+}
+
+#[test]
+fn test_csv_with_dictionary() {
+ let schema = Schema::new(vec![
+ Field::new(
+ "city",
+ DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+ false,
+ ),
+ Field::new("lat", DataType::Float64, false),
+ Field::new("lng", DataType::Float64, false),
+ ]);
+
+ let file = File::open("test/data/uk_cities.csv").unwrap();
+
+ let mut csv = Reader::new(
+ file,
+ Arc::new(schema),
+ false,
+ None,
+ 1024,
+ None,
+ Some(vec![0, 1]),
+ None,
+ );
+ let projected_schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "city",
+ DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+ false,
+ ),
+ Field::new("lat", DataType::Float64, false),
+ ]));
+ assert_eq!(projected_schema, csv.schema());
+ let batch = csv.next().unwrap().unwrap();
+ assert_eq!(projected_schema, batch.schema());
+ assert_eq!(37, batch.num_rows());
+ assert_eq!(2, batch.num_columns());
+
+ let strings = arrow_cast::cast(batch.column(0), &DataType::Utf8).unwrap();
+ let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+
+ assert_eq!(strings.value(0), "Elgin, Scotland, the UK");
+ assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK");
+ assert_eq!(strings.value(29), "Uckfield, East Sussex, UK");
+}
+
+#[test]
+fn test_nulls() {
+ let schema = Schema::new(vec![
+ Field::new("c_int", DataType::UInt64, false),
+ Field::new("c_float", DataType::Float32, true),
+ Field::new("c_string", DataType::Utf8, false),
+ ]);
+
+ let file = File::open("test/data/null_test.csv").unwrap();
+
+ let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None);
+ let batch = csv.next().unwrap().unwrap();
+
+ assert!(!batch.column(1).is_null(0));
+ assert!(!batch.column(1).is_null(1));
+ assert!(batch.column(1).is_null(2));
+ assert!(!batch.column(1).is_null(3));
+ assert!(!batch.column(1).is_null(4));
+}
+
+#[test]
+fn test_nulls_with_inference() {
+ let file = File::open("test/data/various_types.csv").unwrap();
+
+ let builder = ReaderBuilder::new()
+ .infer_schema(None)
+ .has_header(true)
+ .with_delimiter(b'|')
+ .with_batch_size(512)
+ .with_projection(vec![0, 1, 2, 3, 4, 5]);
+
+ let mut csv = builder.build(file).unwrap();
+ let batch = csv.next().unwrap().unwrap();
+
+ assert_eq!(7, batch.num_rows());
+ assert_eq!(6, batch.num_columns());
+
+ let schema = batch.schema();
+
+ assert_eq!(&DataType::Int64, schema.field(0).data_type());
+ assert_eq!(&DataType::Float64, schema.field(1).data_type());
+ assert_eq!(&DataType::Float64, schema.field(2).data_type());
+ assert_eq!(&DataType::Boolean, schema.field(3).data_type());
+ assert_eq!(&DataType::Date32, schema.field(4).data_type());
+ assert_eq!(&DataType::Date64, schema.field(5).data_type());
+
+ let names: Vec<&str> = schema.fields().iter().map(|x| x.name().as_str()).collect();
+ assert_eq!(
+ names,
+ vec![
+ "c_int",
+ "c_float",
+ "c_string",
+ "c_bool",
+ "c_date",
+ "c_datetime"
+ ]
+ );
+
+ assert!(schema.field(0).is_nullable());
+ assert!(schema.field(1).is_nullable());
+ assert!(schema.field(2).is_nullable());
+ assert!(schema.field(3).is_nullable());
+ assert!(schema.field(4).is_nullable());
+ assert!(schema.field(5).is_nullable());
+
+ assert!(!batch.column(1).is_null(0));
+ assert!(!batch.column(1).is_null(1));
+ assert!(batch.column(1).is_null(2));
+ assert!(!batch.column(1).is_null(3));
+ assert!(!batch.column(1).is_null(4));
+}
+
+#[test]
+fn test_parse_invalid_csv() {
+ let file = File::open("test/data/various_types_invalid.csv").unwrap();
+
+ let schema = Schema::new(vec![
+ Field::new("c_int", DataType::UInt64, false),
+ Field::new("c_float", DataType::Float32, false),
+ Field::new("c_string", DataType::Utf8, false),
+ Field::new("c_bool", DataType::Boolean, false),
+ ]);
+
+ let builder = ReaderBuilder::new()
+ .with_schema(Arc::new(schema))
+ .has_header(true)
+ .with_delimiter(b'|')
+ .with_batch_size(512)
+ .with_projection(vec![0, 1, 2, 3]);
+
+ let mut csv = builder.build(file).unwrap();
+ match csv.next() {
+ Some(e) => match e {
+ Err(e) => assert_eq!(
+ "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")",
+ format!("{:?}", e)
+ ),
+ Ok(_) => panic!("should have failed"),
+ },
+ None => panic!("should have failed"),
+ }
+}
diff --git a/arrow/tests/ipc_integration.rs b/arrow/tests/ipc.rs
similarity index 100%
rename from arrow/tests/ipc_integration.rs
rename to arrow/tests/ipc.rs