Posted to commits@arrow.apache.org by tu...@apache.org on 2022/05/24 16:25:33 UTC
[arrow-datafusion] branch master updated: Decouple FileFormat from datafusion_data_access (#2572)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9ea7dc603 Decouple FileFormat from datafusion_data_access (#2572)
9ea7dc603 is described below
commit 9ea7dc6036a7b1d28c7450db4f26720b732a50de
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 24 17:25:26 2022 +0100
Decouple FileFormat from datafusion_data_access (#2572)
* Decouple FileFormat from datafusion_data_access
* Review feedback
* Update ballista pin
---
datafusion/core/src/datasource/file_format/avro.rs | 85 ++++------
datafusion/core/src/datasource/file_format/csv.rs | 68 +++-----
datafusion/core/src/datasource/file_format/json.rs | 65 +++----
datafusion/core/src/datasource/file_format/mod.rs | 61 ++++++-
.../core/src/datasource/file_format/parquet.rs | 186 +++++++++------------
datafusion/core/src/datasource/listing/mod.rs | 15 +-
datafusion/core/src/datasource/listing/table.rs | 22 +--
.../core/src/physical_plan/file_format/avro.rs | 40 ++---
.../core/src/physical_plan/file_format/json.rs | 51 +++---
.../core/src/physical_plan/file_format/parquet.rs | 140 ++++++----------
datafusion/core/src/test/mod.rs | 12 +-
datafusion/core/tests/row.rs | 16 +-
dev/build-arrow-ballista.sh | 3 +-
13 files changed, 348 insertions(+), 416 deletions(-)
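At its core, this change reworks the `FileFormat` trait: formats now receive an `ObjectStore` handle plus `FileMeta` entries and open readers themselves, rather than consuming pre-built `ObjectReader` streams from datafusion_data_access. A condensed before/after sketch of the two affected trait methods, taken from the mod.rs hunks below (other methods and doc comments elided):

    // Before: the caller constructed readers and passed them in.
    async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef>;
    async fn infer_stats(
        &self,
        reader: Arc<dyn ObjectReader>,
        table_schema: SchemaRef,
    ) -> Result<Statistics>;

    // After: the format resolves readers from the store on demand.
    async fn infer_schema(
        &self,
        store: &Arc<dyn ObjectStore>,
        files: &[FileMeta],
    ) -> Result<SchemaRef>;
    async fn infer_stats(
        &self,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        file: &FileMeta,
    ) -> Result<Statistics>;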
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 882066206..63781da52 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
-use futures::StreamExt;
+use datafusion_data_access::FileMeta;
use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::ObjectStore;
/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -46,10 +46,14 @@ impl FileFormat for AvroFormat {
self
}
- async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
+ async fn infer_schema(
+ &self,
+ store: &Arc<dyn ObjectStore>,
+ files: &[FileMeta],
+ ) -> Result<SchemaRef> {
let mut schemas = vec![];
- while let Some(obj_reader) = readers.next().await {
- let mut reader = obj_reader?.sync_reader()?;
+ for file in files {
+ let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
let schema = read_avro_schema_from_reader(&mut reader)?;
schemas.push(schema);
}
@@ -59,8 +63,9 @@ impl FileFormat for AvroFormat {
async fn infer_stats(
&self,
- _reader: Arc<dyn ObjectReader>,
+ _store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
+ _file: &FileMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
@@ -78,15 +83,9 @@ impl FileFormat for AvroFormat {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
- use crate::{
- datafusion_data_access::object_store::local::{
- local_object_reader, local_object_reader_stream, LocalFileSystem,
- },
- physical_plan::collect,
- };
-
use super::*;
- use crate::datasource::listing::local_unpartitioned_file;
+ use crate::datasource::file_format::test_util::scan_format;
+ use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -100,7 +99,7 @@ mod tests {
let ctx = SessionContext::with_config(config);
let task_ctx = ctx.task_ctx();
let projection = None;
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let stream = exec.execute(0, task_ctx)?;
let tt_batches = stream
@@ -122,7 +121,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = None;
- let exec = get_exec("alltypes_plain.avro", &projection, Some(1)).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, Some(1)).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
@@ -136,7 +135,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = None;
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let x: Vec<String> = exec
.schema()
@@ -188,7 +187,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![1]);
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -218,7 +217,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0]);
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -245,7 +244,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![10]);
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -272,7 +271,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![6]);
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -302,7 +301,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![7]);
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -332,7 +331,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![9]);
- let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -359,36 +358,13 @@ mod tests {
async fn get_exec(
file_name: &str,
- projection: &Option<Vec<usize>>,
+ projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = crate::test_util::arrow_test_data();
- let filename = format!("{}/avro/{}", testdata, file_name);
+ let store_root = format!("{}/avro", testdata);
let format = AvroFormat {};
- let file_schema = format
- .infer_schema(local_object_reader_stream(vec![filename.clone()]))
- .await
- .expect("Schema inference");
- let statistics = format
- .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
- .await
- .expect("Stats inference");
- let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
- let exec = format
- .create_physical_plan(
- FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_schema,
- file_groups,
- statistics,
- projection: projection.clone(),
- limit,
- table_partition_cols: vec![],
- },
- &[],
- )
- .await?;
- Ok(exec)
+ scan_format(&format, &store_root, file_name, projection, limit).await
}
}
@@ -397,18 +373,17 @@ mod tests {
mod tests {
use super::*;
- use crate::datafusion_data_access::object_store::local::local_object_reader_stream;
+ use super::super::test_util::scan_format;
use crate::error::DataFusionError;
#[tokio::test]
async fn test() -> Result<()> {
+ let format = AvroFormat {};
let testdata = crate::test_util::arrow_test_data();
- let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let schema_result = AvroFormat {}
- .infer_schema(local_object_reader_stream(vec![filename]))
- .await;
+ let filename = "avro/alltypes_plain.avro";
+ let result = scan_format(&format, &testdata, filename, None, None).await;
assert!(matches!(
- schema_result,
+ result,
Err(DataFusionError::NotImplemented(msg))
if msg == *"cannot read avro schema without the 'avro' feature enabled"
));
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index cbc2adca1..0d665e9ef 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
-use futures::StreamExt;
+use datafusion_data_access::FileMeta;
use super::FileFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::ObjectStore;
/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -93,13 +93,17 @@ impl FileFormat for CsvFormat {
self
}
- async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
+ async fn infer_schema(
+ &self,
+ store: &Arc<dyn ObjectStore>,
+ files: &[FileMeta],
+ ) -> Result<SchemaRef> {
let mut schemas = vec![];
- let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);
+ let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
- while let Some(obj_reader) = readers.next().await {
- let mut reader = obj_reader?.sync_reader()?;
+ for file in files {
+ let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
&mut reader,
self.delimiter,
@@ -122,8 +126,9 @@ impl FileFormat for CsvFormat {
async fn infer_stats(
&self,
- _reader: Arc<dyn ObjectReader>,
+ _store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
+ _file: &FileMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
@@ -142,16 +147,11 @@ impl FileFormat for CsvFormat {
mod tests {
use arrow::array::StringArray;
+ use super::super::test_util::scan_format;
use super::*;
- use crate::datasource::listing::local_unpartitioned_file;
+ use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
- use crate::{
- datafusion_data_access::object_store::local::{
- local_object_reader, local_object_reader_stream, LocalFileSystem,
- },
- datasource::file_format::FileScanConfig,
- physical_plan::collect,
- };
+ use futures::StreamExt;
#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -159,7 +159,7 @@ mod tests {
let ctx = SessionContext::with_config(config);
// skip column 9 that overflows the automatically discovered column type of i64 (u64 would work)
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
- let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
+ let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx)?;
@@ -186,7 +186,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0, 1, 2, 3]);
- let exec = get_exec("aggregate_test_100.csv", &projection, Some(1)).await?;
+ let exec = get_exec("aggregate_test_100.csv", projection, Some(1)).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(4, batches[0].num_columns());
@@ -198,7 +198,7 @@ mod tests {
#[tokio::test]
async fn infer_schema() -> Result<()> {
let projection = None;
- let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
+ let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
let x: Vec<String> = exec
.schema()
@@ -233,7 +233,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0]);
- let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
+ let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
let batches = collect(exec, task_ctx).await.expect("Collect batches");
@@ -258,35 +258,11 @@ mod tests {
async fn get_exec(
file_name: &str,
- projection: &Option<Vec<usize>>,
+ projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let testdata = crate::test_util::arrow_test_data();
- let filename = format!("{}/csv/{}", testdata, file_name);
+ let root = format!("{}/csv", crate::test_util::arrow_test_data());
let format = CsvFormat::default();
- let file_schema = format
- .infer_schema(local_object_reader_stream(vec![filename.clone()]))
- .await
- .expect("Schema inference");
- let statistics = format
- .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
- .await
- .expect("Stats inference");
- let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
- let exec = format
- .create_physical_plan(
- FileScanConfig {
- object_store: Arc::new(LocalFileSystem),
- file_schema,
- file_groups,
- statistics,
- projection: projection.clone(),
- limit,
- table_partition_cols: vec![],
- },
- &[],
- )
- .await?;
- Ok(exec)
+ scan_format(&format, &root, file_name, projection, limit).await
}
}
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index cd4fd5810..e9b49c42d 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use async_trait::async_trait;
-use futures::StreamExt;
+use datafusion_data_access::{object_store::ObjectStore, FileMeta};
use super::FileFormat;
use super::FileScanConfig;
@@ -36,7 +36,6 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
/// The default file extension of json files
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
@@ -69,11 +68,16 @@ impl FileFormat for JsonFormat {
self
}
- async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
+ async fn infer_schema(
+ &self,
+ store: &Arc<dyn ObjectStore>,
+ files: &[FileMeta],
+ ) -> Result<SchemaRef> {
let mut schemas = Vec::new();
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
- while let Some(obj_reader) = readers.next().await {
- let mut reader = BufReader::new(obj_reader?.sync_reader()?);
+ for file in files {
+ let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
+ let mut reader = BufReader::new(reader);
let iter = ValueIter::new(&mut reader, None);
let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
let should_take = records_to_read > 0;
@@ -94,8 +98,9 @@ impl FileFormat for JsonFormat {
async fn infer_stats(
&self,
- _reader: Arc<dyn ObjectReader>,
+ _store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
+ _file: &FileMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
@@ -112,15 +117,16 @@ impl FileFormat for JsonFormat {
#[cfg(test)]
mod tests {
+ use super::super::test_util::scan_format;
use arrow::array::Int64Array;
+ use futures::StreamExt;
use super::*;
use crate::prelude::{SessionConfig, SessionContext};
use crate::{
datafusion_data_access::object_store::local::{
- local_object_reader, local_object_reader_stream, LocalFileSystem,
+ local_unpartitioned_file, LocalFileSystem,
},
- datasource::{file_format::FileScanConfig, listing::local_unpartitioned_file},
physical_plan::collect,
};
@@ -129,7 +135,7 @@ mod tests {
let config = SessionConfig::new().with_batch_size(2);
let ctx = SessionContext::with_config(config);
let projection = None;
- let exec = get_exec(&projection, None).await?;
+ let exec = get_exec(projection, None).await?;
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx)?;
@@ -156,7 +162,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = None;
- let exec = get_exec(&projection, Some(1)).await?;
+ let exec = get_exec(projection, Some(1)).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(4, batches[0].num_columns());
@@ -168,7 +174,7 @@ mod tests {
#[tokio::test]
async fn infer_schema() -> Result<()> {
let projection = None;
- let exec = get_exec(&projection, None).await?;
+ let exec = get_exec(projection, None).await?;
let x: Vec<String> = exec
.schema()
@@ -186,7 +192,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0]);
- let exec = get_exec(&projection, None).await?;
+ let exec = get_exec(projection, None).await?;
let batches = collect(exec, task_ctx).await.expect("Collect batches");
@@ -213,48 +219,25 @@ mod tests {
}
async fn get_exec(
- projection: &Option<Vec<usize>>,
+ projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let filename = "tests/jsons/2.json";
let format = JsonFormat::default();
- let file_schema = format
- .infer_schema(local_object_reader_stream(vec![filename.to_owned()]))
- .await
- .expect("Schema inference");
- let statistics = format
- .infer_stats(
- local_object_reader(filename.to_owned()),
- file_schema.clone(),
- )
- .await
- .expect("Stats inference");
- let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
- let exec = format
- .create_physical_plan(
- FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_schema,
- file_groups,
- statistics,
- projection: projection.clone(),
- limit,
- table_partition_cols: vec![],
- },
- &[],
- )
- .await?;
- Ok(exec)
+ scan_format(&format, ".", filename, projection, limit).await
}
#[tokio::test]
async fn infer_schema_with_limit() {
+ let store = Arc::new(LocalFileSystem {}) as _;
let filename = "tests/jsons/schema_infer_limit.json";
let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));
+
let file_schema = format
- .infer_schema(local_object_reader_stream(vec![filename.to_owned()]))
+ .infer_schema(&store, &[local_unpartitioned_file(filename.to_string())])
.await
.expect("Schema inference");
+
let fields = file_schema
.fields()
.iter()
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index bb37b7557..669ed0efd 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,8 +36,8 @@ use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};
use async_trait::async_trait;
-
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
@@ -52,7 +52,11 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
/// be analysed up to a given number of records or files (as specified in the
/// format config) then give the estimated common schema. This might fail if
/// the files have schemas that cannot be merged.
- async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef>;
+ async fn infer_schema(
+ &self,
+ store: &Arc<dyn ObjectStore>,
+ files: &[FileMeta],
+ ) -> Result<SchemaRef>;
/// Infer the statistics for the provided object. The cost and accuracy of the
/// estimated statistics might vary greatly between file formats.
@@ -63,8 +67,9 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
/// TODO: should the file source return statistics for only columns referred to in the table schema?
async fn infer_stats(
&self,
- reader: Arc<dyn ObjectReader>,
+ store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
+ file: &FileMeta,
) -> Result<Statistics>;
/// Take a list of files and convert it to the appropriate executor
@@ -75,3 +80,51 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>>;
}
+
+#[cfg(test)]
+pub(crate) mod test_util {
+ use super::*;
+ use crate::datasource::listing::PartitionedFile;
+ use datafusion_data_access::object_store::local::{
+ local_unpartitioned_file, LocalFileSystem,
+ };
+
+ pub async fn scan_format(
+ format: &dyn FileFormat,
+ store_root: &str,
+ file_name: &str,
+ projection: Option<Vec<usize>>,
+ limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let store = Arc::new(LocalFileSystem {}) as _;
+ let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name));
+
+ let file_schema = format.infer_schema(&store, &[meta.clone()]).await?;
+
+ let statistics = format
+ .infer_stats(&store, file_schema.clone(), &meta)
+ .await?;
+
+ let file_groups = vec![vec![PartitionedFile {
+ file_meta: meta,
+ partition_values: vec![],
+ range: None,
+ }]];
+
+ let exec = format
+ .create_physical_plan(
+ FileScanConfig {
+ object_store: store,
+ file_schema,
+ file_groups,
+ statistics,
+ projection,
+ limit,
+ table_partition_cols: vec![],
+ },
+ &[],
+ )
+ .await?;
+ Ok(exec)
+ }
+}
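The test_util module added above gives every format's tests a single entry point. A hedged sketch of how a test drives it, mirroring the csv.rs call site earlier in this diff (the path assumes the standard arrow test data layout):

    // Build an ExecutionPlan for one file via the shared helper; the helper
    // infers schema and statistics before creating the physical plan.
    let format = CsvFormat::default();
    let root = format!("{}/csv", crate::test_util::arrow_test_data());
    let exec = scan_format(&format, &root, "aggregate_test_100.csv", None, None).await?;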
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index a9a0c788b..42bff573c 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
-use futures::TryStreamExt;
+use datafusion_data_access::FileMeta;
use hashbrown::HashMap;
use parquet::arrow::ArrowReader;
use parquet::arrow::ParquetFileArrowReader;
@@ -50,9 +50,9 @@ use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
use crate::physical_plan::{metrics, ExecutionPlan};
use crate::physical_plan::{Accumulator, Statistics};
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::{ObjectReader, ObjectStore};
-/// The default file exetension of parquet files
+/// The default file extension of parquet files
pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
/// The Apache Parquet `FileFormat` implementation
@@ -88,24 +88,27 @@ impl FileFormat for ParquetFormat {
self
}
- async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef> {
- let merged_schema = readers
- .map_err(DataFusionError::IoError)
- .try_fold(Schema::empty(), |acc, reader| async {
- let next_schema = fetch_schema(reader);
- Schema::try_merge([acc, next_schema?])
- .map_err(DataFusionError::ArrowError)
- })
- .await?;
- Ok(Arc::new(merged_schema))
+ async fn infer_schema(
+ &self,
+ store: &Arc<dyn ObjectStore>,
+ files: &[FileMeta],
+ ) -> Result<SchemaRef> {
+ let mut schemas = Vec::with_capacity(files.len());
+ for file in files {
+ let schema = fetch_schema(store.as_ref(), file)?;
+ schemas.push(schema)
+ }
+ let schema = Schema::try_merge(schemas)?;
+ Ok(Arc::new(schema))
}
async fn infer_stats(
&self,
- reader: Arc<dyn ObjectReader>,
+ store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
+ file: &FileMeta,
) -> Result<Statistics> {
- let stats = fetch_statistics(reader, table_schema)?;
+ let stats = fetch_statistics(store.as_ref(), table_schema, file)?;
Ok(stats)
}
@@ -274,7 +277,8 @@ fn summarize_min_max(
}
/// Read and parse the schema of the Parquet file at location `path`
-fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
+fn fetch_schema(store: &dyn ObjectStore, file: &FileMeta) -> Result<Schema> {
+ let object_reader = store.file_reader(file.sized_file.clone())?;
let obj_reader = ChunkObjectReader {
object_reader,
bytes_scanned: None,
@@ -288,9 +292,11 @@ fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
/// Read and parse the statistics of the Parquet file at location `path`
fn fetch_statistics(
- object_reader: Arc<dyn ObjectReader>,
+ store: &dyn ObjectStore,
table_schema: SchemaRef,
+ file: &FileMeta,
) -> Result<Statistics> {
+ let object_reader = store.file_reader(file.sized_file.clone())?;
let obj_reader = ChunkObjectReader {
object_reader,
bytes_scanned: None,
@@ -396,56 +402,17 @@ impl ChunkReader for ChunkObjectReader {
}
#[cfg(test)]
-mod tests {
- use crate::datasource::listing::local_unpartitioned_file;
- use crate::physical_plan::collect;
- use datafusion_data_access::object_store::local::{
- local_object_reader, local_object_reader_stream, LocalFileSystem,
- };
-
+pub(crate) mod test_util {
use super::*;
-
- use crate::physical_plan::metrics::MetricValue;
- use crate::prelude::{SessionConfig, SessionContext};
- use arrow::array::{
- ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
- StringArray, TimestampNanosecondArray,
- };
use arrow::record_batch::RecordBatch;
- use datafusion_common::ScalarValue;
- use futures::StreamExt;
+ use datafusion_data_access::object_store::local::local_unpartitioned_file;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;
- // Add a new column with the specified field name to the RecordBatch
- fn add_to_batch(
- batch: &RecordBatch,
- field_name: &str,
- array: ArrayRef,
- ) -> RecordBatch {
- let mut fields = batch.schema().fields().clone();
- fields.push(Field::new(field_name, array.data_type().clone(), true));
- let schema = Arc::new(Schema::new(fields));
-
- let mut columns = batch.columns().to_vec();
- columns.push(array);
- RecordBatch::try_new(schema, columns).expect("error; creating record batch")
- }
-
- fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
- columns.into_iter().fold(
- RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
- |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
- )
- }
-
- async fn create_table(
+ pub async fn store_parquet(
batches: Vec<RecordBatch>,
- ) -> Result<(Vec<NamedTempFile>, Schema)> {
- let merged_schema =
- Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))?;
-
+ ) -> Result<(Vec<FileMeta>, Vec<NamedTempFile>)> {
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
@@ -464,8 +431,33 @@ mod tests {
})
.collect();
- Ok((files, merged_schema))
+ let meta: Vec<_> = files
+ .iter()
+ .map(|f| local_unpartitioned_file(f.path().to_string_lossy().to_string()))
+ .collect();
+
+ Ok((meta, files))
}
+}
+
+#[cfg(test)]
+mod tests {
+ use super::super::test_util::scan_format;
+ use crate::physical_plan::collect;
+ use datafusion_data_access::object_store::local::LocalFileSystem;
+
+ use super::*;
+
+ use crate::datasource::file_format::parquet::test_util::store_parquet;
+ use crate::physical_plan::metrics::MetricValue;
+ use crate::prelude::{SessionConfig, SessionContext};
+ use arrow::array::{
+ ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+ StringArray, TimestampNanosecondArray,
+ };
+ use arrow::record_batch::RecordBatch;
+ use datafusion_common::ScalarValue;
+ use futures::StreamExt;
#[tokio::test]
async fn read_merged_batches() -> Result<()> {
@@ -474,16 +466,16 @@ mod tests {
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
- let batch1 = create_batch(vec![("c1", c1.clone())]);
+ let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+ let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
- let batch2 = create_batch(vec![("c2", c2)]);
+ let store = Arc::new(LocalFileSystem {}) as _;
+ let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
- let (files, schema) = create_table(vec![batch1, batch2]).await?;
- let table_schema = Arc::new(schema);
-
- let reader = local_object_reader(files[0].path().to_string_lossy().to_string());
+ let format = ParquetFormat::default();
+ let schema = format.infer_schema(&store, &meta).await.unwrap();
- let stats = fetch_statistics(reader, table_schema.clone())?;
+ let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0])?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
@@ -491,9 +483,7 @@ mod tests {
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));
- let reader = local_object_reader(files[1].path().to_string_lossy().to_string());
-
- let stats = fetch_statistics(reader, table_schema)?;
+ let stats = fetch_statistics(store.as_ref(), schema, &meta[1])?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -510,7 +500,7 @@ mod tests {
let config = SessionConfig::new().with_batch_size(2);
let ctx = SessionContext::with_config(config);
let projection = None;
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx)?;
@@ -536,14 +526,14 @@ mod tests {
async fn capture_bytes_scanned_metric() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let ctx = SessionContext::with_config(config);
- let projection = None;
// Read the full file
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let projection = None;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
// Read only one column. This should scan less data.
- let exec_projected =
- get_exec("alltypes_plain.parquet", &Some(vec![0]), None).await?;
+ let projection = Some(vec![0]);
+ let exec_projected = get_exec("alltypes_plain.parquet", projection, None).await?;
let task_ctx = ctx.task_ctx();
@@ -561,7 +551,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = None;
- let exec = get_exec("alltypes_plain.parquet", &projection, Some(1)).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, Some(1)).await?;
// note: even if the limit is set, the executor rounds up to the batch size
assert_eq!(exec.statistics().num_rows, Some(8));
@@ -580,7 +570,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = None;
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let x: Vec<String> = exec
.schema()
@@ -618,7 +608,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![1]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
@@ -648,7 +638,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
@@ -675,7 +665,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![10]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
@@ -702,7 +692,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![6]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
@@ -732,7 +722,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![7]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
@@ -762,7 +752,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![9]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
@@ -800,35 +790,11 @@ mod tests {
async fn get_exec(
file_name: &str,
- projection: &Option<Vec<usize>>,
+ projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = crate::test_util::parquet_test_data();
- let filename = format!("{}/{}", testdata, file_name);
let format = ParquetFormat::default();
- let file_schema = format
- .infer_schema(local_object_reader_stream(vec![filename.clone()]))
- .await
- .expect("Schema inference");
- let statistics = format
- .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
- .await
- .expect("Stats inference");
- let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
- let exec = format
- .create_physical_plan(
- FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_schema,
- file_groups,
- statistics,
- projection: projection.clone(),
- limit,
- table_partition_cols: vec![],
- },
- &[],
- )
- .await?;
- Ok(exec)
+ scan_format(&format, &testdata, file_name, projection, limit).await
}
}
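One pattern worth noting in the parquet changes above: helpers such as fetch_schema and fetch_statistics now take a `&dyn ObjectStore` plus `&FileMeta` and call `store.file_reader(file.sized_file.clone())` internally. A minimal sketch mirroring the reworked read_merged_batches test (batch construction and error handling elided; batch1/batch2 are illustrative):

    // Write batches to temp parquet files, then derive the merged schema and
    // per-file statistics straight from the store + FileMeta pairs.
    let store = Arc::new(LocalFileSystem {}) as Arc<dyn ObjectStore>;
    let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
    let schema = ParquetFormat::default().infer_schema(&store, &meta).await?;
    let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0])?;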
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index d7932b38f..0f0a7d20e 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -22,7 +22,7 @@ mod helpers;
mod table;
use datafusion_common::ScalarValue;
-use datafusion_data_access::{object_store::local, FileMeta, Result, SizedFile};
+use datafusion_data_access::{FileMeta, Result, SizedFile};
use futures::Stream;
use std::pin::Pin;
@@ -88,11 +88,12 @@ impl std::fmt::Display for PartitionedFile {
}
}
-/// Helper method to fetch the file size and date at given path and create a `FileMeta`
-pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
- PartitionedFile {
- file_meta: local::local_unpartitioned_file(file),
- partition_values: vec![],
- range: None,
+impl From<FileMeta> for PartitionedFile {
+ fn from(file_meta: FileMeta) -> Self {
+ PartitionedFile {
+ file_meta,
+ partition_values: vec![],
+ range: None,
+ }
}
}
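The `From<FileMeta>` impl above replaces the free function `local_unpartitioned_file` that previously lived in this module, so call sites convert metadata directly; a minimal illustration (variable name is illustrative):

    // Produces a PartitionedFile with empty partition_values and range: None.
    let partitioned: PartitionedFile = meta.into();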
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index bde88b659..1dceb8b35 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};
use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
use crate::datasource::{
file_format::{
@@ -212,15 +212,13 @@ impl ListingOptions {
/// locally or ask a remote service to do it (e.g a scheduler).
pub async fn infer_schema<'a>(
&'a self,
- object_store: Arc<dyn ObjectStore>,
+ store: Arc<dyn ObjectStore>,
path: &'a str,
) -> Result<SchemaRef> {
- let file_stream = object_store
- .glob_file_with_suffix(path, &self.file_extension)
- .await?
- .map(move |file_meta| object_store.file_reader(file_meta?.sized_file));
- let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?;
- Ok(file_schema)
+ let extension = &self.file_extension;
+ let list_stream = store.glob_file_with_suffix(path, extension).await?;
+ let files: Vec<_> = list_stream.try_collect().await?;
+ self.format.infer_schema(&store, &files).await
}
}
@@ -377,11 +375,13 @@ impl ListingTable {
async move {
let part_file = part_file?;
let statistics = if self.options.collect_stat {
- let object_reader = object_store
- .file_reader(part_file.file_meta.sized_file.clone())?;
self.options
.format
- .infer_stats(object_reader, self.file_schema.clone())
+ .infer_stats(
+ &object_store,
+ self.file_schema.clone(),
+ &part_file.file_meta,
+ )
.await?
} else {
Statistics::default()
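Correspondingly, ListingOptions::infer_schema above now materializes the listing stream into a Vec<FileMeta> before handing it to the format; in sketch form (a restatement of the hunk, relying on the futures::TryStreamExt import it adds):

    let list_stream = store.glob_file_with_suffix(path, extension).await?;
    let files: Vec<FileMeta> = list_stream.try_collect().await?;
    self.format.infer_schema(&store, &files).await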
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index a25ad60e1..fc56ce1d8 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -165,15 +165,13 @@ impl ExecutionPlan for AvroExec {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
- use crate::datasource::{
- file_format::{avro::AvroFormat, FileFormat},
- listing::local_unpartitioned_file,
- };
+ use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
+ use crate::datasource::listing::PartitionedFile;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_data_access::object_store::local::{
- local_object_reader_stream, LocalFileSystem,
+ local_unpartitioned_file, LocalFileSystem,
};
use futures::StreamExt;
@@ -183,12 +181,15 @@ mod tests {
async fn avro_exec_without_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+ let store = Arc::new(LocalFileSystem {}) as _;
+ let meta = local_unpartitioned_file(filename);
+
+ let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
+
let avro_exec = AvroExec::new(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
- file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
- file_schema: AvroFormat {}
- .infer_schema(local_object_reader_stream(vec![filename]))
- .await?,
+ file_groups: vec![vec![meta.into()]],
+ file_schema,
statistics: Statistics::default(),
projection: Some(vec![0, 1, 2]),
limit: None,
@@ -240,9 +241,9 @@ mod tests {
async fn avro_exec_missing_column() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let actual_schema = AvroFormat {}
- .infer_schema(local_object_reader_stream(vec![filename.clone()]))
- .await?;
+ let store = Arc::new(LocalFileSystem {}) as _;
+ let meta = local_unpartitioned_file(filename);
+ let actual_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -252,8 +253,8 @@ mod tests {
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
let avro_exec = AvroExec::new(FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
+ object_store: store,
+ file_groups: vec![vec![meta.into()]],
file_schema,
statistics: Statistics::default(),
projection,
@@ -306,18 +307,19 @@ mod tests {
async fn avro_exec_with_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let mut partitioned_file = local_unpartitioned_file(filename.clone());
+ let store = Arc::new(LocalFileSystem {}) as _;
+ let meta = local_unpartitioned_file(filename);
+ let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
+
+ let mut partitioned_file = PartitionedFile::from(meta);
partitioned_file.partition_values =
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
- let file_schema = AvroFormat {}
- .infer_schema(local_object_reader_stream(vec![filename]))
- .await?;
let avro_exec = AvroExec::new(FileScanConfig {
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
- object_store: Arc::new(LocalFileSystem {}),
+ object_store: store,
file_groups: vec![vec![partitioned_file]],
file_schema,
statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 818496d13..5470f6d57 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -192,25 +192,30 @@ mod tests {
use arrow::datatypes::{Field, Schema};
use futures::StreamExt;
- use crate::datafusion_data_access::object_store::local::{
- local_object_reader_stream, LocalFileSystem,
- };
- use crate::datasource::{
- file_format::{json::JsonFormat, FileFormat},
- listing::local_unpartitioned_file,
- };
+ use crate::datafusion_data_access::object_store::local::LocalFileSystem;
+ use crate::datasource::file_format::{json::JsonFormat, FileFormat};
+ use crate::datasource::listing::PartitionedFile;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
+ use datafusion_data_access::object_store::local::local_unpartitioned_file;
+ use datafusion_data_access::object_store::ObjectStore;
use tempfile::TempDir;
use super::*;
const TEST_DATA_BASE: &str = "tests/jsons";
- async fn infer_schema(path: String) -> Result<SchemaRef> {
- JsonFormat::default()
- .infer_schema(local_object_reader_stream(vec![path]))
+ async fn prepare_store(
+ ) -> (Arc<dyn ObjectStore>, Vec<Vec<PartitionedFile>>, SchemaRef) {
+ let store = Arc::new(LocalFileSystem {}) as _;
+ let path = format!("{}/1.json", TEST_DATA_BASE);
+ let meta = local_unpartitioned_file(path);
+ let schema = JsonFormat::default()
+ .infer_schema(&store, &[meta.clone()])
.await
+ .unwrap();
+
+ (store, vec![vec![meta.into()]], schema)
}
#[tokio::test]
@@ -218,11 +223,12 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
- let path = format!("{}/1.json", TEST_DATA_BASE);
+ let (object_store, file_groups, file_schema) = prepare_store().await;
+
let exec = NdJsonExec::new(FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
- file_schema: infer_schema(path).await?,
+ object_store,
+ file_groups,
+ file_schema,
statistics: Statistics::default(),
projection: None,
limit: Some(3),
@@ -274,9 +280,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
- let path = format!("{}/1.json", TEST_DATA_BASE);
-
- let actual_schema = infer_schema(path.clone()).await?;
+ let (object_store, file_groups, actual_schema) = prepare_store().await;
let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -285,8 +289,8 @@ mod tests {
let file_schema = Arc::new(Schema::new(fields));
let exec = NdJsonExec::new(FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
+ object_store,
+ file_groups,
file_schema,
statistics: Statistics::default(),
projection: None,
@@ -315,11 +319,12 @@ mod tests {
async fn nd_json_exec_file_projection() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- let path = format!("{}/1.json", TEST_DATA_BASE);
+ let (object_store, file_groups, file_schema) = prepare_store().await;
+
let exec = NdJsonExec::new(FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
- file_schema: infer_schema(path).await?,
+ object_store,
+ file_groups,
+ file_schema,
statistics: Statistics::default(),
projection: Some(vec![0, 2]),
limit: None,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index ab4ab3fbe..0931484ef 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -630,17 +630,15 @@ mod tests {
use crate::{
assert_batches_sorted_eq, assert_contains,
datafusion_data_access::{
- object_store::local::{local_object_reader_stream, LocalFileSystem},
- FileMeta, SizedFile,
- },
- datasource::{
- file_format::{parquet::ParquetFormat, FileFormat},
- listing::local_unpartitioned_file,
+ object_store::local::LocalFileSystem, FileMeta, SizedFile,
},
+ datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};
use super::*;
+ use crate::datasource::file_format::parquet::test_util::store_parquet;
+ use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::FileRange;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
@@ -649,16 +647,12 @@ mod tests {
array::{Int64Array, Int8Array, StringArray},
datatypes::{DataType, Field},
};
- use datafusion_data_access::object_store::local;
+ use datafusion_data_access::object_store::local::local_unpartitioned_file;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use parquet::{
- arrow::ArrowWriter,
basic::Type as PhysicalType,
- file::{
- metadata::RowGroupMetaData, properties::WriterProperties,
- statistics::Statistics as ParquetStatistics,
- },
+ file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
schema::types::SchemaDescPtr,
};
use std::fs::File;
@@ -673,45 +667,16 @@ mod tests {
schema: Option<SchemaRef>,
predicate: Option<Expr>,
) -> Result<Vec<RecordBatch>> {
- // When vec is dropped, temp files are deleted
- let files: Vec<_> = batches
- .into_iter()
- .map(|batch| {
- let output = tempfile::NamedTempFile::new().expect("creating temp file");
-
- let props = WriterProperties::builder().build();
- let file: std::fs::File = (*output.as_file())
- .try_clone()
- .expect("cloning file descriptor");
- let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
- .expect("creating writer");
-
- writer.write(&batch).expect("Writing batch");
- writer.close().unwrap();
- output
- })
- .collect();
-
- let file_names: Vec<_> = files
- .iter()
- .map(|t| t.path().to_string_lossy().to_string())
- .collect();
-
- // Now, read the files back in
- let file_groups: Vec<_> = file_names
- .iter()
- .map(|name| local_unpartitioned_file(name.clone()))
- .collect();
-
- // Infer the schema (if not provided)
let file_schema = match schema {
- Some(provided_schema) => provided_schema,
- None => ParquetFormat::default()
- .infer_schema(local_object_reader_stream(file_names))
- .await
- .expect("inferring schema"),
+ Some(schema) => schema,
+ None => Arc::new(Schema::try_merge(
+ batches.iter().map(|b| b.schema().as_ref().clone()),
+ )?),
};
+ let (meta, _files) = store_parquet(batches).await?;
+ let file_groups = meta.into_iter().map(Into::into).collect();
+
// prepare the scan
let parquet_exec = ParquetExec::new(
FileScanConfig {
@@ -1050,26 +1015,16 @@ mod tests {
#[tokio::test]
async fn parquet_exec_with_projection() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
let testdata = crate::test_util::parquet_test_data();
- let filename = format!("{}/alltypes_plain.parquet", testdata);
- let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
- file_schema: ParquetFormat::default()
- .infer_schema(local_object_reader_stream(vec![filename]))
- .await?,
- statistics: Statistics::default(),
- projection: Some(vec![0, 1, 2]),
- limit: None,
- table_partition_cols: vec![],
- },
- None,
- );
+ let filename = "alltypes_plain.parquet";
+ let format = ParquetFormat::default();
+ let parquet_exec =
+ scan_format(&format, &testdata, filename, Some(vec![0, 1, 2]), None)
+ .await
+ .unwrap();
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+ let task_ctx = SessionContext::new().task_ctx();
let mut results = parquet_exec.execute(0, task_ctx)?;
let batch = results.next().await.unwrap()?;
@@ -1095,9 +1050,9 @@ mod tests {
#[tokio::test]
async fn parquet_exec_with_range() -> Result<()> {
- fn file_range(file: String, start: i64, end: i64) -> PartitionedFile {
+ fn file_range(meta: &FileMeta, start: i64, end: i64) -> PartitionedFile {
PartitionedFile {
- file_meta: local::local_unpartitioned_file(file),
+ file_meta: meta.clone(),
partition_values: vec![],
range: Some(FileRange { start, end }),
}
@@ -1137,15 +1092,19 @@ mod tests {
let session_ctx = SessionContext::new();
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
+
+ let meta = local_unpartitioned_file(filename);
+
+ let store = Arc::new(LocalFileSystem {}) as _;
let file_schema = ParquetFormat::default()
- .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+ .infer_schema(&store, &[meta.clone()])
.await?;
- let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]];
- let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]];
+ let group_empty = vec![vec![file_range(&meta, 0, 5)]];
+ let group_contain = vec![vec![file_range(&meta, 5, i64::MAX)]];
let group_all = vec![vec![
- file_range(filename.clone(), 0, 5),
- file_range(filename.clone(), 5, i64::MAX),
+ file_range(&meta, 0, 5),
+ file_range(&meta, 5, i64::MAX),
]];
assert_parquet_read(
@@ -1174,19 +1133,30 @@ mod tests {
let task_ctx = session_ctx.task_ctx();
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
- let mut partitioned_file = local_unpartitioned_file(filename.clone());
- partitioned_file.partition_values = vec![
- ScalarValue::Utf8(Some("2021".to_owned())),
- ScalarValue::Utf8(Some("10".to_owned())),
- ScalarValue::Utf8(Some("26".to_owned())),
- ];
+ let store = Arc::new(LocalFileSystem {}) as _;
+
+ let meta = local_unpartitioned_file(filename);
+
+ let schema = ParquetFormat::default()
+ .infer_schema(&store, &[meta.clone()])
+ .await
+ .unwrap();
+
+ let partitioned_file = PartitionedFile {
+ file_meta: meta,
+ partition_values: vec![
+ ScalarValue::Utf8(Some("2021".to_owned())),
+ ScalarValue::Utf8(Some("10".to_owned())),
+ ScalarValue::Utf8(Some("26".to_owned())),
+ ],
+ range: None,
+ };
+
let parquet_exec = ParquetExec::new(
FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
+ object_store: store,
file_groups: vec![vec![partitioned_file]],
- file_schema: ParquetFormat::default()
- .infer_schema(local_object_reader_stream(vec![filename]))
- .await?,
+ file_schema: schema,
statistics: Statistics::default(),
// file has 10 cols so index 12 should be month
projection: Some(vec![0, 1, 2, 12]),
@@ -1229,8 +1199,6 @@ mod tests {
async fn parquet_exec_with_error() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- let testdata = crate::test_util::parquet_test_data();
- let filename = format!("{}/alltypes_plain.parquet", testdata);
let partitioned_file = PartitionedFile {
file_meta: FileMeta {
sized_file: SizedFile {
@@ -1247,9 +1215,7 @@ mod tests {
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups: vec![vec![partitioned_file]],
- file_schema: ParquetFormat::default()
- .infer_schema(local_object_reader_stream(vec![filename]))
- .await?,
+ file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 1798232b3..815379de4 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,7 +18,7 @@
//! Common unit test utility methods
use crate::arrow::array::UInt32Array;
-use crate::datasource::{listing::local_unpartitioned_file, MemTable, TableProvider};
+use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::logical_plan::LogicalPlan;
@@ -28,7 +28,9 @@ use array::{Array, ArrayRef};
use arrow::array::{self, DecimalBuilder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use datafusion_data_access::object_store::local::LocalFileSystem;
+use datafusion_data_access::object_store::local::{
+ local_unpartitioned_file, LocalFileSystem,
+};
use futures::{Future, FutureExt};
use std::fs::File;
use std::io::prelude::*;
@@ -108,10 +110,12 @@ pub fn partitioned_csv_config(
files
.into_iter()
- .map(|f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned())])
+ .map(
+ |f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned()).into()],
+ )
.collect::<Vec<_>>()
} else {
- vec![vec![local_unpartitioned_file(path)]]
+ vec![vec![local_unpartitioned_file(path).into()]]
};
Ok(FileScanConfig {
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index bda107f13..51ddff2fb 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -17,14 +17,12 @@
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::local_unpartitioned_file;
use datafusion::error::Result;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
-use datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion_data_access::object_store::local::{
- local_object_reader, local_object_reader_stream,
+ local_unpartitioned_file, LocalFileSystem,
};
use datafusion_row::layout::RowType::{Compact, WordAligned};
use datafusion_row::reader::read_as_batch;
@@ -80,20 +78,24 @@ async fn get_exec(
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = datafusion::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, file_name);
+ let meta = local_unpartitioned_file(filename);
+
let format = ParquetFormat::default();
+ let store = Arc::new(LocalFileSystem {}) as _;
+
let file_schema = format
- .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+ .infer_schema(&store, &[meta.clone()])
.await
.expect("Schema inference");
let statistics = format
- .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
+ .infer_stats(&store, file_schema.clone(), &meta)
.await
.expect("Stats inference");
- let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
+ let file_groups = vec![vec![meta.into()]];
let exec = format
.create_physical_plan(
FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
+ object_store: store,
file_schema,
file_groups,
statistics,
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 0c3922627..ebc5fd736 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -23,8 +23,7 @@ set -e
rm -rf arrow-ballista 2>/dev/null
# clone the repo
-# TODO make repo/branch configurable
-git clone https://github.com/apache/arrow-ballista
+git clone https://github.com/tustvold/arrow-ballista -b fix-file-format
# update dependencies to local crates
python ./dev/make-ballista-deps-local.py