Posted to commits@arrow.apache.org by al...@apache.org on 2021/04/26 21:39:08 UTC
[arrow-datafusion] branch master updated: ARROW-12306: Read CSV format text from stdin or memory (#54)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3855473 ARROW-12306: Read CSV format text from stdin or memory (#54)
3855473 is described below
commit 3855473e0bc3b8713a1c1b1fe21efefb8ca32cd6
Author: 思维 <he...@users.noreply.github.com>
AuthorDate: Tue Apr 27 05:39:01 2021 +0800
ARROW-12306: Read CSV format text from stdin or memory (#54)
---
datafusion/src/datasource/csv.rs | 152 +++++++++++++++++++++---
datafusion/src/physical_plan/csv.rs | 231 +++++++++++++++++++++++++++++++-----
2 files changed, 334 insertions(+), 49 deletions(-)
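
For context, this change lets a CSV table be backed by any `Read` implementation rather than only a file path. Below is a minimal sketch (not part of the commit) of feeding CSV from stdin, modeled on the test added in this diff; since stdin cannot `Seek`, the schema must be supplied up front, and the module paths, table name, and columns here are assumptions:

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // stdin is Read + Send + Sync but not Seek, so schema inference
        // is unavailable and the schema must be provided in the options.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int64, false),
        ]);
        let table = CsvFile::try_new_from_reader(
            std::io::stdin(),
            CsvReadOptions::new().schema(&schema).has_header(true),
        )?;
        let mut ctx = ExecutionContext::new();
        ctx.register_table("stdin_csv", Arc::new(table))?;
        let batches = ctx.sql("SELECT b FROM stdin_csv")?.collect().await?;
        println!("read {} batch(es) from stdin", batches.len());
        Ok(())
    }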
diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 6f6c9ab..1bd1b4b 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -35,8 +35,9 @@
use arrow::datatypes::SchemaRef;
use std::any::Any;
+use std::io::{Read, Seek};
use std::string::String;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
@@ -46,10 +47,17 @@ use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::{common, ExecutionPlan};
+enum Source {
+ /// Path to a single CSV file or a directory containing one or more CSV files
+ Path(String),
+
+ /// Read CSV data from a reader
+ Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
+}
+
/// Represents a CSV file with a provided schema
pub struct CsvFile {
- /// Path to a single CSV file or a directory containing one of more CSV files
- path: String,
+ source: Source,
schema: SchemaRef,
has_header: bool,
delimiter: u8,
@@ -77,7 +85,7 @@ impl CsvFile {
});
Ok(Self {
- path: String::from(path),
+ source: Source::Path(path.to_string()),
schema,
has_header: options.has_header,
delimiter: options.delimiter,
@@ -86,9 +94,64 @@ impl CsvFile {
})
}
+ /// Attempt to initialize a `CsvFile` from a reader. The schema MUST be provided in options.
+ pub fn try_new_from_reader<R: Read + Send + Sync + 'static>(
+ reader: R,
+ options: CsvReadOptions,
+ ) -> Result<Self> {
+ let schema = Arc::new(match options.schema {
+ Some(s) => s.clone(),
+ None => {
+ return Err(DataFusionError::Execution(
+ "Schema must be provided to CsvRead".to_string(),
+ ));
+ }
+ });
+
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema,
+ has_header: options.has_header,
+ delimiter: options.delimiter,
+ statistics: Statistics::default(),
+ file_extension: String::new(),
+ })
+ }
+
+ /// Attempt to initialize a `CsvFile` from a reader that implements `Seek`. The schema can be inferred automatically.
+ pub fn try_new_from_reader_infer_schema<R: Read + Seek + Send + Sync + 'static>(
+ mut reader: R,
+ options: CsvReadOptions,
+ ) -> Result<Self> {
+ let schema = Arc::new(match options.schema {
+ Some(s) => s.clone(),
+ None => {
+ let (schema, _) = arrow::csv::reader::infer_file_schema(
+ &mut reader,
+ options.delimiter,
+ Some(options.schema_infer_max_records),
+ options.has_header,
+ )?;
+ schema
+ }
+ });
+
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema,
+ has_header: options.has_header,
+ delimiter: options.delimiter,
+ statistics: Statistics::default(),
+ file_extension: String::new(),
+ })
+ }
+
/// Get the path for the CSV file(s) represented by this CsvFile instance
pub fn path(&self) -> &str {
- &self.path
+ match &self.source {
+ Source::Reader(_) => "",
+ Source::Path(path) => path,
+ }
}
/// Determine whether the CSV file(s) represented by this CsvFile instance have a header row
@@ -123,22 +186,75 @@ impl TableProvider for CsvFile {
_filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(CsvExec::try_new(
- &self.path,
- CsvReadOptions::new()
- .schema(&self.schema)
- .has_header(self.has_header)
- .delimiter(self.delimiter)
- .file_extension(self.file_extension.as_str()),
- projection.clone(),
- limit
- .map(|l| std::cmp::min(l, batch_size))
- .unwrap_or(batch_size),
- limit,
- )?))
+ let opts = CsvReadOptions::new()
+ .schema(&self.schema)
+ .has_header(self.has_header)
+ .delimiter(self.delimiter)
+ .file_extension(self.file_extension.as_str());
+ let batch_size = limit
+ .map(|l| std::cmp::min(l, batch_size))
+ .unwrap_or(batch_size);
+
+ let exec = match &self.source {
+ Source::Reader(maybe_reader) => {
+ if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+ CsvExec::try_new_from_reader(
+ rdr,
+ opts,
+ projection.clone(),
+ batch_size,
+ limit,
+ )?
+ } else {
+ return Err(DataFusionError::Execution(
+ "You can only read once if the data comes from a reader"
+ .to_string(),
+ ));
+ }
+ }
+ Source::Path(p) => {
+ CsvExec::try_new(&p, opts, projection.clone(), batch_size, limit)?
+ }
+ };
+ Ok(Arc::new(exec))
}
fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::prelude::*;
+
+ #[tokio::test]
+ async fn csv_file_from_reader() -> Result<()> {
+ let testdata = arrow::util::test_util::arrow_test_data();
+ let filename = "aggregate_test_100.csv";
+ let path = format!("{}/csv/{}", testdata, filename);
+ let buf = std::fs::read(path).unwrap();
+ let rdr = std::io::Cursor::new(buf);
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table(
+ "aggregate_test",
+ Arc::new(CsvFile::try_new_from_reader_infer_schema(
+ rdr,
+ CsvReadOptions::new(),
+ )?),
+ )?;
+ let df = ctx.sql("select max(c2) from aggregate_test")?;
+ let batches = df.collect().await?;
+ assert_eq!(
+ batches[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<arrow::array::Int64Array>()
+ .unwrap()
+ .value(0),
+ 5
+ );
+ Ok(())
+ }
+}
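
Worth noting before the physical-plan half of the diff: `try_new_from_reader_infer_schema` above relies on arrow's `infer_file_schema`, which samples the reader and, because the reader can `Seek`, rewinds it so the very same reader can then be scanned. A standalone sketch of that call (sample data and limits are illustrative):

    use std::io::Cursor;
    use arrow::csv::reader::infer_file_schema;

    fn main() -> arrow::error::Result<()> {
        let mut rdr = Cursor::new(b"a,b\n1,x\n2,y\n".to_vec());
        // Sample up to 10 records; the helper rewinds the reader afterwards,
        // which is why the constructor can reuse it for the actual scan.
        let (schema, _records_read) =
            infer_file_schema(&mut rdr, b',', Some(10), true)?;
        assert_eq!(2, schema.fields().len());
        Ok(())
    }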
diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs
index 7ee5ae3..b96a702 100644
--- a/datafusion/src/physical_plan/csv.rs
+++ b/datafusion/src/physical_plan/csv.rs
@@ -17,12 +17,6 @@
//! Execution plan for reading CSV files
-use std::any::Any;
-use std::fs::File;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
use crate::error::{DataFusionError, Result};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{common, Partitioning};
@@ -31,6 +25,13 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::Stream;
+use std::any::Any;
+use std::fs::File;
+use std::io::Read;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::task::{Context, Poll};
use super::{RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;
@@ -106,13 +107,69 @@ impl<'a> CsvReadOptions<'a> {
}
}
+/// Source represents where the data comes from.
+enum Source {
+ /// The data comes from partitioned files
+ PartitionedFiles {
+ /// Path to directory containing partitioned files with the same schema
+ path: String,
+ /// The individual files under path
+ filenames: Vec<String>,
+ },
+
+ /// The data comes from anything that implements the `Read` trait
+ Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
+}
+
+impl std::fmt::Debug for Source {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Source::PartitionedFiles { path, filenames } => f
+ .debug_struct("PartitionedFiles")
+ .field("path", path)
+ .field("filenames", filenames)
+ .finish()?,
+ Source::Reader(_) => f.write_str("Reader")?,
+ };
+ Ok(())
+ }
+}
+
+impl Clone for Source {
+ fn clone(&self) -> Self {
+ match self {
+ Source::PartitionedFiles { path, filenames } => Self::PartitionedFiles {
+ path: path.clone(),
+ filenames: filenames.clone(),
+ },
+ Source::Reader(_) => Self::Reader(Mutex::new(None)),
+ }
+ }
+}
+
+impl Source {
+ /// Path to directory containing partitioned files with the same schema
+ pub fn path(&self) -> &str {
+ match self {
+ Source::PartitionedFiles { path, .. } => path.as_str(),
+ Source::Reader(_) => "",
+ }
+ }
+
+ /// The individual files under path
+ pub fn filenames(&self) -> &[String] {
+ match self {
+ Source::PartitionedFiles { filenames, .. } => filenames,
+ Source::Reader(_) => &[],
+ }
+ }
+}
+
/// Execution plan for scanning a CSV file
#[derive(Debug, Clone)]
pub struct CsvExec {
- /// Path to directory containing partitioned CSV files with the same schema
- path: String,
- /// The individual files under path
- filenames: Vec<String>,
+ /// Where the data comes from.
+ source: Source,
/// Schema representing the CSV file
schema: SchemaRef,
/// Does the CSV file have a header?
@@ -163,8 +220,10 @@ impl CsvExec {
};
Ok(Self {
- path: path.to_string(),
- filenames,
+ source: Source::PartitionedFiles {
+ path: path.to_string(),
+ filenames,
+ },
schema: Arc::new(schema),
has_header: options.has_header,
delimiter: Some(options.delimiter),
@@ -175,15 +234,50 @@ impl CsvExec {
limit,
})
}
+ /// Create a new execution plan for reading from a reader
+ pub fn try_new_from_reader(
+ reader: impl Read + Send + Sync + 'static,
+ options: CsvReadOptions,
+ projection: Option<Vec<usize>>,
+ batch_size: usize,
+ limit: Option<usize>,
+ ) -> Result<Self> {
+ let schema = match options.schema {
+ Some(s) => s.clone(),
+ None => {
+ return Err(DataFusionError::Execution(
+ "The schema must be provided in options when reading from a reader"
+ .to_string(),
+ ));
+ }
+ };
+
+ let projected_schema = match &projection {
+ None => schema.clone(),
+ Some(p) => Schema::new(p.iter().map(|i| schema.field(*i).clone()).collect()),
+ };
+
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema: Arc::new(schema),
+ has_header: options.has_header,
+ delimiter: Some(options.delimiter),
+ file_extension: String::new(),
+ projection,
+ projected_schema: Arc::new(projected_schema),
+ batch_size,
+ limit,
+ })
+ }
/// Path to directory containing partitioned CSV files with the same schema
pub fn path(&self) -> &str {
- &self.path
+ self.source.path()
}
/// The individual files under path
pub fn filenames(&self) -> &[String] {
- &self.filenames
+ self.source.filenames()
}
/// Does the CSV file have a header?
@@ -249,7 +343,10 @@ impl ExecutionPlan for CsvExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.filenames.len())
+ Partitioning::UnknownPartitioning(match &self.source {
+ Source::PartitionedFiles { filenames, .. } => filenames.len(),
+ Source::Reader(_) => 1,
+ })
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -272,25 +369,51 @@ impl ExecutionPlan for CsvExec {
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
- Ok(Box::pin(CsvStream::try_new(
- &self.filenames[partition],
- self.schema.clone(),
- self.has_header,
- self.delimiter,
- &self.projection,
- self.batch_size,
- self.limit,
- )?))
+ match &self.source {
+ Source::PartitionedFiles { filenames, .. } => {
+ Ok(Box::pin(CsvStream::try_new(
+ &filenames[partition],
+ self.schema.clone(),
+ self.has_header,
+ self.delimiter,
+ &self.projection,
+ self.batch_size,
+ self.limit,
+ )?))
+ }
+ Source::Reader(rdr) => {
+ if partition != 0 {
+ Err(DataFusionError::Internal(
+ "Only partition 0 is valid when CSV comes from a reader"
+ .to_string(),
+ ))
+ } else if let Some(rdr) = rdr.lock().unwrap().take() {
+ Ok(Box::pin(CsvStream::try_new_from_reader(
+ rdr,
+ self.schema.clone(),
+ self.has_header,
+ self.delimiter,
+ &self.projection,
+ self.batch_size,
+ self.limit,
+ )?))
+ } else {
+ Err(DataFusionError::Execution(
+ "Error reading CSV: Data can only be read a single time when the source is a reader"
+ .to_string(),
+ ))
+ }
+ }
+ }
}
}
/// Iterator over batches
-struct CsvStream {
+struct CsvStream<R: Read> {
/// Arrow CSV reader
- reader: csv::Reader<File>,
+ reader: csv::Reader<R>,
}
-
-impl CsvStream {
+impl CsvStream<File> {
/// Create an iterator for a CSV file
pub fn try_new(
filename: &str,
@@ -302,11 +425,27 @@ impl CsvStream {
limit: Option<usize>,
) -> Result<Self> {
let file = File::open(filename)?;
+ Self::try_new_from_reader(
+ file, schema, has_header, delimiter, projection, batch_size, limit,
+ )
+ }
+}
+impl<R: Read> CsvStream<R> {
+ /// Create an iterator for a reader
+ pub fn try_new_from_reader(
+ reader: R,
+ schema: SchemaRef,
+ has_header: bool,
+ delimiter: Option<u8>,
+ projection: &Option<Vec<usize>>,
+ batch_size: usize,
+ limit: Option<usize>,
+ ) -> Result<CsvStream<R>> {
let start_line = if has_header { 1 } else { 0 };
let bounds = limit.map(|x| (0, x + start_line));
let reader = csv::Reader::new(
- file,
+ reader,
schema,
has_header,
delimiter,
@@ -319,7 +458,7 @@ impl CsvStream {
}
}
-impl Stream for CsvStream {
+impl<R: Read + Unpin> Stream for CsvStream<R> {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
@@ -330,7 +469,7 @@ impl Stream for CsvStream {
}
}
-impl RecordBatchStream for CsvStream {
+impl<R: Read + Unpin> RecordBatchStream for CsvStream<R> {
/// Get the schema
fn schema(&self) -> SchemaRef {
self.reader.schema()
@@ -398,4 +537,34 @@ mod tests {
assert_eq!("c3", batch_schema.field(2).name());
Ok(())
}
+
+ #[tokio::test]
+ async fn csv_exec_with_reader() -> Result<()> {
+ let schema = aggr_test_schema();
+ let testdata = arrow::util::test_util::arrow_test_data();
+ let filename = "aggregate_test_100.csv";
+ let path = format!("{}/csv/{}", testdata, filename);
+ let buf = std::fs::read(path).unwrap();
+ let rdr = std::io::Cursor::new(buf);
+ let csv = CsvExec::try_new_from_reader(
+ rdr,
+ CsvReadOptions::new().schema(&schema),
+ Some(vec![0, 2, 4]),
+ 1024,
+ None,
+ )?;
+ assert_eq!(13, csv.schema.fields().len());
+ assert_eq!(3, csv.projected_schema.fields().len());
+ assert_eq!(13, csv.file_schema().fields().len());
+ assert_eq!(3, csv.schema().fields().len());
+ let mut stream = csv.execute(0).await?;
+ let batch = stream.next().await.unwrap()?;
+ assert_eq!(3, batch.num_columns());
+ let batch_schema = batch.schema();
+ assert_eq!(3, batch_schema.fields().len());
+ assert_eq!("c1", batch_schema.field(0).name());
+ assert_eq!("c3", batch_schema.field(1).name());
+ assert_eq!("c5", batch_schema.field(2).name());
+ Ok(())
+ }
}
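
A closing note on the `Mutex<Option<Box<dyn Read + Send + Sync>>>` shape both `Source` enums use: the `Option` lets `scan`/`execute` move the reader out through a shared reference exactly once via `take()`, the `Mutex` makes that hand-off thread-safe, and `Clone` deliberately produces `Reader(None)`, so a cloned plan cannot be scanned. A self-contained sketch of that take-once pattern (`TakeOnce` is a hypothetical stand-in, not a type from the commit):

    use std::io::Read;
    use std::sync::Mutex;

    // Stand-in for Source::Reader: the Option allows moving the reader out
    // through &self; the Mutex makes the one-time hand-off thread-safe.
    struct TakeOnce(Mutex<Option<Box<dyn Read + Send + Sync>>>);

    impl TakeOnce {
        fn take(&self) -> Option<Box<dyn Read + Send + Sync>> {
            // take() leaves None behind, so later calls see "already taken".
            self.0.lock().unwrap().take()
        }
    }

    fn main() {
        let src = TakeOnce(Mutex::new(Some(Box::new(std::io::empty()))));
        assert!(src.take().is_some()); // first consumer receives the reader
        assert!(src.take().is_none()); // subsequent consumers hit the error path
    }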