Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/08 20:17:19 UTC

[arrow-rs] branch master updated: Split out arrow-csv (#2594) (#3044)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fe3318bba Split out arrow-csv (#2594) (#3044)
fe3318bba is described below

commit fe3318bba24abfe572fa037a0b8805a15bdf5c45
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Nov 9 09:17:14 2022 +1300

    Split out arrow-csv (#2594) (#3044)
    
    * Split out arrow-csv (#2594)
    
    * Fix doc
    
    * Update arrow-csv/Cargo.toml
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .github/workflows/arrow.yml                  |   5 +
 .github/workflows/arrow_flight.yml           |   1 +
 .github/workflows/dev_pr/labeler.yml         |   1 +
 .github/workflows/integration.yml            |   1 +
 .github/workflows/miri.yaml                  |   1 +
 .github/workflows/parquet.yml                |   1 +
 Cargo.toml                                   |   1 +
 arrow-csv/Cargo.toml                         |  53 +++
 arrow/src/csv/mod.rs => arrow-csv/src/lib.rs |   8 +-
 {arrow/src/csv => arrow-csv/src}/reader.rs   | 546 +++------------------------
 {arrow/src/csv => arrow-csv/src}/writer.rs   | 172 ++-------
 arrow/Cargo.toml                             |  11 +-
 arrow/src/lib.rs                             |   2 +-
 arrow/tests/csv.rs                           | 486 ++++++++++++++++++++++++
 arrow/tests/{ipc_integration.rs => ipc.rs}   |   0
 15 files changed, 656 insertions(+), 633 deletions(-)
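
The net effect for downstream users: CSV reading and writing now lives in the standalone `arrow-csv` crate, while `arrow::csv` remains available as a re-export behind the existing `csv` feature (see the `arrow/src/lib.rs` hunk below). A minimal sketch of driving the new crate directly, assuming an `arrow-csv = "26.0.0"` dependency; the in-memory CSV data here is illustrative:

    use std::io::Cursor;
    use std::sync::Arc;
    use arrow_csv::Reader;
    use arrow_schema::{DataType, Field, Schema};

    let data = "Oslo,59.91\nBergen,60.39\n";
    let schema = Arc::new(Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("lat", DataType::Float64, false),
    ]));
    // Positional arguments, in the order the relocated tests below use them:
    // (reader, schema, has_header, delimiter, batch_size, bounds, projection, datetime_format)
    let mut csv = Reader::new(Cursor::new(data), schema, false, None, 1024, None, None, None);
    let batch = csv.next().unwrap().unwrap();
    assert_eq!(2, batch.num_rows());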

diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index d930086ef..461e7e87e 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -34,6 +34,7 @@ on:
       - arrow-select/**
       - arrow-integration-test/**
       - arrow-ipc/**
+      - arrow-csv/**
       - .github/**
 
 jobs:
@@ -64,6 +65,8 @@ jobs:
         run: cargo test -p arrow-cast --all-features
       - name: Test arrow-ipc with all features
         run: cargo test -p arrow-ipc --all-features
+      - name: Test arrow-csv with all features
+        run: cargo test -p arrow-csv --all-features
       - name: Test arrow-integration-test with all features
         run: cargo test -p arrow-integration-test --all-features
       - name: Test arrow with default features
@@ -174,5 +177,7 @@ jobs:
         run: cargo clippy -p arrow-cast --all-targets --all-features -- -D warnings
       - name: Clippy arrow-ipc with all features
         run: cargo clippy -p arrow-ipc --all-targets --all-features -- -D warnings
+      - name: Clippy arrow-csv with all features
+        run: cargo clippy -p arrow-csv --all-targets --all-features -- -D warnings
       - name: Clippy arrow
         run: cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression,dyn_cmp_dict,dyn_arith_dict,chrono-tz --all-targets -- -D warnings
diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml
index ded4f5a67..1f830ccf2 100644
--- a/.github/workflows/arrow_flight.yml
+++ b/.github/workflows/arrow_flight.yml
@@ -36,6 +36,7 @@ on:
       - arrow-select/**
       - arrow-flight/**
       - arrow-ipc/**
+      - arrow-csv/**
       - .github/**
 
 jobs:
diff --git a/.github/workflows/dev_pr/labeler.yml b/.github/workflows/dev_pr/labeler.yml
index 17ebf54de..04c7c080e 100644
--- a/.github/workflows/dev_pr/labeler.yml
+++ b/.github/workflows/dev_pr/labeler.yml
@@ -24,6 +24,7 @@ arrow:
   - arrow-schema/**/*
   - arrow-select/**/*
   - arrow-ipc/**/*
+  - arrow-csv/**/*
 
 arrow-flight:
   - arrow-flight/**/*
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 8566230ea..9418b9042 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -32,6 +32,7 @@ on:
       - arrow-schema/**
       - arrow-select/**
       - arrow-ipc/**
+      - arrow-csv/**
       - arrow-pyarrow-integration-testing/**
       - arrow-integration-test/**
       - arrow-integration-testing/**
diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml
index 2e85c9dd9..e58ebdb35 100644
--- a/.github/workflows/miri.yaml
+++ b/.github/workflows/miri.yaml
@@ -32,6 +32,7 @@ on:
       - arrow-schema/**
       - arrow-select/**
       - arrow-ipc/**
+      - arrow-csv/**
       - .github/**
 
 jobs:
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index b369ef69b..4f3cf5f80 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -35,6 +35,7 @@ on:
       - arrow-schema/**
       - arrow-select/**
       - arrow-ipc/**
+      - arrow-csv/**
       - parquet/**
       - .github/**
 
diff --git a/Cargo.toml b/Cargo.toml
index 0ab4853c6..18497d043 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
         "arrow-array",
         "arrow-buffer",
         "arrow-cast",
+        "arrow-csv",
         "arrow-data",
         "arrow-flight",
         "arrow-integration-test",
diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml
new file mode 100644
index 000000000..d40cef0db
--- /dev/null
+++ b/arrow-csv/Cargo.toml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "arrow-csv"
+version = "26.0.0"
+description = "Support for parsing CSV format into the Arrow format"
+homepage = "https://github.com/apache/arrow-rs"
+repository = "https://github.com/apache/arrow-rs"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = ["arrow"]
+include = [
+    "benches/*.rs",
+    "src/**/*.rs",
+    "Cargo.toml",
+]
+edition = "2021"
+rust-version = "1.62"
+
+[lib]
+name = "arrow_csv"
+path = "src/lib.rs"
+bench = false
+
+[dependencies]
+arrow-array = { version = "26.0.0", path = "../arrow-array" }
+arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
+arrow-cast = { version = "26.0.0", path = "../arrow-cast" }
+arrow-data = { version = "26.0.0", path = "../arrow-data" }
+arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+csv = { version = "1.1", default-features = false }
+lazy_static = { version = "1.4", default-features = false }
+lexical-core = { version = "^0.8", default-features = false }
+regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
+
+[dev-dependencies]
+tempfile = "3.3"
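
Besides the strongly typed `Reader`, the crate also carries the schema inference helpers (see `infer_reader_schema` in the reader diff below). A sketch of inferring a schema from an in-memory reader, assuming the `reader` module stays public as it was in the pre-split `arrow::csv`:

    use std::io::Cursor;
    use arrow_csv::reader::infer_reader_schema;
    use arrow_schema::DataType;

    let data = "city,lat\nOslo,59.91\nBergen,60.39\n";
    // With has_header = true, the header row names the fields and the
    // two data rows drive type inference.
    let (schema, records_read) =
        infer_reader_schema(Cursor::new(data), b',', None, true).unwrap();
    assert_eq!(2, records_read);
    assert_eq!(&DataType::Float64, schema.field(1).data_type());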
diff --git a/arrow/src/csv/mod.rs b/arrow-csv/src/lib.rs
similarity index 85%
rename from arrow/src/csv/mod.rs
rename to arrow-csv/src/lib.rs
index 46ba7d71e..a45cf082d 100644
--- a/arrow/src/csv/mod.rs
+++ b/arrow-csv/src/lib.rs
@@ -27,14 +27,14 @@ pub use self::writer::Writer;
 pub use self::writer::WriterBuilder;
 use arrow_schema::ArrowError;
 
-fn map_csv_error(error: csv_crate::Error) -> ArrowError {
+fn map_csv_error(error: csv::Error) -> ArrowError {
     match error.kind() {
-        csv_crate::ErrorKind::Io(error) => ArrowError::CsvError(error.to_string()),
-        csv_crate::ErrorKind::Utf8 { pos: _, err } => ArrowError::CsvError(format!(
+        csv::ErrorKind::Io(error) => ArrowError::CsvError(error.to_string()),
+        csv::ErrorKind::Utf8 { pos: _, err } => ArrowError::CsvError(format!(
             "Encountered UTF-8 error while reading CSV file: {}",
             err
         )),
-        csv_crate::ErrorKind::UnequalLengths {
+        csv::ErrorKind::UnequalLengths {
             expected_len, len, ..
         } => ArrowError::CsvError(format!(
             "Encountered unequal lengths between records on CSV file. Expected {} \
diff --git a/arrow/src/csv/reader.rs b/arrow-csv/src/reader.rs
similarity index 75%
rename from arrow/src/csv/reader.rs
rename to arrow-csv/src/reader.rs
index 404f37e93..459c23ad2 100644
--- a/arrow/src/csv/reader.rs
+++ b/arrow-csv/src/reader.rs
@@ -22,11 +22,11 @@
 //!
 //! Example:
 //!
-//! ```
-//! use arrow::csv;
-//! use arrow::datatypes::{DataType, Field, Schema};
-//! use std::fs::File;
-//! use std::sync::Arc;
+//! ```no_run
+//! # use arrow_schema::*;
+//! # use arrow_csv::Reader;
+//! # use std::fs::File;
+//! # use std::sync::Arc;
 //!
 //! let schema = Schema::new(vec![
 //!     Field::new("city", DataType::Utf8, false),
@@ -36,7 +36,7 @@
 //!
 //! let file = File::open("test/data/uk_cities.csv").unwrap();
 //!
-//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+//! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
 
@@ -49,17 +49,15 @@ use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
 
-use crate::array::{
-    ArrayRef, BooleanArray, Decimal128Builder, DictionaryArray, PrimitiveArray,
-    StringArray,
-};
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::record_batch::{RecordBatch, RecordBatchOptions};
+use arrow_array::builder::Decimal128Builder;
+use arrow_array::types::*;
+use arrow_array::*;
 use arrow_cast::parse::Parser;
+use arrow_schema::*;
 
-use crate::csv::map_csv_error;
-use csv_crate::{ByteRecord, StringRecord};
+use crate::map_csv_error;
+use arrow_data::decimal::validate_decimal_precision;
+use csv::{ByteRecord, StringRecord};
 use std::ops::Neg;
 
 lazy_static! {
@@ -128,7 +126,7 @@ pub fn infer_file_schema<R: Read + Seek>(
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let roptions = ReaderOptions {
         delimiter: Some(delimiter),
         max_read_records,
@@ -142,7 +140,7 @@ pub fn infer_file_schema<R: Read + Seek>(
 fn infer_file_schema_with_csv_options<R: Read + Seek>(
     mut reader: R,
     roptions: ReaderOptions,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let saved_offset = reader.seek(SeekFrom::Current(0))?;
 
     let (schema, records_count) =
@@ -164,7 +162,7 @@ pub fn infer_reader_schema<R: Read>(
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let roptions = ReaderOptions {
         delimiter: Some(delimiter),
         max_read_records,
@@ -177,7 +175,7 @@ pub fn infer_reader_schema<R: Read>(
 fn infer_reader_schema_with_csv_options<R: Read>(
     reader: R,
     roptions: ReaderOptions,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let mut csv_reader = Reader::build_csv_reader(
         reader,
         roptions.has_header,
@@ -268,7 +266,7 @@ pub fn infer_schema_from_files(
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> Result<Schema> {
+) -> Result<Schema, ArrowError> {
     let mut schemas = vec![];
     let mut records_to_read = max_read_records.unwrap_or(usize::MAX);
 
@@ -302,7 +300,7 @@ pub struct Reader<R: Read> {
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    reader: csv_crate::Reader<R>,
+    reader: csv::Reader<R>,
     /// Current line number
     line_number: usize,
     /// Maximum number of rows to read
@@ -410,8 +408,8 @@ impl<R: Read> Reader<R> {
         escape: Option<u8>,
         quote: Option<u8>,
         terminator: Option<u8>,
-    ) -> csv_crate::Reader<R> {
-        let mut reader_builder = csv_crate::ReaderBuilder::new();
+    ) -> csv::Reader<R> {
+        let mut reader_builder = csv::ReaderBuilder::new();
         reader_builder.has_headers(has_header);
 
         if let Some(c) = delimiter {
@@ -422,13 +420,13 @@ impl<R: Read> Reader<R> {
             reader_builder.quote(c);
         }
         if let Some(t) = terminator {
-            reader_builder.terminator(csv_crate::Terminator::Any(t));
+            reader_builder.terminator(csv::Terminator::Any(t));
         }
         reader_builder.from_reader(reader)
     }
 
     fn from_csv_reader(
-        mut csv_reader: csv_crate::Reader<R>,
+        mut csv_reader: csv::Reader<R>,
         schema: SchemaRef,
         has_header: bool,
         batch_size: usize,
@@ -474,7 +472,7 @@ impl<R: Read> Reader<R> {
 }
 
 impl<R: Read> Iterator for Reader<R> {
-    type Item = Result<RecordBatch>;
+    type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
         let remaining = self.end - self.line_number;
@@ -522,8 +520,8 @@ impl<R: Read> Iterator for Reader<R> {
     }
 }
 
-/// parses a slice of [csv_crate::StringRecord] into a
-/// [RecordBatch](crate::record_batch::RecordBatch).
+/// parses a slice of [csv::StringRecord] into a
+/// [RecordBatch]
 fn parse(
     rows: &[StringRecord],
     fields: &[Field],
@@ -531,13 +529,13 @@ fn parse(
     projection: Option<&Vec<usize>>,
     line_number: usize,
     datetime_format: Option<&str>,
-) -> Result<RecordBatch> {
+) -> Result<RecordBatch, ArrowError> {
     let projection: Vec<usize> = match projection {
         Some(v) => v.clone(),
         None => fields.iter().enumerate().map(|(i, _)| i).collect(),
     };
 
-    let arrays: Result<Vec<ArrayRef>> = projection
+    let arrays: Result<Vec<ArrayRef>, _> = projection
         .iter()
         .map(|i| {
             let i = *i;
@@ -706,7 +704,7 @@ fn build_decimal_array(
     col_idx: usize,
     precision: u8,
     scale: u8,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
     let mut decimal_builder = Decimal128Builder::with_capacity(rows.len());
     for row in rows {
         let col_s = row.get(col_idx);
@@ -720,7 +718,7 @@ fn build_decimal_array(
                     // append null
                     decimal_builder.append_null();
                 } else {
-                    let decimal_value: Result<i128> =
+                    let decimal_value: Result<i128, _> =
                         parse_decimal_with_parameter(s, precision, scale);
                     match decimal_value {
                         Ok(v) => {
@@ -743,7 +741,11 @@ fn build_decimal_array(
 
 // Parse the string format decimal value to i128 format and checking the precision and scale.
 // The result i128 value can't be out of bounds.
-fn parse_decimal_with_parameter(s: &str, precision: u8, scale: u8) -> Result<i128> {
+fn parse_decimal_with_parameter(
+    s: &str,
+    precision: u8,
+    scale: u8,
+) -> Result<i128, ArrowError> {
     if PARSE_DECIMAL_RE.is_match(s) {
         let mut offset = s.len();
         let len = s.len();
@@ -808,7 +810,7 @@ fn parse_decimal_with_parameter(s: &str, precision: u8, scale: u8) -> Result<i12
 // Parse the string format decimal value to i128 format without checking the precision and scale.
 // Like "125.12" to 12512_i128.
 #[cfg(test)]
-fn parse_decimal(s: &str) -> Result<i128> {
+fn parse_decimal(s: &str) -> Result<i128, ArrowError> {
     if PARSE_DECIMAL_RE.is_match(s) {
         let mut offset = s.len();
         // each byte is digit、'-' or '.'
@@ -856,7 +858,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
     rows: &[StringRecord],
     col_idx: usize,
     format: Option<&str>,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
     rows.iter()
         .enumerate()
         .map(|(row_index, row)| {
@@ -884,7 +886,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
                 None => Ok(None),
             }
         })
-        .collect::<Result<PrimitiveArray<T>>>()
+        .collect::<Result<PrimitiveArray<T>, ArrowError>>()
         .map(|e| Arc::new(e) as ArrayRef)
 }
 
@@ -893,7 +895,7 @@ fn build_boolean_array(
     line_number: usize,
     rows: &[StringRecord],
     col_idx: usize,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
     rows.iter()
         .enumerate()
         .map(|(row_index, row)| {
@@ -918,7 +920,7 @@ fn build_boolean_array(
                 None => Ok(None),
             }
         })
-        .collect::<Result<BooleanArray>>()
+        .collect::<Result<BooleanArray, _>>()
         .map(|e| Arc::new(e) as ArrayRef)
 }
 
@@ -988,16 +990,14 @@ impl ReaderBuilder {
     /// # Example
     ///
     /// ```
-    /// extern crate arrow;
-    ///
-    /// use arrow::csv;
+    /// use arrow_csv::{Reader, ReaderBuilder};
     /// use std::fs::File;
     ///
-    /// fn example() -> csv::Reader<File> {
+    /// fn example() -> Reader<File> {
     ///     let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
     ///
     ///     // create a builder, inferring the schema with the first 100 records
-    ///     let builder = csv::ReaderBuilder::new().infer_schema(Some(100));
+    ///     let builder = ReaderBuilder::new().infer_schema(Some(100));
     ///
     ///     let reader = builder.build(file).unwrap();
     ///
@@ -1086,7 +1086,7 @@ impl ReaderBuilder {
     }
 
     /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> {
+    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
         // check if schema should be inferred
         let delimiter = self.delimiter.unwrap_or(b',');
         let schema = match self.schema {
@@ -1131,436 +1131,11 @@ impl ReaderBuilder {
 mod tests {
     use super::*;
 
-    use std::fs::File;
-    use std::io::{Cursor, Write};
+    use std::io::Write;
     use tempfile::NamedTempFile;
 
-    use crate::array::*;
-    use crate::compute::cast;
-    use crate::datatypes::Field;
     use chrono::prelude::*;
 
-    #[test]
-    fn test_csv() {
-        let _: Vec<()> = vec![None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())]
-            .into_iter()
-            .map(|format| {
-                let schema = Schema::new(vec![
-                    Field::new("city", DataType::Utf8, false),
-                    Field::new("lat", DataType::Float64, false),
-                    Field::new("lng", DataType::Float64, false),
-                ]);
-
-                let file = File::open("test/data/uk_cities.csv").unwrap();
-                let mut csv = Reader::new(
-                    file,
-                    Arc::new(schema.clone()),
-                    false,
-                    None,
-                    1024,
-                    None,
-                    None,
-                    format,
-                );
-                assert_eq!(Arc::new(schema), csv.schema());
-                let batch = csv.next().unwrap().unwrap();
-                assert_eq!(37, batch.num_rows());
-                assert_eq!(3, batch.num_columns());
-
-                // access data from a primitive array
-                let lat = batch
-                    .column(1)
-                    .as_any()
-                    .downcast_ref::<Float64Array>()
-                    .unwrap();
-                assert_eq!(57.653484, lat.value(0));
-
-                // access data from a string array (ListArray<u8>)
-                let city = batch
-                    .column(0)
-                    .as_any()
-                    .downcast_ref::<StringArray>()
-                    .unwrap();
-
-                assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
-            })
-            .collect();
-    }
-
-    #[test]
-    fn test_csv_schema_metadata() {
-        let mut metadata = std::collections::HashMap::new();
-        metadata.insert("foo".to_owned(), "bar".to_owned());
-        let schema = Schema::new_with_metadata(
-            vec![
-                Field::new("city", DataType::Utf8, false),
-                Field::new("lat", DataType::Float64, false),
-                Field::new("lng", DataType::Float64, false),
-            ],
-            metadata.clone(),
-        );
-
-        let file = File::open("test/data/uk_cities.csv").unwrap();
-
-        let mut csv = Reader::new(
-            file,
-            Arc::new(schema.clone()),
-            false,
-            None,
-            1024,
-            None,
-            None,
-            None,
-        );
-        assert_eq!(Arc::new(schema), csv.schema());
-        let batch = csv.next().unwrap().unwrap();
-        assert_eq!(37, batch.num_rows());
-        assert_eq!(3, batch.num_columns());
-
-        assert_eq!(&metadata, batch.schema().metadata());
-    }
-
-    #[test]
-    fn test_csv_reader_with_decimal() {
-        let schema = Schema::new(vec![
-            Field::new("city", DataType::Utf8, false),
-            Field::new("lat", DataType::Decimal128(38, 6), false),
-            Field::new("lng", DataType::Decimal128(38, 6), false),
-        ]);
-
-        let file = File::open("test/data/decimal_test.csv").unwrap();
-
-        let mut csv =
-            Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
-        let batch = csv.next().unwrap().unwrap();
-        // access data from a primitive array
-        let lat = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<Decimal128Array>()
-            .unwrap();
-
-        assert_eq!("57.653484", lat.value_as_string(0));
-        assert_eq!("53.002666", lat.value_as_string(1));
-        assert_eq!("52.412811", lat.value_as_string(2));
-        assert_eq!("51.481583", lat.value_as_string(3));
-        assert_eq!("12.123456", lat.value_as_string(4));
-        assert_eq!("50.760000", lat.value_as_string(5));
-        assert_eq!("0.123000", lat.value_as_string(6));
-        assert_eq!("123.000000", lat.value_as_string(7));
-        assert_eq!("123.000000", lat.value_as_string(8));
-        assert_eq!("-50.760000", lat.value_as_string(9));
-    }
-
-    #[test]
-    fn test_csv_from_buf_reader() {
-        let schema = Schema::new(vec![
-            Field::new("city", DataType::Utf8, false),
-            Field::new("lat", DataType::Float64, false),
-            Field::new("lng", DataType::Float64, false),
-        ]);
-
-        let file_with_headers =
-            File::open("test/data/uk_cities_with_headers.csv").unwrap();
-        let file_without_headers = File::open("test/data/uk_cities.csv").unwrap();
-        let both_files = file_with_headers
-            .chain(Cursor::new("\n".to_string()))
-            .chain(file_without_headers);
-        let mut csv = Reader::from_reader(
-            both_files,
-            Arc::new(schema),
-            true,
-            None,
-            1024,
-            None,
-            None,
-            None,
-        );
-        let batch = csv.next().unwrap().unwrap();
-        assert_eq!(74, batch.num_rows());
-        assert_eq!(3, batch.num_columns());
-    }
-
-    #[test]
-    fn test_csv_with_schema_inference() {
-        let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
-
-        let builder = ReaderBuilder::new().has_header(true).infer_schema(None);
-
-        let mut csv = builder.build(file).unwrap();
-        let expected_schema = Schema::new(vec![
-            Field::new("city", DataType::Utf8, true),
-            Field::new("lat", DataType::Float64, true),
-            Field::new("lng", DataType::Float64, true),
-        ]);
-        assert_eq!(Arc::new(expected_schema), csv.schema());
-        let batch = csv.next().unwrap().unwrap();
-        assert_eq!(37, batch.num_rows());
-        assert_eq!(3, batch.num_columns());
-
-        // access data from a primitive array
-        let lat = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap();
-        assert_eq!(57.653484, lat.value(0));
-
-        // access data from a string array (ListArray<u8>)
-        let city = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-
-        assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
-    }
-
-    #[test]
-    fn test_csv_with_schema_inference_no_headers() {
-        let file = File::open("test/data/uk_cities.csv").unwrap();
-
-        let builder = ReaderBuilder::new().infer_schema(None);
-
-        let mut csv = builder.build(file).unwrap();
-
-        // csv field names should be 'column_{number}'
-        let schema = csv.schema();
-        assert_eq!("column_1", schema.field(0).name());
-        assert_eq!("column_2", schema.field(1).name());
-        assert_eq!("column_3", schema.field(2).name());
-        let batch = csv.next().unwrap().unwrap();
-        let batch_schema = batch.schema();
-
-        assert_eq!(schema, batch_schema);
-        assert_eq!(37, batch.num_rows());
-        assert_eq!(3, batch.num_columns());
-
-        // access data from a primitive array
-        let lat = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap();
-        assert_eq!(57.653484, lat.value(0));
-
-        // access data from a string array (ListArray<u8>)
-        let city = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-
-        assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
-    }
-
-    #[test]
-    fn test_csv_builder_with_bounds() {
-        let file = File::open("test/data/uk_cities.csv").unwrap();
-
-        // Set the bounds to the lines 0, 1 and 2.
-        let mut csv = ReaderBuilder::new().with_bounds(0, 2).build(file).unwrap();
-        let batch = csv.next().unwrap().unwrap();
-
-        // access data from a string array (ListArray<u8>)
-        let city = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-
-        // The value on line 0 is within the bounds
-        assert_eq!("Elgin, Scotland, the UK", city.value(0));
-
-        // The value on line 13 is outside of the bounds. Therefore
-        // the call to .value() will panic.
-        let result = std::panic::catch_unwind(|| city.value(13));
-        assert!(result.is_err());
-    }
-
-    #[test]
-    fn test_csv_with_projection() {
-        let schema = Schema::new(vec![
-            Field::new("city", DataType::Utf8, false),
-            Field::new("lat", DataType::Float64, false),
-            Field::new("lng", DataType::Float64, false),
-        ]);
-
-        let file = File::open("test/data/uk_cities.csv").unwrap();
-
-        let mut csv = Reader::new(
-            file,
-            Arc::new(schema),
-            false,
-            None,
-            1024,
-            None,
-            Some(vec![0, 1]),
-            None,
-        );
-        let projected_schema = Arc::new(Schema::new(vec![
-            Field::new("city", DataType::Utf8, false),
-            Field::new("lat", DataType::Float64, false),
-        ]));
-        assert_eq!(projected_schema, csv.schema());
-        let batch = csv.next().unwrap().unwrap();
-        assert_eq!(projected_schema, batch.schema());
-        assert_eq!(37, batch.num_rows());
-        assert_eq!(2, batch.num_columns());
-    }
-
-    #[test]
-    fn test_csv_with_dictionary() {
-        let schema = Schema::new(vec![
-            Field::new(
-                "city",
-                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-                false,
-            ),
-            Field::new("lat", DataType::Float64, false),
-            Field::new("lng", DataType::Float64, false),
-        ]);
-
-        let file = File::open("test/data/uk_cities.csv").unwrap();
-
-        let mut csv = Reader::new(
-            file,
-            Arc::new(schema),
-            false,
-            None,
-            1024,
-            None,
-            Some(vec![0, 1]),
-            None,
-        );
-        let projected_schema = Arc::new(Schema::new(vec![
-            Field::new(
-                "city",
-                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-                false,
-            ),
-            Field::new("lat", DataType::Float64, false),
-        ]));
-        assert_eq!(projected_schema, csv.schema());
-        let batch = csv.next().unwrap().unwrap();
-        assert_eq!(projected_schema, batch.schema());
-        assert_eq!(37, batch.num_rows());
-        assert_eq!(2, batch.num_columns());
-
-        let strings = cast(batch.column(0), &DataType::Utf8).unwrap();
-        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
-
-        assert_eq!(strings.value(0), "Elgin, Scotland, the UK");
-        assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK");
-        assert_eq!(strings.value(29), "Uckfield, East Sussex, UK");
-    }
-
-    #[test]
-    fn test_nulls() {
-        let schema = Schema::new(vec![
-            Field::new("c_int", DataType::UInt64, false),
-            Field::new("c_float", DataType::Float32, true),
-            Field::new("c_string", DataType::Utf8, false),
-        ]);
-
-        let file = File::open("test/data/null_test.csv").unwrap();
-
-        let mut csv =
-            Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None);
-        let batch = csv.next().unwrap().unwrap();
-
-        assert!(!batch.column(1).is_null(0));
-        assert!(!batch.column(1).is_null(1));
-        assert!(batch.column(1).is_null(2));
-        assert!(!batch.column(1).is_null(3));
-        assert!(!batch.column(1).is_null(4));
-    }
-
-    #[test]
-    fn test_nulls_with_inference() {
-        let file = File::open("test/data/various_types.csv").unwrap();
-
-        let builder = ReaderBuilder::new()
-            .infer_schema(None)
-            .has_header(true)
-            .with_delimiter(b'|')
-            .with_batch_size(512)
-            .with_projection(vec![0, 1, 2, 3, 4, 5]);
-
-        let mut csv = builder.build(file).unwrap();
-        let batch = csv.next().unwrap().unwrap();
-
-        assert_eq!(7, batch.num_rows());
-        assert_eq!(6, batch.num_columns());
-
-        let schema = batch.schema();
-
-        assert_eq!(&DataType::Int64, schema.field(0).data_type());
-        assert_eq!(&DataType::Float64, schema.field(1).data_type());
-        assert_eq!(&DataType::Float64, schema.field(2).data_type());
-        assert_eq!(&DataType::Boolean, schema.field(3).data_type());
-        assert_eq!(&DataType::Date32, schema.field(4).data_type());
-        assert_eq!(&DataType::Date64, schema.field(5).data_type());
-
-        let names: Vec<&str> =
-            schema.fields().iter().map(|x| x.name().as_str()).collect();
-        assert_eq!(
-            names,
-            vec![
-                "c_int",
-                "c_float",
-                "c_string",
-                "c_bool",
-                "c_date",
-                "c_datetime"
-            ]
-        );
-
-        assert!(schema.field(0).is_nullable());
-        assert!(schema.field(1).is_nullable());
-        assert!(schema.field(2).is_nullable());
-        assert!(schema.field(3).is_nullable());
-        assert!(schema.field(4).is_nullable());
-        assert!(schema.field(5).is_nullable());
-
-        assert!(!batch.column(1).is_null(0));
-        assert!(!batch.column(1).is_null(1));
-        assert!(batch.column(1).is_null(2));
-        assert!(!batch.column(1).is_null(3));
-        assert!(!batch.column(1).is_null(4));
-    }
-
-    #[test]
-    fn test_parse_invalid_csv() {
-        let file = File::open("test/data/various_types_invalid.csv").unwrap();
-
-        let schema = Schema::new(vec![
-            Field::new("c_int", DataType::UInt64, false),
-            Field::new("c_float", DataType::Float32, false),
-            Field::new("c_string", DataType::Utf8, false),
-            Field::new("c_bool", DataType::Boolean, false),
-        ]);
-
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
-            .has_header(true)
-            .with_delimiter(b'|')
-            .with_batch_size(512)
-            .with_projection(vec![0, 1, 2, 3]);
-
-        let mut csv = builder.build(file).unwrap();
-        match csv.next() {
-            Some(e) => match e {
-                Err(e) => assert_eq!(
-                    "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")",
-                    format!("{:?}", e)
-                ),
-                Ok(_) => panic!("should have failed"),
-            },
-            None => panic!("should have failed"),
-        }
-    }
-
     #[test]
     fn test_infer_field_schema() {
         assert_eq!(infer_field_schema("A", None), DataType::Utf8);
@@ -1771,21 +1346,21 @@ mod tests {
     }
 
     #[test]
-    fn test_infer_schema_from_multiple_files() -> Result<()> {
-        let mut csv1 = NamedTempFile::new()?;
-        let mut csv2 = NamedTempFile::new()?;
-        let csv3 = NamedTempFile::new()?; // empty csv file should be skipped
-        let mut csv4 = NamedTempFile::new()?;
-        writeln!(csv1, "c1,c2,c3")?;
-        writeln!(csv1, "1,\"foo\",0.5")?;
-        writeln!(csv1, "3,\"bar\",1")?;
-        writeln!(csv1, "3,\"bar\",2e-06")?;
+    fn test_infer_schema_from_multiple_files() {
+        let mut csv1 = NamedTempFile::new().unwrap();
+        let mut csv2 = NamedTempFile::new().unwrap();
+        let csv3 = NamedTempFile::new().unwrap(); // empty csv file should be skipped
+        let mut csv4 = NamedTempFile::new().unwrap();
+        writeln!(csv1, "c1,c2,c3").unwrap();
+        writeln!(csv1, "1,\"foo\",0.5").unwrap();
+        writeln!(csv1, "3,\"bar\",1").unwrap();
+        writeln!(csv1, "3,\"bar\",2e-06").unwrap();
         // reading csv2 will set c2 to optional
-        writeln!(csv2, "c1,c2,c3,c4")?;
-        writeln!(csv2, "10,,3.14,true")?;
+        writeln!(csv2, "c1,c2,c3,c4").unwrap();
+        writeln!(csv2, "10,,3.14,true").unwrap();
         // reading csv4 will set c3 to optional
-        writeln!(csv4, "c1,c2,c3")?;
-        writeln!(csv4, "10,\"foo\",")?;
+        writeln!(csv4, "c1,c2,c3").unwrap();
+        writeln!(csv4, "10,\"foo\",").unwrap();
 
         let schema = infer_schema_from_files(
             &[
@@ -1797,7 +1372,8 @@ mod tests {
             b',',
             Some(4), // only csv1 and csv2 should be read
             true,
-        )?;
+        )
+        .unwrap();
 
         assert_eq!(schema.fields().len(), 4);
         assert!(schema.field(0).is_nullable());
@@ -1809,8 +1385,6 @@ mod tests {
         assert_eq!(&DataType::Utf8, schema.field(1).data_type());
         assert_eq!(&DataType::Float64, schema.field(2).data_type());
         assert_eq!(&DataType::Boolean, schema.field(3).data_type());
-
-        Ok(())
     }
 
     #[test]
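
Apart from the import paths and the fully spelled `Result<_, ArrowError>` signatures (the crate no longer has a local `Result` alias), the reader itself is untouched; the bulk of the deleted lines are tests relocated to `arrow/tests/csv.rs` below. A self-contained sketch of the builder-based flow from the updated doc comment, using an in-memory cursor instead of the repository's test file:

    use std::io::Cursor;
    use arrow_csv::ReaderBuilder;

    let data = "city,lat\nOslo,59.91\nBergen,60.39\n";
    // Infer the schema from up to the first 100 records, then read batches.
    let mut reader = ReaderBuilder::new()
        .has_header(true)
        .infer_schema(Some(100))
        .build(Cursor::new(data))
        .unwrap();
    let batch = reader.next().unwrap().unwrap();
    assert_eq!(2, batch.num_rows());
    assert_eq!(2, batch.num_columns());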
diff --git a/arrow/src/csv/writer.rs b/arrow-csv/src/writer.rs
similarity index 81%
rename from arrow/src/csv/writer.rs
rename to arrow-csv/src/writer.rs
index b2d02fe84..674b33369 100644
--- a/arrow/src/csv/writer.rs
+++ b/arrow-csv/src/writer.rs
@@ -23,11 +23,11 @@
 //! Example:
 //!
 //! ```
-//! use arrow::array::*;
-//! use arrow::csv;
-//! use arrow::datatypes::*;
-//! use arrow::record_batch::RecordBatch;
-//! use std::sync::Arc;
+//! # use arrow_array::*;
+//! # use arrow_array::types::*;
+//! # use arrow_csv::Writer;
+//! # use arrow_schema::*;
+//! # use std::sync::Arc;
 //!
 //! let schema = Schema::new(vec![
 //!     Field::new("c1", DataType::Utf8, false),
@@ -56,7 +56,7 @@
 //!
 //! let mut output = Vec::with_capacity(1024);
 //!
-//! let mut writer = csv::Writer::new(&mut output);
+//! let mut writer = Writer::new(&mut output);
 //! let batches = vec![&batch, &batch];
 //! for batch in batches {
 //!     writer.write(batch).unwrap();
@@ -64,15 +64,14 @@
 //! ```
 
 use arrow_array::timezone::Tz;
+use arrow_array::types::*;
+use arrow_array::*;
+use arrow_cast::display::{lexical_to_string, make_string_from_decimal};
+use arrow_schema::*;
 use chrono::{DateTime, Utc};
 use std::io::Write;
 
-use crate::array::*;
-use crate::csv::map_csv_error;
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::record_batch::RecordBatch;
-use crate::util::display::{lexical_to_string, make_string_from_decimal};
+use crate::map_csv_error;
 
 const DEFAULT_DATE_FORMAT: &str = "%F";
 const DEFAULT_TIME_FORMAT: &str = "%T";
@@ -81,7 +80,7 @@ const DEFAULT_TIMESTAMP_TZ_FORMAT: &str = "%FT%H:%M:%S.%9f%:z";
 
 fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
 where
-    T: ArrowNumericType,
+    T: ArrowPrimitiveType,
     T::Native: lexical_core::ToLexical,
 {
     let c = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
@@ -92,7 +91,7 @@ where
 #[derive(Debug)]
 pub struct Writer<W: Write> {
     /// The object to write to
-    writer: csv_crate::Writer<W>,
+    writer: csv::Writer<W>,
     /// Whether file should be written with headers. Defaults to `true`
     has_headers: bool,
     /// The date format for date arrays
@@ -115,7 +114,7 @@ impl<W: Write> Writer<W> {
     /// Create a new CsvWriter from a writable object, with default options
     pub fn new(writer: W) -> Self {
         let delimiter = b',';
-        let mut builder = csv_crate::WriterBuilder::new();
+        let mut builder = csv::WriterBuilder::new();
         let writer = builder.delimiter(delimiter).from_writer(writer);
         Writer {
             writer,
@@ -135,7 +134,7 @@ impl<W: Write> Writer<W> {
         batch: &[ArrayRef],
         row_index: usize,
         buffer: &mut [String],
-    ) -> Result<()> {
+    ) -> Result<(), ArrowError> {
         // TODO: it'd be more efficient if we could create `record: Vec<&[u8]>
         for (col_index, item) in buffer.iter_mut().enumerate() {
             let col = &batch[col_index];
@@ -242,7 +241,7 @@ impl<W: Write> Writer<W> {
         time_zone: Option<&String>,
         row_index: usize,
         col: &ArrayRef,
-    ) -> Result<String> {
+    ) -> Result<String, ArrowError> {
         use TimeUnit::*;
         let datetime = match time_unit {
             Second => col
@@ -283,7 +282,7 @@ impl<W: Write> Writer<W> {
     }
 
     /// Write a vector of record batches to a writable object
-    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         let num_columns = batch.num_columns();
         if self.beginning {
             if self.has_headers {
@@ -305,7 +304,7 @@ impl<W: Write> Writer<W> {
             .iter()
             .map(|array| match array.data_type() {
                 DataType::Dictionary(_, value_type) => {
-                    crate::compute::kernels::cast::cast(array, value_type)
+                    arrow_cast::cast(array, value_type)
                         .expect("cannot cast dictionary to underlying values")
                 }
                 _ => array.clone(),
@@ -365,16 +364,14 @@ impl WriterBuilder {
     /// # Example
     ///
     /// ```
-    /// extern crate arrow;
+    /// # use arrow_csv::{Writer, WriterBuilder};
+    /// # use std::fs::File;
     ///
-    /// use arrow::csv;
-    /// use std::fs::File;
-    ///
-    /// fn example() -> csv::Writer<File> {
+    /// fn example() -> Writer<File> {
     ///     let file = File::create("target/out.csv").unwrap();
     ///
     ///     // create a builder that doesn't write headers
-    ///     let builder = csv::WriterBuilder::new().has_headers(false);
+    ///     let builder = WriterBuilder::new().has_headers(false);
     ///     let writer = builder.build(file);
     ///
     ///     writer
@@ -423,7 +420,7 @@ impl WriterBuilder {
     /// Create a new `Writer`
     pub fn build<W: Write>(self, writer: W) -> Writer<W> {
         let delimiter = self.delimiter.unwrap_or(b',');
-        let mut builder = csv_crate::WriterBuilder::new();
+        let mut builder = csv::WriterBuilder::new();
         let writer = builder.delimiter(delimiter).from_writer(writer);
         Writer {
             writer,
@@ -452,13 +449,8 @@ impl WriterBuilder {
 mod tests {
     use super::*;
 
-    use crate::csv::Reader;
-    use crate::datatypes::{Field, Schema};
-    #[cfg(feature = "chrono-tz")]
-    use crate::util::string_writer::StringWriter;
-    use crate::util::test_util::get_temp_file;
-    use std::fs::File;
-    use std::io::{Cursor, Read};
+    use crate::Reader;
+    use std::io::{Cursor, Read, Seek};
     use std::sync::Arc;
 
     #[test]
@@ -512,15 +504,17 @@ mod tests {
         )
         .unwrap();
 
-        let file = get_temp_file("columns.csv", &[]);
+        let mut file = tempfile::tempfile().unwrap();
 
-        let mut writer = Writer::new(file);
+        let mut writer = Writer::new(&mut file);
         let batches = vec![&batch, &batch];
         for batch in batches {
             writer.write(batch).unwrap();
         }
+        drop(writer);
+
         // check that file was written successfully
-        let mut file = File::open("target/debug/testdata/columns.csv").unwrap();
+        file.rewind().unwrap();
         let mut buffer: Vec<u8> = vec![];
         file.read_to_end(&mut buffer).unwrap();
 
@@ -571,20 +565,21 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
         )
         .unwrap();
 
-        let file = get_temp_file("custom_options.csv", &[]);
+        let mut file = tempfile::tempfile().unwrap();
 
         let builder = WriterBuilder::new()
             .has_headers(false)
             .with_delimiter(b'|')
             .with_time_format("%r".to_string());
-        let mut writer = builder.build(file);
+        let mut writer = builder.build(&mut file);
         let batches = vec![&batch];
         for batch in batches {
             writer.write(batch).unwrap();
         }
+        drop(writer);
 
         // check that file was written successfully
-        let mut file = File::open("target/debug/testdata/custom_options.csv").unwrap();
+        file.rewind().unwrap();
         let mut buffer: Vec<u8> = vec![];
         file.read_to_end(&mut buffer).unwrap();
 
@@ -595,105 +590,6 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
         );
     }
 
-    #[cfg(feature = "chrono-tz")]
-    #[test]
-    fn test_export_csv_timestamps() {
-        let schema = Schema::new(vec![
-            Field::new(
-                "c1",
-                DataType::Timestamp(
-                    TimeUnit::Millisecond,
-                    Some("Australia/Sydney".to_string()),
-                ),
-                true,
-            ),
-            Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
-        ]);
-
-        let c1 = TimestampMillisecondArray::from(
-            // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST).
-            // The offset (difference to UTC) is +10:00.
-            // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT)
-            // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30.
-            //
-            vec![Some(1555584887378), Some(1635577147000)],
-        )
-        .with_timezone("Australia/Sydney".to_string());
-        let c2 = TimestampMillisecondArray::from(vec![
-            Some(1555584887378),
-            Some(1635577147000),
-        ]);
-        let batch =
-            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)])
-                .unwrap();
-
-        let sw = StringWriter::new();
-        let mut writer = Writer::new(sw);
-        let batches = vec![&batch];
-        for batch in batches {
-            writer.write(batch).unwrap();
-        }
-
-        let left = "c1,c2
-2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000
-2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000\n";
-        let right = writer.writer.into_inner().map(|s| s.to_string());
-        assert_eq!(Some(left.to_string()), right.ok());
-    }
-
-    #[cfg(not(feature = "chrono-tz"))]
-    #[test]
-    fn test_conversion_consistency() {
-        // test if we can serialize and deserialize whilst retaining the same type information/ precision
-
-        let schema = Schema::new(vec![
-            Field::new("c1", DataType::Date32, false),
-            Field::new("c2", DataType::Date64, false),
-        ]);
-
-        let c1 = Date32Array::from(vec![3, 2, 1]);
-        let c2 = Date64Array::from(vec![3, 2, 1]);
-
-        let batch = RecordBatch::try_new(
-            Arc::new(schema.clone()),
-            vec![Arc::new(c1), Arc::new(c2)],
-        )
-        .unwrap();
-
-        let builder = WriterBuilder::new().has_headers(false);
-
-        let mut buf: Cursor<Vec<u8>> = Default::default();
-        // drop the writer early to release the borrow.
-        {
-            let mut writer = builder.build(&mut buf);
-            writer.write(&batch).unwrap();
-        }
-        buf.set_position(0);
-
-        let mut reader = Reader::new(
-            buf,
-            Arc::new(schema),
-            false,
-            None,
-            3,
-            // starting at row 2 and up to row 6.
-            None,
-            None,
-            None,
-        );
-        let rb = reader.next().unwrap().unwrap();
-        let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
-        let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
-
-        let actual = c1.into_iter().collect::<Vec<_>>();
-        let expected = vec![Some(3), Some(2), Some(1)];
-        assert_eq!(actual, expected);
-        let actual = c2.into_iter().collect::<Vec<_>>();
-        let expected = vec![Some(3), Some(2), Some(1)];
-        assert_eq!(actual, expected);
-    }
-
-    #[cfg(feature = "chrono-tz")]
     #[test]
     fn test_conversion_consistency() {
         // test if we can serialize and deserialize whilst retaining the same type information/ precision
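
As with the reader, the writer changes are mechanical: `csv_crate` becomes `csv`, the dictionary cast goes through `arrow_cast::cast`, and the `ArrowNumericType` bound is relaxed to `ArrowPrimitiveType`. A minimal write sketch against the new crate, writing to an in-memory buffer; the expected output assumes the default `,` delimiter and `\n` record terminator shown in the tests above:

    use std::sync::Arc;
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_csv::Writer;

    let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("c1", c1)]).unwrap();

    let mut output = Vec::new();
    let mut writer = Writer::new(&mut output);
    writer.write(&batch).unwrap();
    drop(writer); // flush and release the borrow on `output`

    assert_eq!("c1\n1\n2\n3\n", String::from_utf8(output).unwrap());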
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 6c30df6bd..cc9421de7 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -46,6 +46,7 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"]
 [dependencies]
 arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
 arrow-cast = { version = "26.0.0", path = "../arrow-cast" }
+arrow-csv = { version = "26.0.0", path = "../arrow-csv", optional = true }
 arrow-data = { version = "26.0.0", path = "../arrow-data" }
 arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
 arrow-array = { version = "26.0.0", path = "../arrow-array" }
@@ -57,10 +58,8 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
 num = { version = "0.4", default-features = false, features = ["std"] }
 half = { version = "2.1", default-features = false, features = ["num-traits"] }
 hashbrown = { version = "0.12", default-features = false }
-csv_crate = { version = "1.1", default-features = false, optional = true, package = "csv" }
 regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
 regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] }
-lazy_static = { version = "1.4", default-features = false }
 packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 comfy-table = { version = "6.0", optional = true, default-features = false }
@@ -75,7 +74,7 @@ features = ["prettyprint", "ipc_compression", "dyn_cmp_dict", "ffi", "pyarrow"]
 [features]
 default = ["csv", "ipc", "json"]
 ipc_compression = ["ipc", "arrow-ipc/lz4", "arrow-ipc/zstd"]
-csv = ["csv_crate"]
+csv = ["arrow-csv"]
 ipc = ["arrow-ipc"]
 json = ["serde_json"]
 simd = ["packed_simd"]
@@ -265,5 +264,9 @@ harness = false
 required-features = ["test_utils"]
 
 [[test]]
-name = "ipc_integration"
+name = "ipc"
 required-features = ["test_utils", "ipc"]
+
+[[test]]
+name = "csv"
+required-features = ["csv"]
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index b2fa30d26..d1e009584 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -306,7 +306,7 @@ pub mod bitmap {
 pub mod array;
 pub mod compute;
 #[cfg(feature = "csv")]
-pub mod csv;
+pub use arrow_csv as csv;
 pub mod datatypes;
 pub mod error;
 #[cfg(feature = "ffi")]
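
Because `pub use arrow_csv as csv` re-exports the whole crate, existing callers keep compiling unchanged: `arrow::csv::ReaderBuilder` and `arrow_csv::ReaderBuilder` are the same type. A sketch of the unchanged caller side, assuming the default `csv` feature is enabled and the repository's test data is available; it mirrors the pre-split doc example:

    use std::fs::File;
    use arrow::csv; // now a re-export of the arrow-csv crate

    fn example() -> csv::Reader<File> {
        let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
        // The same builder API as before the split.
        csv::ReaderBuilder::new()
            .infer_schema(Some(100))
            .build(file)
            .unwrap()
    }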
diff --git a/arrow/tests/csv.rs b/arrow/tests/csv.rs
new file mode 100644
index 000000000..11e1b30e1
--- /dev/null
+++ b/arrow/tests/csv.rs
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs::File;
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+
+use arrow_array::*;
+use arrow_csv::{Reader, ReaderBuilder};
+use arrow_schema::*;
+
+#[test]
+#[cfg(feature = "chrono-tz")]
+fn test_export_csv_timestamps() {
+    let schema = Schema::new(vec![
+        Field::new(
+            "c1",
+            DataType::Timestamp(
+                TimeUnit::Millisecond,
+                Some("Australia/Sydney".to_string()),
+            ),
+            true,
+        ),
+        Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
+    ]);
+
+    let c1 = TimestampMillisecondArray::from(
+        // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST).
+        // The offset (difference to UTC) is +10:00.
+        // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT)
+        // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30.
+        //
+        vec![Some(1555584887378), Some(1635577147000)],
+    )
+    .with_timezone("Australia/Sydney".to_string());
+    let c2 =
+        TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]);
+    let batch =
+        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
+
+    let mut sw = Vec::new();
+    let mut writer = arrow_csv::Writer::new(&mut sw);
+    let batches = vec![&batch];
+    for batch in batches {
+        writer.write(batch).unwrap();
+    }
+    drop(writer);
+
+    let left = "c1,c2
+2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000
+2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000\n";
+    let right = String::from_utf8(sw).unwrap();
+    assert_eq!(left, right);
+}
+
+#[test]
+fn test_csv() {
+    let _: Vec<()> = vec![None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())]
+        .into_iter()
+        .map(|format| {
+            let schema = Schema::new(vec![
+                Field::new("city", DataType::Utf8, false),
+                Field::new("lat", DataType::Float64, false),
+                Field::new("lng", DataType::Float64, false),
+            ]);
+
+            let file = File::open("test/data/uk_cities.csv").unwrap();
+            let mut csv = Reader::new(
+                file,
+                Arc::new(schema.clone()),
+                false,
+                None,
+                1024,
+                None,
+                None,
+                format,
+            );
+            assert_eq!(Arc::new(schema), csv.schema());
+            let batch = csv.next().unwrap().unwrap();
+            assert_eq!(37, batch.num_rows());
+            assert_eq!(3, batch.num_columns());
+
+            // access data from a primitive array
+            let lat = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .unwrap();
+            assert_eq!(57.653484, lat.value(0));
+
+            // access data from a string array (ListArray<u8>)
+            let city = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+
+            assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
+        })
+        .collect();
+}
+
+#[test]
+fn test_csv_schema_metadata() {
+    let mut metadata = std::collections::HashMap::new();
+    metadata.insert("foo".to_owned(), "bar".to_owned());
+    let schema = Schema::new_with_metadata(
+        vec![
+            Field::new("city", DataType::Utf8, false),
+            Field::new("lat", DataType::Float64, false),
+            Field::new("lng", DataType::Float64, false),
+        ],
+        metadata.clone(),
+    );
+
+    let file = File::open("test/data/uk_cities.csv").unwrap();
+
+    let mut csv = Reader::new(
+        file,
+        Arc::new(schema.clone()),
+        false,
+        None,
+        1024,
+        None,
+        None,
+        None,
+    );
+    assert_eq!(Arc::new(schema), csv.schema());
+    let batch = csv.next().unwrap().unwrap();
+    assert_eq!(37, batch.num_rows());
+    assert_eq!(3, batch.num_columns());
+
+    assert_eq!(&metadata, batch.schema().metadata());
+}
+
+#[test]
+fn test_csv_reader_with_decimal() {
+    let schema = Schema::new(vec![
+        Field::new("city", DataType::Utf8, false),
+        Field::new("lat", DataType::Decimal128(38, 6), false),
+        Field::new("lng", DataType::Decimal128(38, 6), false),
+    ]);
+
+    let file = File::open("test/data/decimal_test.csv").unwrap();
+
+    let mut csv =
+        Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+    let batch = csv.next().unwrap().unwrap();
+    // access data from a primitive array
+    let lat = batch
+        .column(1)
+        .as_any()
+        .downcast_ref::<Decimal128Array>()
+        .unwrap();
+
+    assert_eq!("57.653484", lat.value_as_string(0));
+    assert_eq!("53.002666", lat.value_as_string(1));
+    assert_eq!("52.412811", lat.value_as_string(2));
+    assert_eq!("51.481583", lat.value_as_string(3));
+    assert_eq!("12.123456", lat.value_as_string(4));
+    assert_eq!("50.760000", lat.value_as_string(5));
+    assert_eq!("0.123000", lat.value_as_string(6));
+    assert_eq!("123.000000", lat.value_as_string(7));
+    assert_eq!("123.000000", lat.value_as_string(8));
+    assert_eq!("-50.760000", lat.value_as_string(9));
+}
+
+#[test]
+fn test_csv_from_buf_reader() {
+    let schema = Schema::new(vec![
+        Field::new("city", DataType::Utf8, false),
+        Field::new("lat", DataType::Float64, false),
+        Field::new("lng", DataType::Float64, false),
+    ]);
+
+    let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap();
+    let file_without_headers = File::open("test/data/uk_cities.csv").unwrap();
+    let both_files = file_with_headers
+        .chain(Cursor::new("\n".to_string()))
+        .chain(file_without_headers);
+    let mut csv = Reader::from_reader(
+        both_files,
+        Arc::new(schema),
+        true,
+        None,
+        1024,
+        None,
+        None,
+        None,
+    );
+    let batch = csv.next().unwrap().unwrap();
+    assert_eq!(74, batch.num_rows());
+    assert_eq!(3, batch.num_columns());
+}
+
+#[test]
+fn test_csv_with_schema_inference() {
+    let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
+
+    let builder = ReaderBuilder::new().has_header(true).infer_schema(None);
+
+    let mut csv = builder.build(file).unwrap();
+    let expected_schema = Schema::new(vec![
+        Field::new("city", DataType::Utf8, true),
+        Field::new("lat", DataType::Float64, true),
+        Field::new("lng", DataType::Float64, true),
+    ]);
+    assert_eq!(Arc::new(expected_schema), csv.schema());
+    let batch = csv.next().unwrap().unwrap();
+    assert_eq!(37, batch.num_rows());
+    assert_eq!(3, batch.num_columns());
+
+    // access data from a primitive array
+    let lat = batch
+        .column(1)
+        .as_any()
+        .downcast_ref::<Float64Array>()
+        .unwrap();
+    assert_eq!(57.653484, lat.value(0));
+
+    // access data from a StringArray
+    let city = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap();
+
+    assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
+}
+
+#[test]
+fn test_csv_with_schema_inference_no_headers() {
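+    // Schema inference also works without a header row; field names are generated.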
+    let file = File::open("test/data/uk_cities.csv").unwrap();
+
+    let builder = ReaderBuilder::new().infer_schema(None);
+
+    let mut csv = builder.build(file).unwrap();
+
+    // csv field names should be 'column_{number}'
+    let schema = csv.schema();
+    assert_eq!("column_1", schema.field(0).name());
+    assert_eq!("column_2", schema.field(1).name());
+    assert_eq!("column_3", schema.field(2).name());
+    let batch = csv.next().unwrap().unwrap();
+    let batch_schema = batch.schema();
+
+    assert_eq!(schema, batch_schema);
+    assert_eq!(37, batch.num_rows());
+    assert_eq!(3, batch.num_columns());
+
+    // access data from a primitive array
+    let lat = batch
+        .column(1)
+        .as_any()
+        .downcast_ref::<Float64Array>()
+        .unwrap();
+    assert_eq!(57.653484, lat.value(0));
+
+    // access data from a StringArray
+    let city = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap();
+
+    assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
+}
+
+#[test]
+fn test_csv_builder_with_bounds() {
+    let file = File::open("test/data/uk_cities.csv").unwrap();
+
+    // Bound the reader to a half-open row range: with_bounds(0, 2) reads rows 0 and 1.
+    let mut csv = ReaderBuilder::new().with_bounds(0, 2).build(file).unwrap();
+    let batch = csv.next().unwrap().unwrap();
+
+    // access data from a StringArray
+    let city = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap();
+
+    // The value on line 0 is within the bounds
+    assert_eq!("Elgin, Scotland, the UK", city.value(0));
+
+    // Line 13 falls outside the bounds, so the batch has fewer rows
+    // and the call to .value(13) panics.
+    let result = std::panic::catch_unwind(|| city.value(13));
+    assert!(result.is_err());
+}
+
+#[test]
+fn test_csv_with_projection() {
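+    // Project columns 0 and 1; both the reader's schema and the batches
+    // contain only the projected columns.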
+    let schema = Schema::new(vec![
+        Field::new("city", DataType::Utf8, false),
+        Field::new("lat", DataType::Float64, false),
+        Field::new("lng", DataType::Float64, false),
+    ]);
+
+    let file = File::open("test/data/uk_cities.csv").unwrap();
+
+    let mut csv = Reader::new(
+        file,
+        Arc::new(schema),
+        false,
+        None,
+        1024,
+        None,
+        Some(vec![0, 1]),
+        None,
+    );
+    let projected_schema = Arc::new(Schema::new(vec![
+        Field::new("city", DataType::Utf8, false),
+        Field::new("lat", DataType::Float64, false),
+    ]));
+    assert_eq!(projected_schema, csv.schema());
+    let batch = csv.next().unwrap().unwrap();
+    assert_eq!(projected_schema, batch.schema());
+    assert_eq!(37, batch.num_rows());
+    assert_eq!(2, batch.num_columns());
+}
+
+#[test]
+fn test_csv_with_dictionary() {
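+    // Read the city column as Dictionary(Int32, Utf8), then cast it back to
+    // Utf8 to verify the values.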
+    let schema = Schema::new(vec![
+        Field::new(
+            "city",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            false,
+        ),
+        Field::new("lat", DataType::Float64, false),
+        Field::new("lng", DataType::Float64, false),
+    ]);
+
+    let file = File::open("test/data/uk_cities.csv").unwrap();
+
+    let mut csv = Reader::new(
+        file,
+        Arc::new(schema),
+        false,
+        None,
+        1024,
+        None,
+        Some(vec![0, 1]),
+        None,
+    );
+    let projected_schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "city",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            false,
+        ),
+        Field::new("lat", DataType::Float64, false),
+    ]));
+    assert_eq!(projected_schema, csv.schema());
+    let batch = csv.next().unwrap().unwrap();
+    assert_eq!(projected_schema, batch.schema());
+    assert_eq!(37, batch.num_rows());
+    assert_eq!(2, batch.num_columns());
+
+    let strings = arrow_cast::cast(batch.column(0), &DataType::Utf8).unwrap();
+    let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+
+    assert_eq!(strings.value(0), "Elgin, Scotland, the UK");
+    assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK");
+    assert_eq!(strings.value(29), "Uckfield, East Sussex, UK");
+}
+
+#[test]
+fn test_nulls() {
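+    // Empty values in the nullable c_float column are read as nulls.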
+    let schema = Schema::new(vec![
+        Field::new("c_int", DataType::UInt64, false),
+        Field::new("c_float", DataType::Float32, true),
+        Field::new("c_string", DataType::Utf8, false),
+    ]);
+
+    let file = File::open("test/data/null_test.csv").unwrap();
+
+    let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None);
+    let batch = csv.next().unwrap().unwrap();
+
+    assert!(!batch.column(1).is_null(0));
+    assert!(!batch.column(1).is_null(1));
+    assert!(batch.column(1).is_null(2));
+    assert!(!batch.column(1).is_null(3));
+    assert!(!batch.column(1).is_null(4));
+}
+
+#[test]
+fn test_nulls_with_inference() {
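+    // Inference on a pipe-delimited file: all inferred columns are nullable,
+    // with types taken from the data.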
+    let file = File::open("test/data/various_types.csv").unwrap();
+
+    let builder = ReaderBuilder::new()
+        .infer_schema(None)
+        .has_header(true)
+        .with_delimiter(b'|')
+        .with_batch_size(512)
+        .with_projection(vec![0, 1, 2, 3, 4, 5]);
+
+    let mut csv = builder.build(file).unwrap();
+    let batch = csv.next().unwrap().unwrap();
+
+    assert_eq!(7, batch.num_rows());
+    assert_eq!(6, batch.num_columns());
+
+    let schema = batch.schema();
+
+    assert_eq!(&DataType::Int64, schema.field(0).data_type());
+    assert_eq!(&DataType::Float64, schema.field(1).data_type());
+    assert_eq!(&DataType::Float64, schema.field(2).data_type());
+    assert_eq!(&DataType::Boolean, schema.field(3).data_type());
+    assert_eq!(&DataType::Date32, schema.field(4).data_type());
+    assert_eq!(&DataType::Date64, schema.field(5).data_type());
+
+    let names: Vec<&str> = schema.fields().iter().map(|x| x.name().as_str()).collect();
+    assert_eq!(
+        names,
+        vec![
+            "c_int",
+            "c_float",
+            "c_string",
+            "c_bool",
+            "c_date",
+            "c_datetime"
+        ]
+    );
+
+    assert!(schema.field(0).is_nullable());
+    assert!(schema.field(1).is_nullable());
+    assert!(schema.field(2).is_nullable());
+    assert!(schema.field(3).is_nullable());
+    assert!(schema.field(4).is_nullable());
+    assert!(schema.field(5).is_nullable());
+
+    assert!(!batch.column(1).is_null(0));
+    assert!(!batch.column(1).is_null(1));
+    assert!(batch.column(1).is_null(2));
+    assert!(!batch.column(1).is_null(3));
+    assert!(!batch.column(1).is_null(4));
+}
+
+#[test]
+fn test_parse_invalid_csv() {
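+    // A malformed value (4.x4) in a Float32 column surfaces as a ParseError
+    // instead of a panic.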
+    let file = File::open("test/data/various_types_invalid.csv").unwrap();
+
+    let schema = Schema::new(vec![
+        Field::new("c_int", DataType::UInt64, false),
+        Field::new("c_float", DataType::Float32, false),
+        Field::new("c_string", DataType::Utf8, false),
+        Field::new("c_bool", DataType::Boolean, false),
+    ]);
+
+    let builder = ReaderBuilder::new()
+        .with_schema(Arc::new(schema))
+        .has_header(true)
+        .with_delimiter(b'|')
+        .with_batch_size(512)
+        .with_projection(vec![0, 1, 2, 3]);
+
+    let mut csv = builder.build(file).unwrap();
+    match csv.next() {
+        Some(e) => match e {
+            Err(e) => assert_eq!(
+                "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")",
+                format!("{:?}", e)
+            ),
+            Ok(_) => panic!("should have failed"),
+        },
+        None => panic!("should have failed"),
+    }
+}
diff --git a/arrow/tests/ipc_integration.rs b/arrow/tests/ipc.rs
similarity index 100%
rename from arrow/tests/ipc_integration.rs
rename to arrow/tests/ipc.rs