Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/04 10:14:45 UTC
[arrow-datafusion] branch master updated: Switch to object_store crate (#2489) (#2677)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bf7564f62 Switch to object_store crate (#2489) (#2677)
bf7564f62 is described below
commit bf7564f62117487a9a397c25326fcda1ed22d08d
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Jul 4 11:14:39 2022 +0100
Switch to object_store crate (#2489) (#2677)
* Switch to object_store crate (#2489)
* Test fixes
* Update to object_store 0.2.0
* More windows pacification
* Fix windows test
* Fix windows test_prefix_path
* More windows fixes
* Simplify ListingTableUrl::strip_prefix
* Review feedback
* Update to latest arrow-rs
* Use ParquetRecordBatchStream
* Simplify predicate pruning
* Add host to ObjectStoreRegistry
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/common/Cargo.toml | 1 +
datafusion/common/src/error.rs | 21 +
datafusion/core/Cargo.toml | 7 +-
datafusion/core/src/catalog/schema.rs | 3 -
datafusion/core/src/datasource/file_format/avro.rs | 19 +-
datafusion/core/src/datasource/file_format/csv.rs | 20 +-
datafusion/core/src/datasource/file_format/json.rs | 43 +-
datafusion/core/src/datasource/file_format/mod.rs | 16 +-
.../core/src/datasource/file_format/parquet.rs | 160 +++----
datafusion/core/src/datasource/listing/helpers.rs | 173 +++----
datafusion/core/src/datasource/listing/mod.rs | 32 +-
datafusion/core/src/datasource/listing/table.rs | 4 +-
datafusion/core/src/datasource/listing/url.rs | 134 +++---
datafusion/core/src/datasource/object_store.rs | 34 +-
datafusion/core/src/execution/runtime_env.rs | 8 +-
datafusion/core/src/lib.rs | 1 -
.../core/src/physical_plan/file_format/avro.rs | 119 +++--
.../core/src/physical_plan/file_format/csv.rs | 103 ++--
.../src/physical_plan/file_format/file_stream.rs | 314 ++++++++-----
.../core/src/physical_plan/file_format/json.rs | 81 ++--
.../core/src/physical_plan/file_format/mod.rs | 2 +-
.../core/src/physical_plan/file_format/parquet.rs | 522 +++++++++------------
datafusion/core/src/physical_plan/mod.rs | 6 +-
datafusion/core/src/test/mod.rs | 6 +-
datafusion/core/src/test/object_store.rs | 121 +----
datafusion/core/tests/path_partition.rs | 124 +++--
datafusion/core/tests/row.rs | 11 +-
datafusion/core/tests/sql/mod.rs | 15 +-
28 files changed, 1045 insertions(+), 1055 deletions(-)
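For orientation before the diff: the commit message bullet "Add host to ObjectStoreRegistry" means stores are now registered and looked up by scheme and host together (see datafusion/core/src/datasource/object_store.rs and runtime_env.rs below). A rough sketch of the resulting API; the bucket name is illustrative and not part of this commit:

    use std::sync::Arc;
    use datafusion::prelude::SessionContext;
    use object_store::local::LocalFileSystem;

    fn register_example() {
        let ctx = SessionContext::new();
        // Stores are keyed by "{scheme}://{host}" rather than the bare scheme,
        // so the same scheme can map to different stores per bucket or host.
        // LocalFileSystem stands in for any Arc<dyn ObjectStore> implementation.
        ctx.runtime_env()
            .register_object_store("s3", "my-bucket", Arc::new(LocalFileSystem::new()));
    }

Queries against s3://my-bucket/... then resolve to the registered store through ObjectStoreRegistry::get_by_url, shown further down.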
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 367a8fb70..03809bcf5 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -41,6 +41,7 @@ pyarrow = ["pyo3"]
arrow = { version = "17.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.85.0", optional = true }
+object_store = { version = "0.3", optional = true }
ordered-float = "3.0"
parquet = { version = "17.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 014034672..c1d0f29b1 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -49,6 +49,9 @@ pub enum DataFusionError {
/// Wraps an error from the Avro crate
#[cfg(feature = "avro")]
AvroError(AvroError),
+ /// Wraps an error from the object_store crate
+ #[cfg(feature = "object_store")]
+ ObjectStore(object_store::Error),
/// Error associated to I/O operations and associated traits.
IoError(io::Error),
/// Error returned when SQL is syntactically incorrect.
@@ -203,6 +206,20 @@ impl From<AvroError> for DataFusionError {
}
}
+#[cfg(feature = "object_store")]
+impl From<object_store::Error> for DataFusionError {
+ fn from(e: object_store::Error) -> Self {
+ DataFusionError::ObjectStore(e)
+ }
+}
+
+#[cfg(feature = "object_store")]
+impl From<object_store::path::Error> for DataFusionError {
+ fn from(e: object_store::path::Error) -> Self {
+ DataFusionError::ObjectStore(e.into())
+ }
+}
+
impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
@@ -264,6 +281,10 @@ impl Display for DataFusionError {
DataFusionError::JITError(ref desc) => {
write!(f, "JIT error: {}", desc)
}
+ #[cfg(feature = "object_store")]
+ DataFusionError::ObjectStore(ref desc) => {
+ write!(f, "Object Store error: {}", desc)
+ }
}
}
}
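The two From impls added above let callers bubble object_store failures straight into DataFusionError with `?`, provided the `object_store` feature of datafusion-common is enabled (datafusion-core turns it on below). A minimal sketch with a hypothetical helper name:

    use std::sync::Arc;
    use datafusion_common::Result;
    use object_store::{path::Path, ObjectStore};

    // Hypothetical helper: both `?` operators rely on the conversions added above.
    async fn head_size(store: Arc<dyn ObjectStore>, raw_path: &str) -> Result<usize> {
        let location = Path::parse(raw_path)?;   // object_store::path::Error -> DataFusionError
        let meta = store.head(&location).await?; // object_store::Error -> DataFusionError
        Ok(meta.size)
    }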
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index af49d862f..c6cf32bd7 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -58,9 +58,9 @@ ahash = { version = "0.7", default-features = false }
arrow = { version = "17.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+bytes = "1.1"
chrono = { version = "0.4", default-features = false }
-datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet"] }
-datafusion-data-access = { path = "../data-access", version = "9.0.0" }
+datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet", "object_store"] }
datafusion-expr = { path = "../expr", version = "9.0.0" }
datafusion-jit = { path = "../jit", version = "9.0.0", optional = true }
datafusion-optimizer = { path = "../optimizer", version = "9.0.0" }
@@ -75,9 +75,10 @@ lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
+object_store = "0.3.0"
ordered-float = "3.0"
parking_lot = "0.12"
-parquet = { version = "17.0.0", features = ["arrow"] }
+parquet = { version = "17.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index db25c1edc..7634328f3 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -132,7 +132,6 @@ mod tests {
use std::sync::Arc;
use arrow::datatypes::Schema;
- use datafusion_data_access::object_store::local::LocalFileSystem;
use crate::assert_batches_eq;
use crate::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
@@ -170,8 +169,6 @@ mod tests {
let schema = MemorySchemaProvider::new();
let ctx = SessionContext::new();
- let store = Arc::new(LocalFileSystem {});
- ctx.runtime_env().register_object_store("file", store);
let config = ListingTableConfig::new(table_path)
.infer(&ctx.state())
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 63781da52..dec368808 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use object_store::{GetResult, ObjectMeta, ObjectStore};
use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,6 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::ObjectStore;
/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -49,12 +48,18 @@ impl FileFormat for AvroFormat {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
- files: &[FileMeta],
+ objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas = vec![];
- for file in files {
- let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
- let schema = read_avro_schema_from_reader(&mut reader)?;
+ for object in objects {
+ let schema = match store.get(&object.location).await? {
+ GetResult::File(mut file, _) => read_avro_schema_from_reader(&mut file)?,
+ r @ GetResult::Stream(_) => {
+ // TODO: Fetching entire file to get schema is potentially wasteful
+ let data = r.bytes().await?;
+ read_avro_schema_from_reader(&mut data.as_ref())?
+ }
+ };
schemas.push(schema);
}
let merged_schema = Schema::try_merge(schemas)?;
@@ -65,7 +70,7 @@ impl FileFormat for AvroFormat {
&self,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
- _file: &FileMeta,
+ _object: &ObjectMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
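Schema inference above now fetches each object through ObjectStore::get and branches on the result: a local store hands back a std::fs::File, while a remote store returns a byte stream that is buffered first (the TODO notes that pulling the whole file just for the schema is potentially wasteful). A standalone sketch of that pattern, with a hypothetical helper:

    use std::io::Read;
    use std::sync::Arc;
    use object_store::{GetResult, ObjectMeta, ObjectStore};

    // Hypothetical helper mirroring the GetResult handling used above.
    async fn read_all(
        store: Arc<dyn ObjectStore>,
        object: &ObjectMeta,
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        match store.get(&object.location).await? {
            // Local filesystem stores return the opened file directly.
            GetResult::File(mut file, _path) => {
                let mut buf = Vec::new();
                file.read_to_end(&mut buf)?;
                Ok(buf)
            }
            // Remote stores return a stream, buffered here into memory.
            r @ GetResult::Stream(_) => Ok(r.bytes().await?.to_vec()),
        }
    }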
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 0d665e9ef..d72a6e767 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use datafusion_common::DataFusionError;
+use futures::TryFutureExt;
+use object_store::{ObjectMeta, ObjectStore};
use super::FileFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +34,6 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::ObjectStore;
/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -96,16 +97,21 @@ impl FileFormat for CsvFormat {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
- files: &[FileMeta],
+ objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas = vec![];
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
- for file in files {
- let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
+ for object in objects {
+ let data = store
+ .get(&object.location)
+ .and_then(|r| r.bytes())
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
- &mut reader,
+ &mut data.as_ref(),
self.delimiter,
Some(records_to_read),
self.has_header,
@@ -128,7 +134,7 @@ impl FileFormat for CsvFormat {
&self,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
- _file: &FileMeta,
+ _object: &ObjectMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
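The CSV path takes a slightly different route, chaining the fetch and the buffering with futures::TryFutureExt rather than matching on GetResult. A hedged one-function sketch of the same chain (fetch_all is a made-up name):

    use std::sync::Arc;
    use bytes::Bytes;
    use futures::TryFutureExt;
    use object_store::{path::Path, ObjectStore};

    // Fetch an object's full contents as `Bytes` in a single awaited chain,
    // matching the `get(...).and_then(|r| r.bytes())` call used above.
    async fn fetch_all(
        store: Arc<dyn ObjectStore>,
        location: &Path,
    ) -> object_store::Result<Bytes> {
        store.get(location).and_then(|r| r.bytes()).await
    }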
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index e9b49c42d..9a889ab4c 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use async_trait::async_trait;
-use datafusion_data_access::{object_store::ObjectStore, FileMeta};
+use object_store::{GetResult, ObjectMeta, ObjectStore};
use super::FileFormat;
use super::FileScanConfig;
@@ -71,21 +71,33 @@ impl FileFormat for JsonFormat {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
- files: &[FileMeta],
+ objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas = Vec::new();
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
- for file in files {
- let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
- let mut reader = BufReader::new(reader);
- let iter = ValueIter::new(&mut reader, None);
- let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+ for object in objects {
+ let mut take_while = || {
let should_take = records_to_read > 0;
if should_take {
records_to_read -= 1;
}
should_take
- }))?;
+ };
+
+ let schema = match store.get(&object.location).await? {
+ GetResult::File(file, _) => {
+ let mut reader = BufReader::new(file);
+ let iter = ValueIter::new(&mut reader, None);
+ infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+ }
+ r @ GetResult::Stream(_) => {
+ let data = r.bytes().await?;
+ let mut reader = BufReader::new(data.as_ref());
+ let iter = ValueIter::new(&mut reader, None);
+ infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+ }
+ };
+
schemas.push(schema);
if records_to_read == 0 {
break;
@@ -100,7 +112,7 @@ impl FileFormat for JsonFormat {
&self,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
- _file: &FileMeta,
+ _object: &ObjectMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
@@ -120,15 +132,12 @@ mod tests {
use super::super::test_util::scan_format;
use arrow::array::Int64Array;
use futures::StreamExt;
+ use object_store::local::LocalFileSystem;
use super::*;
+ use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
- use crate::{
- datafusion_data_access::object_store::local::{
- local_unpartitioned_file, LocalFileSystem,
- },
- physical_plan::collect,
- };
+ use crate::test::object_store::local_unpartitioned_file;
#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -229,12 +238,12 @@ mod tests {
#[tokio::test]
async fn infer_schema_with_limit() {
- let store = Arc::new(LocalFileSystem {}) as _;
+ let store = Arc::new(LocalFileSystem::new()) as _;
let filename = "tests/jsons/schema_infer_limit.json";
let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));
let file_schema = format
- .infer_schema(&store, &[local_unpartitioned_file(filename.to_string())])
+ .infer_schema(&store, &[local_unpartitioned_file(filename)])
.await
.expect("Schema inference");
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index a15750394..8a0d5b97e 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,8 +36,7 @@ use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};
use async_trait::async_trait;
-use datafusion_data_access::object_store::ObjectStore;
-use datafusion_data_access::FileMeta;
+use object_store::{ObjectMeta, ObjectStore};
/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
@@ -55,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
- files: &[FileMeta],
+ objects: &[ObjectMeta],
) -> Result<SchemaRef>;
/// Infer the statistics for the provided object. The cost and accuracy of the
@@ -69,7 +68,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
&self,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
- file: &FileMeta,
+ object: &ObjectMeta,
) -> Result<Statistics>;
/// Take a list of files and convert it to the appropriate executor
@@ -86,9 +85,8 @@ pub(crate) mod test_util {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
- use datafusion_data_access::object_store::local::{
- local_unpartitioned_file, LocalFileSystem,
- };
+ use crate::test::object_store::local_unpartitioned_file;
+ use object_store::local::LocalFileSystem;
pub async fn scan_format(
format: &dyn FileFormat,
@@ -97,7 +95,7 @@ pub(crate) mod test_util {
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let store = Arc::new(LocalFileSystem {}) as _;
+ let store = Arc::new(LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name));
let file_schema = format.infer_schema(&store, &[meta.clone()]).await?;
@@ -107,7 +105,7 @@ pub(crate) mod test_util {
.await?;
let file_groups = vec![vec![PartitionedFile {
- file_meta: meta,
+ object_meta: meta,
partition_values: vec![],
range: None,
}]];
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 8825fe7e0..19794f6ca 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -18,21 +18,17 @@
//! Parquet format abstractions
use std::any::Any;
-use std::io::Read;
use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use datafusion_common::DataFusionError;
use hashbrown::HashMap;
-use parquet::arrow::ArrowReader;
-use parquet::arrow::ParquetFileArrowReader;
-use parquet::errors::ParquetError;
-use parquet::errors::Result as ParquetResult;
-use parquet::file::reader::ChunkReader;
-use parquet::file::reader::Length;
-use parquet::file::serialized_reader::SerializedFileReader;
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::parquet_to_arrow_schema;
+use parquet::file::footer::{decode_footer, decode_metadata};
+use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics as ParquetStatistics;
use super::FileFormat;
@@ -42,15 +38,12 @@ use crate::arrow::array::{
};
use crate::arrow::datatypes::{DataType, Field};
use crate::datasource::{create_max_min_accs, get_col_stats};
-use crate::error::DataFusionError;
use crate::error::Result;
use crate::logical_plan::combine_filters;
use crate::logical_plan::Expr;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
-use crate::physical_plan::{metrics, ExecutionPlan};
-use crate::physical_plan::{Accumulator, Statistics};
-use datafusion_data_access::object_store::{ObjectReader, ObjectStore};
+use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
/// The default file extension of parquet files
pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
@@ -91,11 +84,11 @@ impl FileFormat for ParquetFormat {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
- files: &[FileMeta],
+ objects: &[ObjectMeta],
) -> Result<SchemaRef> {
- let mut schemas = Vec::with_capacity(files.len());
- for file in files {
- let schema = fetch_schema(store.as_ref(), file)?;
+ let mut schemas = Vec::with_capacity(objects.len());
+ for object in objects {
+ let schema = fetch_schema(store.as_ref(), object).await?;
schemas.push(schema)
}
let schema = Schema::try_merge(schemas)?;
@@ -106,9 +99,9 @@ impl FileFormat for ParquetFormat {
&self,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
- file: &FileMeta,
+ object: &ObjectMeta,
) -> Result<Statistics> {
- let stats = fetch_statistics(store.as_ref(), table_schema, file)?;
+ let stats = fetch_statistics(store.as_ref(), table_schema, object).await?;
Ok(stats)
}
@@ -294,37 +287,70 @@ fn summarize_min_max(
}
}
-/// Read and parse the schema of the Parquet file at location `path`
-fn fetch_schema(store: &dyn ObjectStore, file: &FileMeta) -> Result<Schema> {
- let object_reader = store.file_reader(file.sized_file.clone())?;
- let obj_reader = ChunkObjectReader {
- object_reader,
- bytes_scanned: None,
- };
- let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
- let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
- let schema = arrow_reader.get_schema()?;
+pub(crate) async fn fetch_parquet_metadata(
+ store: &dyn ObjectStore,
+ meta: &ObjectMeta,
+) -> Result<ParquetMetaData> {
+ if meta.size < 8 {
+ return Err(DataFusionError::Execution(format!(
+ "file size of {} is less than footer",
+ meta.size
+ )));
+ }
+ let footer_start = meta.size - 8;
+ let suffix = store
+ .get_range(&meta.location, footer_start..meta.size)
+ .await?;
+
+ let mut footer = [0; 8];
+ footer.copy_from_slice(suffix.as_ref());
+
+ let length = decode_footer(&footer)?;
+
+ if meta.size < length + 8 {
+ return Err(DataFusionError::Execution(format!(
+ "file size of {} is less than footer + metadata {}",
+ meta.size,
+ length + 8
+ )));
+ }
+
+ let metadata_start = meta.size - length - 8;
+ let metadata = store
+ .get_range(&meta.location, metadata_start..footer_start)
+ .await?;
+
+ Ok(decode_metadata(metadata.as_ref())?)
+}
+
+/// Read and parse the schema of the Parquet file at location `path`
+async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
+ let metadata = fetch_parquet_metadata(store, file).await?;
+ let file_metadata = metadata.file_metadata();
+ let schema = parquet_to_arrow_schema(
+ file_metadata.schema_descr(),
+ file_metadata.key_value_metadata(),
+ )?;
Ok(schema)
}
/// Read and parse the statistics of the Parquet file at location `path`
-fn fetch_statistics(
+async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
- file: &FileMeta,
+ file: &ObjectMeta,
) -> Result<Statistics> {
- let object_reader = store.file_reader(file.sized_file.clone())?;
- let obj_reader = ChunkObjectReader {
- object_reader,
- bytes_scanned: None,
- };
- let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
- let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
- let file_schema = arrow_reader.get_schema()?;
+ let metadata = fetch_parquet_metadata(store, file).await?;
+ let file_metadata = metadata.file_metadata();
+
+ let file_schema = parquet_to_arrow_schema(
+ file_metadata.schema_descr(),
+ file_metadata.key_value_metadata(),
+ )?;
+
let num_fields = table_schema.fields().len();
let fields = table_schema.fields().to_vec();
- let meta_data = arrow_reader.metadata();
let mut num_rows = 0;
let mut total_byte_size = 0;
@@ -335,7 +361,7 @@ fn fetch_statistics(
let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
- for row_group_meta in meta_data.row_groups() {
+ for row_group_meta in metadata.row_groups() {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();
@@ -395,46 +421,18 @@ fn fetch_statistics(
Ok(statistics)
}
-/// A wrapper around the object reader to make it implement `ChunkReader`
-pub struct ChunkObjectReader {
- /// The underlying object reader
- pub object_reader: Arc<dyn ObjectReader>,
- /// Optional counter which will track total number of bytes scanned
- pub bytes_scanned: Option<metrics::Count>,
-}
-
-impl Length for ChunkObjectReader {
- fn len(&self) -> u64 {
- self.object_reader.length()
- }
-}
-
-impl ChunkReader for ChunkObjectReader {
- type T = Box<dyn Read + Send + Sync>;
-
- fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
- if let Some(m) = self.bytes_scanned.as_ref() {
- m.add(length)
- }
- self.object_reader
- .sync_chunk_reader(start, length)
- .map_err(DataFusionError::IoError)
- .map_err(|e| ParquetError::ArrowError(e.to_string()))
- }
-}
-
#[cfg(test)]
pub(crate) mod test_util {
use super::*;
+ use crate::test::object_store::local_unpartitioned_file;
use arrow::record_batch::RecordBatch;
- use datafusion_data_access::object_store::local::local_unpartitioned_file;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;
pub async fn store_parquet(
batches: Vec<RecordBatch>,
- ) -> Result<(Vec<FileMeta>, Vec<NamedTempFile>)> {
+ ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
@@ -451,11 +449,7 @@ pub(crate) mod test_util {
})
.collect();
- let meta: Vec<_> = files
- .iter()
- .map(|f| local_unpartitioned_file(f.path().to_string_lossy().to_string()))
- .collect();
-
+ let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
}
}
@@ -464,7 +458,6 @@ pub(crate) mod test_util {
mod tests {
use super::super::test_util::scan_format;
use crate::physical_plan::collect;
- use datafusion_data_access::object_store::local::LocalFileSystem;
use super::*;
@@ -478,6 +471,7 @@ mod tests {
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use futures::StreamExt;
+ use object_store::local::LocalFileSystem;
#[tokio::test]
async fn read_merged_batches() -> Result<()> {
@@ -489,13 +483,13 @@ mod tests {
let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
- let store = Arc::new(LocalFileSystem {}) as _;
+ let store = Arc::new(LocalFileSystem::new()) as _;
let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
let format = ParquetFormat::default();
let schema = format.infer_schema(&store, &meta).await.unwrap();
- let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0])?;
+ let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
@@ -503,7 +497,7 @@ mod tests {
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));
- let stats = fetch_statistics(store.as_ref(), schema, &meta[1])?;
+ let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -560,8 +554,8 @@ mod tests {
let _ = collect(exec.clone(), task_ctx.clone()).await?;
let _ = collect(exec_projected.clone(), task_ctx).await?;
- assert_bytes_scanned(exec, 1409);
- assert_bytes_scanned(exec_projected, 811);
+ assert_bytes_scanned(exec, 671);
+ assert_bytes_scanned(exec_projected, 73);
Ok(())
}
@@ -580,7 +574,7 @@ mod tests {
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
- assert_eq!(8, batches[0].num_rows());
+ assert_eq!(1, batches[0].num_rows());
Ok(())
}
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 4b482d968..873d005b4 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -41,8 +41,10 @@ use crate::{
use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
-use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile};
+use datafusion_common::DataFusionError;
use datafusion_expr::Volatility;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
@@ -184,7 +186,7 @@ pub async fn pruned_partition_list<'a>(
Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
let parsed_path = parse_partitions_for_path(
table_path,
- file_meta.path(),
+ &file_meta.location,
table_partition_cols,
)
.map(|p| {
@@ -195,7 +197,7 @@ pub async fn pruned_partition_list<'a>(
Ok(parsed_path.map(|partition_values| PartitionedFile {
partition_values,
- file_meta,
+ object_meta: file_meta,
range: None,
}))
})))
@@ -214,10 +216,9 @@ pub async fn pruned_partition_list<'a>(
df = df.filter(filter.clone())?;
}
let filtered_batches = df.collect().await?;
+ let paths = batches_to_paths(&filtered_batches)?;
- Ok(Box::pin(futures::stream::iter(
- batches_to_paths(&filtered_batches).into_iter().map(Ok),
- )))
+ Ok(Box::pin(futures::stream::iter(paths.into_iter().map(Ok))))
}
}
@@ -231,7 +232,7 @@ pub async fn pruned_partition_list<'a>(
fn paths_to_batch(
table_partition_cols: &[String],
table_path: &ListingTableUrl,
- metas: &[FileMeta],
+ metas: &[ObjectMeta],
) -> Result<RecordBatch> {
let mut key_builder = StringBuilder::new(metas.len());
let mut length_builder = UInt64Builder::new(metas.len());
@@ -241,20 +242,19 @@ fn paths_to_batch(
.map(|_| StringBuilder::new(metas.len()))
.collect::<Vec<_>>();
for file_meta in metas {
- if let Some(partition_values) =
- parse_partitions_for_path(table_path, file_meta.path(), table_partition_cols)
- {
- key_builder.append_value(file_meta.path())?;
- length_builder.append_value(file_meta.size())?;
- match file_meta.last_modified {
- Some(lm) => modified_builder.append_value(lm.timestamp_millis())?,
- None => modified_builder.append_null()?,
- }
+ if let Some(partition_values) = parse_partitions_for_path(
+ table_path,
+ &file_meta.location,
+ table_partition_cols,
+ ) {
+ key_builder.append_value(file_meta.location.as_ref())?;
+ length_builder.append_value(file_meta.size as u64)?;
+ modified_builder.append_value(file_meta.last_modified.timestamp_millis())?;
for (i, part_val) in partition_values.iter().enumerate() {
partition_builders[i].append_value(part_val)?;
}
} else {
- debug!("No partitioning for path {}", file_meta.path());
+ debug!("No partitioning for path {}", file_meta.location);
}
}
@@ -283,7 +283,7 @@ fn paths_to_batch(
}
/// convert a set of record batches created by `paths_to_batch()` back to partitioned files.
-fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
+fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
batches
.iter()
.flat_map(|batch| {
@@ -303,23 +303,21 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
.downcast_ref::<Date64Array>()
.unwrap();
- (0..batch.num_rows()).map(move |row| PartitionedFile {
- file_meta: FileMeta {
- last_modified: match modified_array.is_null(row) {
- false => Some(Utc.timestamp_millis(modified_array.value(row))),
- true => None,
- },
- sized_file: SizedFile {
- path: key_array.value(row).to_owned(),
- size: length_array.value(row),
+ (0..batch.num_rows()).map(move |row| {
+ Ok(PartitionedFile {
+ object_meta: ObjectMeta {
+ location: Path::parse(key_array.value(row))
+ .map_err(|e| DataFusionError::External(Box::new(e)))?,
+ last_modified: Utc.timestamp_millis(modified_array.value(row)),
+ size: length_array.value(row) as usize,
},
- },
- partition_values: (3..batch.columns().len())
- .map(|col| {
- ScalarValue::try_from_array(batch.column(col), row).unwrap()
- })
- .collect(),
- range: None,
+ partition_values: (3..batch.columns().len())
+ .map(|col| {
+ ScalarValue::try_from_array(batch.column(col), row).unwrap()
+ })
+ .collect(),
+ range: None,
+ })
})
})
.collect()
@@ -329,7 +327,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
/// associated to the partitions defined by `table_partition_cols`
fn parse_partitions_for_path<'a>(
table_path: &ListingTableUrl,
- file_path: &'a str,
+ file_path: &'a Path,
table_partition_cols: &[String],
) -> Option<Vec<&'a str>> {
let subpath = table_path.strip_prefix(file_path)?;
@@ -346,10 +344,8 @@ fn parse_partitions_for_path<'a>(
#[cfg(test)]
mod tests {
- use crate::{
- logical_plan::{case, col, lit},
- test::object_store::TestObjectStore,
- };
+ use crate::logical_plan::{case, col, lit};
+ use crate::test::object_store::make_test_store;
use futures::StreamExt;
use super::*;
@@ -396,7 +392,7 @@ mod tests {
#[tokio::test]
async fn test_pruned_partition_list_empty() {
- let store = TestObjectStore::new_arc(&[
+ let store = make_test_store(&[
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/file.parquet", 100),
]);
@@ -418,7 +414,7 @@ mod tests {
#[tokio::test]
async fn test_pruned_partition_list() {
- let store = TestObjectStore::new_arc(&[
+ let store = make_test_store(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
@@ -440,7 +436,7 @@ mod tests {
assert_eq!(pruned.len(), 2);
let f1 = &pruned[0];
assert_eq!(
- f1.file_meta.path(),
+ f1.object_meta.location.as_ref(),
"tablepath/mypartition=val1/file.parquet"
);
assert_eq!(
@@ -449,7 +445,7 @@ mod tests {
);
let f2 = &pruned[1];
assert_eq!(
- f2.file_meta.path(),
+ f2.object_meta.location.as_ref(),
"tablepath/mypartition=val1/other=val3/file.parquet"
);
assert_eq!(
@@ -460,7 +456,7 @@ mod tests {
#[tokio::test]
async fn test_pruned_partition_list_multi() {
- let store = TestObjectStore::new_arc(&[
+ let store = make_test_store(&[
("tablepath/part1=p1v1/file.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
@@ -487,7 +483,7 @@ mod tests {
assert_eq!(pruned.len(), 2);
let f1 = &pruned[0];
assert_eq!(
- f1.file_meta.path(),
+ f1.object_meta.location.as_ref(),
"tablepath/part1=p1v2/part2=p2v1/file1.parquet"
);
assert_eq!(
@@ -499,7 +495,7 @@ mod tests {
);
let f2 = &pruned[1];
assert_eq!(
- f2.file_meta.path(),
+ f2.object_meta.location.as_ref(),
"tablepath/part1=p1v2/part2=p2v1/file2.parquet"
);
assert_eq!(
@@ -517,7 +513,7 @@ mod tests {
Some(vec![]),
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket/mytable/file.csv",
+ &Path::from("bucket/mytable/file.csv"),
&[]
)
);
@@ -525,7 +521,7 @@ mod tests {
None,
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
- "bucket/mytable/file.csv",
+ &Path::from("bucket/mytable/file.csv"),
&[]
)
);
@@ -533,7 +529,7 @@ mod tests {
None,
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket/mytable/file.csv",
+ &Path::from("bucket/mytable/file.csv"),
&[String::from("mypartition")]
)
);
@@ -541,7 +537,7 @@ mod tests {
Some(vec!["v1"]),
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket/mytable/mypartition=v1/file.csv",
+ &Path::from("bucket/mytable/mypartition=v1/file.csv"),
&[String::from("mypartition")]
)
);
@@ -549,7 +545,7 @@ mod tests {
Some(vec!["v1"]),
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
- "bucket/mytable/mypartition=v1/file.csv",
+ &Path::from("bucket/mytable/mypartition=v1/file.csv"),
&[String::from("mypartition")]
)
);
@@ -558,7 +554,7 @@ mod tests {
None,
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket/mytable/v1/file.csv",
+ &Path::from("bucket/mytable/v1/file.csv"),
&[String::from("mypartition")]
)
);
@@ -566,7 +562,7 @@ mod tests {
Some(vec!["v1", "v2"]),
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
+ &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
&[String::from("mypartition"), String::from("otherpartition")]
)
);
@@ -574,49 +570,24 @@ mod tests {
Some(vec!["v1"]),
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
- &[String::from("mypartition")]
- )
- );
- }
-
- #[cfg(target_os = "windows")]
- #[test]
- fn test_parse_partitions_for_path_windows() {
- assert_eq!(
- Some(vec!["v1"]),
- parse_partitions_for_path(
- &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket\\mytable\\mypartition=v1\\file.csv",
+ &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
&[String::from("mypartition")]
)
);
- assert_eq!(
- Some(vec!["v1", "v2"]),
- parse_partitions_for_path(
- &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
- "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv",
- &[String::from("mypartition"), String::from("otherpartition")]
- )
- );
}
#[test]
fn test_path_batch_roundtrip_no_partiton() {
let files = vec![
- FileMeta {
- sized_file: SizedFile {
- path: String::from("mybucket/tablepath/part1=val1/file.parquet"),
- size: 100,
- },
- last_modified: Some(Utc.timestamp_millis(1634722979123)),
+ ObjectMeta {
+ location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
+ last_modified: Utc.timestamp_millis(1634722979123),
+ size: 100,
},
- FileMeta {
- sized_file: SizedFile {
- path: String::from("mybucket/tablepath/part1=val2/file.parquet"),
- size: 100,
- },
- last_modified: None,
+ ObjectMeta {
+ location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
+ last_modified: Utc.timestamp_millis(0),
+ size: 100,
},
];
@@ -624,14 +595,14 @@ mod tests {
let batches = paths_to_batch(&[], &table_path, &files)
.expect("Serialization of file list to batch failed");
- let parsed_files = batches_to_paths(&[batches]);
+ let parsed_files = batches_to_paths(&[batches]).unwrap();
assert_eq!(parsed_files.len(), 2);
assert_eq!(&parsed_files[0].partition_values, &[]);
assert_eq!(&parsed_files[1].partition_values, &[]);
let parsed_metas = parsed_files
.into_iter()
- .map(|pf| pf.file_meta)
+ .map(|pf| pf.object_meta)
.collect::<Vec<_>>();
assert_eq!(parsed_metas, files);
}
@@ -639,19 +610,15 @@ mod tests {
#[test]
fn test_path_batch_roundtrip_with_partition() {
let files = vec![
- FileMeta {
- sized_file: SizedFile {
- path: String::from("mybucket/tablepath/part1=val1/file.parquet"),
- size: 100,
- },
- last_modified: Some(Utc.timestamp_millis(1634722979123)),
+ ObjectMeta {
+ location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
+ last_modified: Utc.timestamp_millis(1634722979123),
+ size: 100,
},
- FileMeta {
- sized_file: SizedFile {
- path: String::from("mybucket/tablepath/part1=val2/file.parquet"),
- size: 100,
- },
- last_modified: None,
+ ObjectMeta {
+ location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
+ last_modified: Utc.timestamp_millis(0),
+ size: 100,
},
];
@@ -662,7 +629,7 @@ mod tests {
)
.expect("Serialization of file list to batch failed");
- let parsed_files = batches_to_paths(&[batches]);
+ let parsed_files = batches_to_paths(&[batches]).unwrap();
assert_eq!(parsed_files.len(), 2);
assert_eq!(
&parsed_files[0].partition_values,
@@ -675,7 +642,7 @@ mod tests {
let parsed_metas = parsed_files
.into_iter()
- .map(|pf| pf.file_meta)
+ .map(|pf| pf.object_meta)
.collect::<Vec<_>>();
assert_eq!(parsed_metas, files);
}
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index c11de5f80..85d4b6f7d 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -22,9 +22,11 @@ mod helpers;
mod table;
mod url;
+use crate::error::Result;
+use chrono::TimeZone;
use datafusion_common::ScalarValue;
-use datafusion_data_access::{FileMeta, Result, SizedFile};
use futures::Stream;
+use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
pub use self::url::ListingTableUrl;
@@ -51,7 +53,7 @@ pub struct FileRange {
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
- pub file_meta: FileMeta,
+ pub object_meta: ObjectMeta,
/// Values of partition columns to be appended to each row
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
@@ -62,9 +64,10 @@ impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: String, size: u64) -> Self {
Self {
- file_meta: FileMeta {
- sized_file: SizedFile { path, size },
- last_modified: None,
+ object_meta: ObjectMeta {
+ location: Path::from(path),
+ last_modified: chrono::Utc.timestamp_nanos(0),
+ size: size as usize,
},
partition_values: vec![],
range: None,
@@ -74,9 +77,10 @@ impl PartitionedFile {
/// Create a file range without metadata or partition
pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
Self {
- file_meta: FileMeta {
- sized_file: SizedFile { path, size },
- last_modified: None,
+ object_meta: ObjectMeta {
+ location: Path::from(path),
+ last_modified: chrono::Utc.timestamp_nanos(0),
+ size: size as usize,
},
partition_values: vec![],
range: Some(FileRange { start, end }),
@@ -84,16 +88,10 @@ impl PartitionedFile {
}
}
-impl std::fmt::Display for PartitionedFile {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "{}", self.file_meta)
- }
-}
-
-impl From<FileMeta> for PartitionedFile {
- fn from(file_meta: FileMeta) -> Self {
+impl From<ObjectMeta> for PartitionedFile {
+ fn from(object_meta: ObjectMeta) -> Self {
PartitionedFile {
- file_meta,
+ object_meta,
partition_values: vec![],
range: None,
}
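With the change above, PartitionedFile wraps an object_store::ObjectMeta (location, last_modified, size) instead of the old FileMeta/SizedFile pair, and files without a known modification time carry the Unix epoch rather than None. A small sketch using the From impl shown above; the path and size are made-up values:

    use chrono::{TimeZone, Utc};
    use datafusion::datasource::listing::PartitionedFile;
    use object_store::{path::Path, ObjectMeta};

    fn example_partitioned_file() -> PartitionedFile {
        let meta = ObjectMeta {
            location: Path::from("data/year=2022/part-0.parquet"),
            last_modified: Utc.timestamp_millis(0), // epoch stand-in when unknown
            size: 1024,
        };
        // From<ObjectMeta> for PartitionedFile, as defined above
        meta.into()
    }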
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 1b0807085..abf5394fb 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -126,7 +126,7 @@ impl ListingTableConfig {
.await
.ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
- let file_type = file.path().rsplit('.').next().ok_or_else(|| {
+ let file_type = file.location.as_ref().rsplit('.').next().ok_or_else(|| {
DataFusionError::Internal("Unable to infer file suffix".into())
})?;
@@ -394,7 +394,7 @@ impl ListingTable {
let statistics = if self.options.collect_stat {
self.options
.format
- .infer_stats(&store, self.file_schema.clone(), &part_file.file_meta)
+ .infer_stats(&store, self.file_schema.clone(), &part_file.object_meta)
.await?
} else {
Statistics::default()
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 041a6ab7f..8676f2118 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -17,13 +17,12 @@
use crate::datasource::object_store::ObjectStoreUrl;
use datafusion_common::{DataFusionError, Result};
-use datafusion_data_access::object_store::ObjectStore;
-use datafusion_data_access::FileMeta;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
use itertools::Itertools;
-use std::path::is_separator;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
use url::Url;
/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
@@ -32,6 +31,8 @@ use url::Url;
pub struct ListingTableUrl {
/// A URL that identifies a file or directory to list files from
url: Url,
+ /// The path prefix
+ prefix: Path,
/// An optional glob expression used to filter files
glob: Option<Pattern>,
}
@@ -79,7 +80,7 @@ impl ListingTableUrl {
}
match Url::parse(s) {
- Ok(url) => Ok(Self { url, glob: None }),
+ Ok(url) => Ok(Self::new(url, None)),
Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
Err(e) => Err(DataFusionError::External(Box::new(e))),
}
@@ -102,7 +103,13 @@ impl ListingTableUrl {
false => Url::from_directory_path(path).unwrap(),
};
- Ok(Self { url, glob })
+ Ok(Self::new(url, glob))
+ }
+
+ /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
+ fn new(url: Url, glob: Option<Pattern>) -> Self {
+ let prefix = Path::parse(url.path()).expect("should be URL safe");
+ Self { url, prefix, glob }
}
/// Returns the URL scheme
@@ -110,46 +117,19 @@ impl ListingTableUrl {
self.url.scheme()
}
- /// Returns the path as expected by [`ObjectStore`]
- ///
- /// In particular for file scheme URLs, this is an absolute
- /// on the local filesystem in the OS-specific path representation
- ///
- /// For other URLs, this is a the host and path of the URL,
- /// delimited by `/`, and with no leading `/`
- ///
- /// TODO: Handle paths consistently (#2489)
- fn prefix(&self) -> &str {
- match self.scheme() {
- "file" => match cfg!(target_family = "windows") {
- true => self.url.path().strip_prefix('/').unwrap(),
- false => self.url.path(),
- },
- _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
- }
- }
-
/// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
/// an iterator of the remaining path segments
- ///
- /// TODO: Handle paths consistently (#2489)
pub(crate) fn strip_prefix<'a, 'b: 'a>(
&'a self,
- path: &'b str,
+ path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
- let prefix = self.prefix();
- // Ignore empty path segments
- let diff = itertools::diff_with(
- path.split(is_separator).filter(|s| !s.is_empty()),
- prefix.split(is_separator).filter(|s| !s.is_empty()),
- |a, b| a == b,
- );
-
- match diff {
- // Match with remaining
- Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
- _ => None,
- }
+ use object_store::path::DELIMITER;
+ let path: &str = path.as_ref();
+ let stripped = match self.prefix.as_ref() {
+ "" => path,
+ p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?,
+ };
+ Some(stripped.split(DELIMITER))
}
/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
@@ -157,31 +137,34 @@ impl ListingTableUrl {
&'a self,
store: &'a dyn ObjectStore,
file_extension: &'a str,
- ) -> BoxStream<'a, Result<FileMeta>> {
- futures::stream::once(async move {
- let prefix = self.prefix();
- store.list_file(prefix.as_ref()).await
- })
- .try_flatten()
- .map_err(DataFusionError::IoError)
- .try_filter(move |meta| {
- let path = meta.path();
-
- let extension_match = path.ends_with(file_extension);
- let glob_match = match &self.glob {
- Some(glob) => match self.strip_prefix(path) {
- Some(mut segments) => {
- let stripped = segments.join("/");
- glob.matches(&stripped)
- }
- None => false,
- },
- None => true,
- };
-
- futures::future::ready(extension_match && glob_match)
- })
- .boxed()
+ ) -> BoxStream<'a, Result<ObjectMeta>> {
+ // If the prefix is a file, use a head request, otherwise list
+ let is_dir = self.url.as_str().ends_with('/');
+ let list = match is_dir {
+ true => futures::stream::once(store.list(Some(&self.prefix)))
+ .try_flatten()
+ .boxed(),
+ false => futures::stream::once(store.head(&self.prefix)).boxed(),
+ };
+
+ list.map_err(Into::into)
+ .try_filter(move |meta| {
+ let path = &meta.location;
+ let extension_match = path.as_ref().ends_with(file_extension);
+ let glob_match = match &self.glob {
+ Some(glob) => match self.strip_prefix(path) {
+ Some(mut segments) => {
+ let stripped = segments.join("/");
+ glob.matches(&stripped)
+ }
+ None => false,
+ },
+ None => true,
+ };
+
+ futures::future::ready(extension_match && glob_match)
+ })
+ .boxed()
}
/// Returns this [`ListingTableUrl`] as a string
@@ -250,23 +233,32 @@ mod tests {
let root = root.to_string_lossy();
let url = ListingTableUrl::parse(&root).unwrap();
- let child = format!("{}/partition/file", root);
+ let child = url.prefix.child("partition").child("file");
let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
assert_eq!(prefix, vec!["partition", "file"]);
+
+ let url = ListingTableUrl::parse("file:///").unwrap();
+ let child = Path::parse("/foo/bar").unwrap();
+ let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+ assert_eq!(prefix, vec!["foo", "bar"]);
+
+ let url = ListingTableUrl::parse("file:///foo").unwrap();
+ let child = Path::parse("/foob/bar").unwrap();
+ assert!(url.strip_prefix(&child).is_none());
}
#[test]
fn test_prefix_s3() {
let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
- assert_eq!(url.prefix(), "bucket/foo/bar");
+ assert_eq!(url.prefix.as_ref(), "foo/bar");
- let path = "bucket/foo/bar/partition/foo.parquet";
- let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+ let path = Path::from("foo/bar/partition/foo.parquet");
+ let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
assert_eq!(prefix, vec!["partition", "foo.parquet"]);
- let path = "other-bucket/foo/bar/partition/foo.parquet";
- assert!(url.strip_prefix(path).is_none());
+ let path = Path::from("other/bar/partition/foo.parquet");
+ assert!(url.strip_prefix(&path).is_none());
}
#[test]
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
index ac7e1a847..aca5b0ca4 100644
--- a/datafusion/core/src/datasource/object_store.rs
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -20,8 +20,8 @@
//! and query data inside these systems.
use datafusion_common::{DataFusionError, Result};
-use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
-use datafusion_data_access::object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+use object_store::ObjectStore;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
@@ -84,7 +84,7 @@ impl std::fmt::Display for ObjectStoreUrl {
/// Object store registry
pub struct ObjectStoreRegistry {
/// A map from scheme to object store that serve list / read operations for the store
- pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+ object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
}
impl std::fmt::Debug for ObjectStoreRegistry {
@@ -109,8 +109,7 @@ impl ObjectStoreRegistry {
/// ['LocalFileSystem'] store is registered in by default to support read local files natively.
pub fn new() -> Self {
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
- map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
-
+ map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
object_stores: RwLock::new(map),
}
@@ -120,34 +119,32 @@ impl ObjectStoreRegistry {
/// If a store of the same prefix existed before, it is replaced in the registry and returned.
pub fn register_store(
&self,
- scheme: String,
+ scheme: impl AsRef<str>,
+ host: impl AsRef<str>,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
let mut stores = self.object_stores.write();
- stores.insert(scheme, store)
- }
-
- /// Get the store registered for scheme
- pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
- let stores = self.object_stores.read();
- stores.get(scheme).cloned()
+ let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+ stores.insert(s, store)
}
/// Get a suitable store for the provided URL. For example:
///
- /// - URL with scheme `file://` or no schema will return the default LocalFS store
- /// - URL with scheme `s3://` will return the S3 store if it's registered
+ /// - URL with scheme `file:///` or no schema will return the default LocalFS store
+ /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
///
pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
let url = url.as_ref();
- let store = self.get(url.scheme()).ok_or_else(|| {
+ let s = &url[url::Position::BeforeScheme..url::Position::AfterHost];
+ let stores = self.object_stores.read();
+ let store = stores.get(s).ok_or_else(|| {
DataFusionError::Internal(format!(
"No suitable object store found for {}",
url
))
})?;
- Ok(store)
+ Ok(store.clone())
}
}
@@ -155,7 +152,6 @@ impl ObjectStoreRegistry {
mod tests {
use super::*;
use crate::datasource::listing::ListingTableUrl;
- use datafusion_data_access::object_store::local::LocalFileSystem;
use std::sync::Arc;
#[test]
@@ -197,7 +193,7 @@ mod tests {
#[test]
fn test_get_by_url_s3() {
let sut = ObjectStoreRegistry::default();
- sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));
+ sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
sut.get_by_url(&url).unwrap();
}
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 2f134990f..d810c882f 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -28,7 +28,7 @@ use crate::{
use crate::datasource::object_store::ObjectStoreRegistry;
use datafusion_common::DataFusionError;
-use datafusion_data_access::object_store::ObjectStore;
+use object_store::ObjectStore;
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::sync::Arc;
@@ -92,12 +92,12 @@ impl RuntimeEnv {
/// Returns the `ObjectStore` previously registered for this scheme, if any
pub fn register_object_store(
&self,
- scheme: impl Into<String>,
+ scheme: impl AsRef<str>,
+ host: impl AsRef<str>,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
- let scheme = scheme.into();
self.object_store_registry
- .register_store(scheme, object_store)
+ .register_store(scheme, host, object_store)
}
/// Retrieves a `ObjectStore` instance for a url
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 33501a10f..c100f4224 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -228,7 +228,6 @@ pub use parquet;
// re-export DataFusion crates
pub use datafusion_common as common;
-pub use datafusion_data_access;
pub use datafusion_expr as logical_expr;
pub use datafusion_optimizer as optimizer;
pub use datafusion_physical_expr as physical_expr;
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index d03951990..56d78fb9e 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -16,23 +16,17 @@
// under the License.
//! Execution plan for reading line-delimited Avro files
-#[cfg(feature = "avro")]
-use crate::avro_to_arrow;
use crate::error::Result;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::datatypes::SchemaRef;
-#[cfg(feature = "avro")]
-use arrow::error::ArrowError;
use crate::execution::context::TaskContext;
use std::any::Any;
use std::sync::Arc;
-#[cfg(feature = "avro")]
-use super::file_stream::{BatchIter, FileStream};
use super::FileScanConfig;
/// Execution plan for scanning Avro data source
@@ -109,39 +103,17 @@ impl ExecutionPlan for AvroExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let proj = self.base_config.projected_file_column_names();
-
- let batch_size = context.session_config().batch_size();
- let file_schema = Arc::clone(&self.base_config.file_schema);
-
- // The avro reader cannot limit the number of records, so `remaining` is ignored.
- let fun = move |file, _remaining: &Option<usize>| {
- let reader_res = avro_to_arrow::Reader::try_new(
- file,
- Arc::clone(&file_schema),
- batch_size,
- proj.clone(),
- );
- match reader_res {
- Ok(r) => Box::new(r) as BatchIter,
- Err(e) => Box::new(
- vec![Err(ArrowError::ExternalError(Box::new(e)))].into_iter(),
- ),
- }
- };
-
- let object_store = context
- .runtime_env()
- .object_store(&self.base_config.object_store_url)?;
-
- Ok(Box::pin(FileStream::new(
- object_store,
- self.base_config.file_groups[partition].clone(),
- fun,
- Arc::clone(&self.projected_schema),
- self.base_config.limit,
- self.base_config.table_partition_cols.clone(),
- )))
+ use super::file_stream::FileStream;
+
+ let config = Arc::new(avro::AvroConfig {
+ schema: Arc::clone(&self.base_config.file_schema),
+ batch_size: context.session_config().batch_size(),
+ projection: self.base_config.projected_file_column_names(),
+ });
+ let opener = avro::AvroOpener { config };
+
+ let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+ Ok(Box::pin(stream))
}
fn fmt_as(
@@ -166,6 +138,64 @@ impl ExecutionPlan for AvroExec {
}
}
+#[cfg(feature = "avro")]
+mod avro {
+ use super::*;
+ use crate::datasource::listing::FileRange;
+ use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture};
+ use bytes::Buf;
+ use futures::StreamExt;
+ use object_store::{GetResult, ObjectMeta, ObjectStore};
+
+ pub struct AvroConfig {
+ pub schema: SchemaRef,
+ pub batch_size: usize,
+ pub projection: Option<Vec<String>>,
+ }
+
+ impl AvroConfig {
+ fn open<R: std::io::Read>(
+ &self,
+ reader: R,
+ ) -> Result<crate::avro_to_arrow::Reader<'static, R>> {
+ crate::avro_to_arrow::Reader::try_new(
+ reader,
+ self.schema.clone(),
+ self.batch_size,
+ self.projection.clone(),
+ )
+ }
+ }
+
+ pub struct AvroOpener {
+ pub config: Arc<AvroConfig>,
+ }
+
+ impl FormatReader for AvroOpener {
+ fn open(
+ &self,
+ store: Arc<dyn ObjectStore>,
+ file: ObjectMeta,
+ _range: Option<FileRange>,
+ ) -> ReaderFuture {
+ let config = self.config.clone();
+ Box::pin(async move {
+ match store.get(&file.location).await? {
+ GetResult::File(file, _) => {
+ let reader = config.open(file)?;
+ Ok(futures::stream::iter(reader).boxed())
+ }
+ r @ GetResult::Stream(_) => {
+ let bytes = r.bytes().await?;
+ let reader = config.open(bytes.reader())?;
+ Ok(futures::stream::iter(reader).boxed())
+ }
+ }
+ })
+ }
+ }
+}
+
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
@@ -174,11 +204,10 @@ mod tests {
use crate::datasource::object_store::ObjectStoreUrl;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
+ use crate::test::object_store::local_unpartitioned_file;
use arrow::datatypes::{DataType, Field, Schema};
- use datafusion_data_access::object_store::local::{
- local_unpartitioned_file, LocalFileSystem,
- };
use futures::StreamExt;
+ use object_store::local::LocalFileSystem;
use super::*;
@@ -186,7 +215,7 @@ mod tests {
async fn avro_exec_without_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let store = Arc::new(LocalFileSystem {}) as _;
+ let store = Arc::new(LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(filename);
let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
@@ -246,7 +275,7 @@ mod tests {
async fn avro_exec_missing_column() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let object_store = Arc::new(LocalFileSystem {}) as _;
+ let object_store = Arc::new(LocalFileSystem::new()) as _;
let object_store_url = ObjectStoreUrl::local_filesystem();
let meta = local_unpartitioned_file(filename);
let actual_schema = AvroFormat {}
@@ -315,7 +344,7 @@ mod tests {
async fn avro_exec_with_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let object_store = Arc::new(LocalFileSystem {}) as _;
+ let object_store = Arc::new(LocalFileSystem::new()) as _;
let object_store_url = ObjectStoreUrl::local_filesystem();
let meta = local_unpartitioned_file(filename);
let file_schema = AvroFormat {}
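The AvroOpener above, like the CsvOpener and JsonOpener later in this patch, bridges the async object_store API to the existing synchronous readers in the same way: a GetResult::File hands over the underlying std::fs::File directly, while a GetResult::Stream is buffered into Bytes first. A minimal sketch of that pattern, assuming the object_store 0.3 API used in this commit; open_sync_reader is an illustrative helper, not part of the patch:

use std::io::Read;
use std::sync::Arc;

use bytes::Buf;
use object_store::{path::Path, GetResult, ObjectStore};

// Resolve a location into something a synchronous reader can consume:
// local files are used as-is, remote streams are buffered into memory.
async fn open_sync_reader(
    store: Arc<dyn ObjectStore>,
    location: &Path,
) -> object_store::Result<Box<dyn Read + Send>> {
    match store.get(location).await? {
        GetResult::File(file, _path) => Ok(Box::new(file)),
        r @ GetResult::Stream(_) => {
            let bytes = r.bytes().await?;
            Ok(Box::new(bytes.reader()))
        }
    }
}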
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 0f8273ec1..186da2ad6 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -24,16 +24,21 @@ use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
+use crate::datasource::listing::FileRange;
+use crate::physical_plan::file_format::file_stream::{
+ FileStream, FormatReader, ReaderFuture,
+};
use arrow::csv;
use arrow::datatypes::SchemaRef;
+use bytes::Buf;
use futures::{StreamExt, TryStreamExt};
+use object_store::{GetResult, ObjectMeta, ObjectStore};
use std::any::Any;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tokio::task::{self, JoinHandle};
-use super::file_stream::{BatchIter, FileStream};
use super::FileScanConfig;
/// Execution plan for scanning a CSV file
@@ -115,40 +120,17 @@ impl ExecutionPlan for CsvExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let batch_size = context.session_config().batch_size();
- let file_schema = Arc::clone(&self.base_config.file_schema);
- let file_projection = self.base_config.file_column_projection_indices();
- let has_header = self.has_header;
- let delimiter = self.delimiter;
- let start_line = if has_header { 1 } else { 0 };
-
- let fun = move |file, remaining: &Option<usize>| {
- let bounds = remaining.map(|x| (0, x + start_line));
- let datetime_format = None;
- Box::new(csv::Reader::new(
- file,
- Arc::clone(&file_schema),
- has_header,
- Some(delimiter),
- batch_size,
- bounds,
- file_projection.clone(),
- datetime_format,
- )) as BatchIter
- };
-
- let object_store = context
- .runtime_env()
- .object_store(&self.base_config.object_store_url)?;
-
- Ok(Box::pin(FileStream::new(
- object_store,
- self.base_config.file_groups[partition].clone(),
- fun,
- Arc::clone(&self.projected_schema),
- self.base_config.limit,
- self.base_config.table_partition_cols.clone(),
- )))
+ let config = Arc::new(CsvConfig {
+ batch_size: context.session_config().batch_size(),
+ file_schema: Arc::clone(&self.base_config.file_schema),
+ file_projection: self.base_config.file_column_projection_indices(),
+ has_header: self.has_header,
+ delimiter: self.delimiter,
+ });
+
+ let opener = CsvOpener { config };
+ let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+ Ok(Box::pin(stream) as SendableRecordBatchStream)
}
fn fmt_as(
@@ -175,6 +157,57 @@ impl ExecutionPlan for CsvExec {
}
}
+#[derive(Debug, Clone)]
+struct CsvConfig {
+ batch_size: usize,
+ file_schema: SchemaRef,
+ file_projection: Option<Vec<usize>>,
+ has_header: bool,
+ delimiter: u8,
+}
+
+impl CsvConfig {
+ fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> {
+ let datetime_format = None;
+ csv::Reader::new(
+ reader,
+ Arc::clone(&self.file_schema),
+ self.has_header,
+ Some(self.delimiter),
+ self.batch_size,
+ None,
+ self.file_projection.clone(),
+ datetime_format,
+ )
+ }
+}
+
+struct CsvOpener {
+ config: Arc<CsvConfig>,
+}
+
+impl FormatReader for CsvOpener {
+ fn open(
+ &self,
+ store: Arc<dyn ObjectStore>,
+ file: ObjectMeta,
+ _range: Option<FileRange>,
+ ) -> ReaderFuture {
+ let config = self.config.clone();
+ Box::pin(async move {
+ match store.get(&file.location).await? {
+ GetResult::File(file, _) => {
+ Ok(futures::stream::iter(config.open(file)).boxed())
+ }
+ r @ GetResult::Stream(_) => {
+ let bytes = r.bytes().await?;
+ Ok(futures::stream::iter(config.open(bytes.reader())).boxed())
+ }
+ }
+ })
+ }
+}
+
pub async fn plan_to_csv(
state: &SessionState,
plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 97a95b568..ca57028ab 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -21,51 +21,43 @@
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.
-use crate::datasource::listing::PartitionedFile;
-use crate::{physical_plan::RecordBatchStream, scalar::ScalarValue};
-use arrow::{
- datatypes::SchemaRef,
- error::{ArrowError, Result as ArrowResult},
- record_batch::RecordBatch,
-};
-use datafusion_data_access::object_store::ObjectStore;
-use futures::Stream;
-use std::{
- io::Read,
- iter,
- pin::Pin,
- sync::Arc,
- task::{Context, Poll},
-};
-
-use super::PartitionColumnProjector;
-
-pub type FileIter = Box<dyn Iterator<Item = PartitionedFile> + Send + Sync>;
-pub type BatchIter = Box<dyn Iterator<Item = ArrowResult<RecordBatch>> + Send + Sync>;
-
-/// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object
-/// and an optional number of required records.
-pub trait FormatReaderOpener:
- FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + Send + Unpin + 'static
-{
-}
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{ready, FutureExt, Stream, StreamExt};
+use object_store::{ObjectMeta, ObjectStore};
+
+use datafusion_common::ScalarValue;
+
+use crate::datasource::listing::{FileRange, PartitionedFile};
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
+use crate::physical_plan::RecordBatchStream;
+
+/// A fallible future that resolves to a stream of [`RecordBatch`]
+pub type ReaderFuture =
+ BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;
-impl<T> FormatReaderOpener for T where
- T: FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter
- + Send
- + Unpin
- + 'static
-{
+pub trait FormatReader: Unpin {
+ fn open(
+ &self,
+ store: Arc<dyn ObjectStore>,
+ file: ObjectMeta,
+ range: Option<FileRange>,
+ ) -> ReaderFuture;
}
/// A stream that iterates record batch by record batch, file over file.
-pub struct FileStream<F: FormatReaderOpener> {
- /// An iterator over record batches of the last file returned by file_iter
- batch_iter: BatchIter,
- /// Partitioning column values for the current batch_iter
- partition_values: Vec<ScalarValue>,
+pub struct FileStream<F: FormatReader> {
/// An iterator over input files.
- file_iter: FileIter,
+ file_iter: VecDeque<PartitionedFile>,
/// The stream schema (file schema including partition columns and after
/// projection).
projected_schema: SchemaRef,
@@ -80,104 +72,155 @@ pub struct FileStream<F: FormatReaderOpener> {
pc_projector: PartitionColumnProjector,
/// the store from which to source the files.
object_store: Arc<dyn ObjectStore>,
+ /// The stream state
+ state: FileStreamState,
}
-impl<F: FormatReaderOpener> FileStream<F> {
+enum FileStreamState {
+ /// The idle state, no file is currently being read
+ Idle,
+ /// Currently performing asynchronous IO to obtain a stream of RecordBatch
+ /// for a given file
+ Open {
+ /// A [`ReaderFuture`] returned by [`FormatReader::open`]
+ future: ReaderFuture,
+ /// The partition values for this file
+ partition_values: Vec<ScalarValue>,
+ },
+ /// Scanning the [`BoxStream`] returned by the completion of a [`ReaderFuture`]
+ /// returned by [`FormatReader::open`]
+ Scan {
+ /// Partitioning column values for the file currently being scanned
+ partition_values: Vec<ScalarValue>,
+ /// The reader instance
+ reader: BoxStream<'static, ArrowResult<RecordBatch>>,
+ },
+ /// Encountered an error
+ Error,
+ /// Reached the row limit
+ Limit,
+}
+
+impl<F: FormatReader> FileStream<F> {
pub fn new(
- object_store: Arc<dyn ObjectStore>,
- files: Vec<PartitionedFile>,
+ config: &FileScanConfig,
+ partition: usize,
+ context: Arc<TaskContext>,
file_reader: F,
- projected_schema: SchemaRef,
- limit: Option<usize>,
- table_partition_cols: Vec<String>,
- ) -> Self {
+ ) -> Result<Self> {
+ let (projected_schema, _) = config.project();
let pc_projector = PartitionColumnProjector::new(
- Arc::clone(&projected_schema),
- &table_partition_cols,
+ projected_schema.clone(),
+ &config.table_partition_cols,
);
- Self {
- file_iter: Box::new(files.into_iter()),
- batch_iter: Box::new(iter::empty()),
- partition_values: vec![],
- remain: limit,
+ let files = config.file_groups[partition].clone();
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&config.object_store_url)?;
+
+ Ok(Self {
+ file_iter: files.into(),
projected_schema,
+ remain: config.limit,
file_reader,
pc_projector,
object_store,
- }
+ state: FileStreamState::Idle,
+ })
}
- /// Acts as a flat_map of record batches over files. Adds the partitioning
- /// Columns to the returned record batches.
- fn next_batch(&mut self) -> Option<ArrowResult<RecordBatch>> {
- match self.batch_iter.next() {
- Some(Ok(batch)) => {
- Some(self.pc_projector.project(batch, &self.partition_values))
- }
- Some(Err(e)) => Some(Err(e)),
- None => match self.file_iter.next() {
- Some(f) => {
- self.partition_values = f.partition_values;
- self.object_store
- .file_reader(f.file_meta.sized_file)
- .and_then(|r| r.sync_reader())
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))
- .and_then(|f| {
- self.batch_iter = (self.file_reader)(f, &self.remain);
- self.next_batch().transpose()
- })
- .transpose()
+ fn poll_inner(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+ loop {
+ match &mut self.state {
+ FileStreamState::Idle => {
+ let file = match self.file_iter.pop_front() {
+ Some(file) => file,
+ None => return Poll::Ready(None),
+ };
+
+ let future = self.file_reader.open(
+ self.object_store.clone(),
+ file.object_meta,
+ file.range,
+ );
+
+ self.state = FileStreamState::Open {
+ future,
+ partition_values: file.partition_values,
+ }
}
- None => None,
- },
+ FileStreamState::Open {
+ future,
+ partition_values,
+ } => match ready!(future.poll_unpin(cx)) {
+ Ok(reader) => {
+ self.state = FileStreamState::Scan {
+ partition_values: std::mem::take(partition_values),
+ reader,
+ };
+ }
+ Err(e) => {
+ self.state = FileStreamState::Error;
+ return Poll::Ready(Some(Err(e.into())));
+ }
+ },
+ FileStreamState::Scan {
+ reader,
+ partition_values,
+ } => match ready!(reader.poll_next_unpin(cx)) {
+ Some(result) => {
+ let result = result
+ .and_then(|b| self.pc_projector.project(b, partition_values))
+ .map(|batch| match &mut self.remain {
+ Some(remain) => {
+ if *remain > batch.num_rows() {
+ *remain -= batch.num_rows();
+ batch
+ } else {
+ let batch = batch.slice(0, *remain);
+ self.state = FileStreamState::Limit;
+ *remain = 0;
+ batch
+ }
+ }
+ None => batch,
+ });
+
+ if result.is_err() {
+ self.state = FileStreamState::Error
+ }
+
+ return Poll::Ready(Some(result));
+ }
+ None => self.state = FileStreamState::Idle,
+ },
+ FileStreamState::Error | FileStreamState::Limit => {
+ return Poll::Ready(None)
+ }
+ }
}
}
}
-impl<F: FormatReaderOpener> Stream for FileStream<F> {
+impl<F: FormatReader> Stream for FileStream<F> {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
+ cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- // check if finished or no limit
- match self.remain {
- Some(r) if r == 0 => return Poll::Ready(None),
- None => return Poll::Ready(self.get_mut().next_batch()),
- Some(r) => r,
- };
-
- Poll::Ready(match self.as_mut().next_batch() {
- Some(Ok(item)) => {
- if let Some(remain) = self.remain.as_mut() {
- if *remain >= item.num_rows() {
- *remain -= item.num_rows();
- Some(Ok(item))
- } else {
- let len = *remain;
- *remain = 0;
- Some(Ok(RecordBatch::try_new(
- item.schema(),
- item.columns()
- .iter()
- .map(|column| column.slice(0, len))
- .collect(),
- )?))
- }
- } else {
- Some(Ok(item))
- }
- }
- other => other,
- })
+ self.poll_inner(cx)
}
}
-impl<F: FormatReaderOpener> RecordBatchStream for FileStream<F> {
+impl<F: FormatReader> RecordBatchStream for FileStream<F> {
fn schema(&self) -> SchemaRef {
- Arc::clone(&self.projected_schema)
+ self.projected_schema.clone()
}
}
@@ -186,33 +229,54 @@ mod tests {
use futures::StreamExt;
use super::*;
+ use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::prelude::SessionContext;
use crate::{
error::Result,
- test::{make_partition, object_store::TestObjectStore},
+ test::{make_partition, object_store::register_test_store},
};
+ struct TestOpener {
+ records: Vec<RecordBatch>,
+ }
+
+ impl FormatReader for TestOpener {
+ fn open(
+ &self,
+ _store: Arc<dyn ObjectStore>,
+ _file: ObjectMeta,
+ _range: Option<FileRange>,
+ ) -> ReaderFuture {
+ let iterator = self.records.clone().into_iter().map(Ok);
+ let stream = futures::stream::iter(iterator).boxed();
+ futures::future::ready(Ok(stream)).boxed()
+ }
+ }
+
/// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
let records = vec![make_partition(3), make_partition(2)];
+ let file_schema = records[0].schema();
- let source_schema = records[0].schema();
+ let reader = TestOpener { records };
- let reader = move |_file, _remain: &Option<usize>| {
- // this reader returns the same batch regardless of the file
- Box::new(records.clone().into_iter().map(Ok)) as BatchIter
- };
+ let ctx = SessionContext::new();
+ register_test_store(&ctx, &[("mock_file1", 10), ("mock_file2", 20)]);
- let file_stream = FileStream::new(
- TestObjectStore::new_arc(&[("mock_file1", 10), ("mock_file2", 20)]),
- vec![
+ let config = FileScanConfig {
+ object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+ file_schema,
+ file_groups: vec![vec![
PartitionedFile::new("mock_file1".to_owned(), 10),
PartitionedFile::new("mock_file2".to_owned(), 20),
- ],
- reader,
- source_schema,
+ ]],
+ statistics: Default::default(),
+ projection: None,
limit,
- vec![],
- );
+ table_partition_cols: vec![],
+ };
+
+ let file_stream = FileStream::new(&config, 0, ctx.task_ctx(), reader).unwrap();
file_stream
.map(|b| b.expect("No error expected in stream"))
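The limit handling in FileStream::poll_inner above replaces the old column-by-column RecordBatch reconstruction with a single RecordBatch::slice on the last permitted batch, after which the stream moves to the Limit state. A small sketch of just that bookkeeping, using arrow directly; apply_limit is an illustrative free function, not part of the patch:

use arrow::record_batch::RecordBatch;

// Returns the (possibly truncated) batch and whether the row limit was reached.
fn apply_limit(batch: RecordBatch, remain: &mut Option<usize>) -> (RecordBatch, bool) {
    match remain {
        Some(remain) if *remain > batch.num_rows() => {
            *remain -= batch.num_rows();
            (batch, false)
        }
        Some(remain) => {
            // Keep only the rows still allowed through, then stop the stream.
            let truncated = batch.slice(0, *remain);
            *remain = 0;
            (truncated, true)
        }
        None => (batch, false),
    }
}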
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index a829218f4..385ac427c 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -18,22 +18,27 @@
//! Execution plan for reading line-delimited JSON files
use arrow::json::reader::DecoderOptions;
+use crate::datasource::listing::FileRange;
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::file_format::file_stream::{
+ FileStream, FormatReader, ReaderFuture,
+};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::{datatypes::SchemaRef, json};
+use bytes::Buf;
use futures::{StreamExt, TryStreamExt};
+use object_store::{GetResult, ObjectMeta, ObjectStore};
use std::any::Any;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tokio::task::{self, JoinHandle};
-use super::file_stream::{BatchIter, FileStream};
use super::FileScanConfig;
/// Execution plan for scanning NdJson data source
@@ -99,35 +104,21 @@ impl ExecutionPlan for NdJsonExec {
let batch_size = context.session_config().batch_size();
let file_schema = Arc::clone(&self.base_config.file_schema);
- // The json reader cannot limit the number of records, so `remaining` is ignored.
- let fun = move |file, _remaining: &Option<usize>| {
- // TODO: make DecoderOptions implement Clone so we can
- // clone here rather than recreating the options each time
- // https://github.com/apache/arrow-rs/issues/1580
- let options = DecoderOptions::new().with_batch_size(batch_size);
-
- let options = if let Some(proj) = proj.clone() {
- options.with_projection(proj)
- } else {
- options
- };
-
- Box::new(json::Reader::new(file, Arc::clone(&file_schema), options))
- as BatchIter
+ let options = DecoderOptions::new().with_batch_size(batch_size);
+ let options = if let Some(proj) = proj {
+ options.with_projection(proj)
+ } else {
+ options
+ };
+
+ let opener = JsonOpener {
+ file_schema,
+ options,
};
- let object_store = context
- .runtime_env()
- .object_store(&self.base_config.object_store_url)?;
-
- Ok(Box::pin(FileStream::new(
- object_store,
- self.base_config.file_groups[partition].clone(),
- fun,
- Arc::clone(&self.projected_schema),
- self.base_config.limit,
- self.base_config.table_partition_cols.clone(),
- )))
+ let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+
+ Ok(Box::pin(stream) as SendableRecordBatchStream)
}
fn fmt_as(
@@ -152,6 +143,38 @@ impl ExecutionPlan for NdJsonExec {
}
}
+struct JsonOpener {
+ options: DecoderOptions,
+ file_schema: SchemaRef,
+}
+
+impl FormatReader for JsonOpener {
+ fn open(
+ &self,
+ store: Arc<dyn ObjectStore>,
+ file: ObjectMeta,
+ _range: Option<FileRange>,
+ ) -> ReaderFuture {
+ let options = self.options.clone();
+ let schema = self.file_schema.clone();
+ Box::pin(async move {
+ match store.get(&file.location).await? {
+ GetResult::File(file, _) => {
+ let reader = json::Reader::new(file, schema.clone(), options);
+ Ok(futures::stream::iter(reader).boxed())
+ }
+ r @ GetResult::Stream(_) => {
+ let bytes = r.bytes().await?;
+ let reader =
+ json::Reader::new(bytes.reader(), schema.clone(), options);
+
+ Ok(futures::stream::iter(reader).boxed())
+ }
+ }
+ })
+ }
+}
+
pub async fn plan_to_json(
state: &SessionState,
plan: Arc<dyn ExecutionPlan>,
@@ -201,7 +224,7 @@ mod tests {
use crate::datasource::object_store::ObjectStoreUrl;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
- use datafusion_data_access::object_store::local::local_unpartitioned_file;
+ use crate::test::object_store::local_unpartitioned_file;
use tempfile::TempDir;
use super::*;
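With DecoderOptions now cloneable (the removed TODO pointed at apache/arrow-rs#1580), NdJsonExec can assemble the reader options once and let JsonOpener clone them per file. A sketch of that construction, with json_decoder_options as a hypothetical helper name:

use arrow::json::reader::DecoderOptions;

// Build the decoder options once; the opener clones them for every file it opens.
fn json_decoder_options(batch_size: usize, projection: Option<Vec<String>>) -> DecoderOptions {
    let options = DecoderOptions::new().with_batch_size(batch_size);
    match projection {
        Some(proj) => options.with_projection(proj),
        None => options,
    }
}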
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 86015c472..e59e8248f 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -163,7 +163,7 @@ impl<'a> Display for FileGroupsDisplay<'a> {
.iter()
.map(|pp| {
pp.iter()
- .map(|pf| pf.file_meta.path())
+ .map(|pf| pf.object_meta.location.as_ref())
.collect::<Vec<_>>()
.join(", ")
})
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e719499c3..cae92ddbb 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -18,42 +18,40 @@
//! Execution plan for reading Parquet files
use fmt::Debug;
-use std::collections::VecDeque;
use std::fmt;
use std::fs;
-use std::path::Path;
-use std::pin::Pin;
+use std::ops::Range;
use std::sync::Arc;
-use std::task::{Context, Poll};
use std::{any::Any, convert::TryInto};
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
- error::{ArrowError, Result as ArrowResult},
- record_batch::RecordBatch,
+ error::ArrowError,
};
-use futures::{Stream, StreamExt, TryStreamExt};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use log::debug;
-use parquet::arrow::{
- arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
- ParquetFileArrowReader, ProjectionMask,
-};
-use parquet::file::reader::FileReader;
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::errors::ParquetError;
use parquet::file::{
- metadata::RowGroupMetaData, properties::WriterProperties,
- reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
+ metadata::{ParquetMetaData, RowGroupMetaData},
+ properties::WriterProperties,
statistics::Statistics as ParquetStatistics,
};
use datafusion_common::Column;
-use datafusion_data_access::object_store::ObjectStore;
use datafusion_expr::Expr;
-use crate::physical_plan::metrics::BaselineMetrics;
-use crate::physical_plan::stream::RecordBatchReceiverStream;
+use crate::datasource::file_format::parquet::fetch_parquet_metadata;
+use crate::datasource::listing::FileRange;
+use crate::physical_plan::file_format::file_stream::{
+ FileStream, FormatReader, ReaderFuture,
+};
use crate::{
- datasource::{file_format::parquet::ChunkObjectReader, listing::PartitionedFile},
error::{DataFusionError, Result},
execution::context::{SessionState, TaskContext},
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
@@ -61,14 +59,12 @@ use crate::{
expressions::PhysicalSortExpr,
file_format::{FileScanConfig, SchemaAdapter},
metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ Statistics,
},
scalar::ScalarValue,
};
-use super::PartitionColumnProjector;
-
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -94,7 +90,6 @@ struct ParquetFileMetrics {
impl ParquetExec {
/// Create a new Parquet reader execution plan provided file list and schema.
- /// Even if `limit` is set, ParquetExec rounds up the number of records to the next `batch_size`.
pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -210,52 +205,20 @@ impl ExecutionPlan for ParquetExec {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
};
- let partition_col_proj = PartitionColumnProjector::new(
- Arc::clone(&self.projected_schema),
- &self.base_config.table_partition_cols,
- );
- let object_store = context
- .runtime_env()
- .object_store(&self.base_config.object_store_url)?;
-
- let stream = ParquetExecStream {
- error: false,
+ let opener = ParquetOpener {
partition_index,
- metrics: self.metrics.clone(),
- object_store,
- pruning_predicate: self.pruning_predicate.clone(),
+ projection: Arc::from(projection),
batch_size: context.session_config().batch_size(),
- schema: self.projected_schema.clone(),
- projection,
- remaining_rows: self.base_config.limit,
- reader: None,
- files: self.base_config.file_groups[partition_index].clone().into(),
- projector: partition_col_proj,
- adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
- baseline_metrics: BaselineMetrics::new(&self.metrics, partition_index),
+ pruning_predicate: self.pruning_predicate.clone(),
+ table_schema: self.base_config.file_schema.clone(),
+ metrics: self.metrics.clone(),
};
- // Use spawn_blocking only if running from a tokio context (#2201)
- match tokio::runtime::Handle::try_current() {
- Ok(handle) => {
- let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
- let schema = stream.schema();
- let join_handle = handle.spawn_blocking(move || {
- for result in stream {
- if response_tx.blocking_send(result).is_err() {
- break;
- }
- }
- });
- Ok(RecordBatchReceiverStream::create(
- &schema,
- response_rx,
- join_handle,
- ))
- }
- Err(_) => Ok(Box::pin(stream)),
- }
+ let stream =
+ FileStream::new(&self.base_config, partition_index, context, opener)?;
+
+ Ok(Box::pin(stream))
}
fn fmt_as(
@@ -296,164 +259,110 @@ impl ExecutionPlan for ParquetExec {
}
}
-/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
-///
-/// NB: This will perform blocking IO synchronously without yielding which may
-/// be problematic in certain contexts (e.g. a tokio runtime that also performs
-/// network IO)
-struct ParquetExecStream {
- error: bool,
+/// Implements [`FormatReader`] for a parquet file
+struct ParquetOpener {
partition_index: usize,
- metrics: ExecutionPlanMetricsSet,
- object_store: Arc<dyn ObjectStore>,
- pruning_predicate: Option<PruningPredicate>,
+ projection: Arc<[usize]>,
batch_size: usize,
- schema: SchemaRef,
- projection: Vec<usize>,
- remaining_rows: Option<usize>,
- reader: Option<(ParquetRecordBatchReader, PartitionedFile)>,
- files: VecDeque<PartitionedFile>,
- projector: PartitionColumnProjector,
- adapter: SchemaAdapter,
- baseline_metrics: BaselineMetrics,
+ pruning_predicate: Option<PruningPredicate>,
+ table_schema: SchemaRef,
+ metrics: ExecutionPlanMetricsSet,
}
-impl ParquetExecStream {
- fn create_reader(
- &mut self,
- file: &PartitionedFile,
- ) -> Result<ParquetRecordBatchReader> {
- let file_metrics = ParquetFileMetrics::new(
+impl FormatReader for ParquetOpener {
+ fn open(
+ &self,
+ store: Arc<dyn ObjectStore>,
+ meta: ObjectMeta,
+ range: Option<FileRange>,
+ ) -> ReaderFuture {
+ let metrics = ParquetFileMetrics::new(
self.partition_index,
- file.file_meta.path(),
+ meta.location.as_ref(),
&self.metrics,
);
- let bytes_scanned = file_metrics.bytes_scanned.clone();
- let object_reader = self
- .object_store
- .file_reader(file.file_meta.sized_file.clone())?;
-
- let mut opt = ReadOptionsBuilder::new();
- if let Some(pruning_predicate) = &self.pruning_predicate {
- opt = opt.with_predicate(build_row_group_predicate(
- pruning_predicate,
- file_metrics,
- ));
- }
- if let Some(range) = &file.range {
- assert!(
- range.start >= 0 && range.end > 0 && range.end > range.start,
- "invalid range specified: {:?}",
- range
- );
- opt = opt.with_range(range.start, range.end);
- }
-
- let file_reader = SerializedFileReader::new_with_options(
- ChunkObjectReader {
- object_reader,
- bytes_scanned: Some(bytes_scanned),
- },
- opt.build(),
- )?;
-
- let file_metadata = file_reader.metadata().file_metadata();
- let parquet_schema = file_metadata.schema_descr_ptr();
-
- let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
- let arrow_schema = arrow_reader.get_schema()?;
-
- let adapted_projections = self
- .adapter
- .map_projections(&arrow_schema, &self.projection)?;
- let mask = ProjectionMask::roots(&parquet_schema, adapted_projections);
- let reader = arrow_reader.get_record_reader_by_columns(mask, self.batch_size)?;
-
- Ok(reader)
- }
-}
-
-impl Iterator for ParquetExecStream {
- type Item = ArrowResult<RecordBatch>;
+ let reader = ParquetFileReader {
+ store,
+ meta,
+ metrics: metrics.clone(),
+ };
- fn next(&mut self) -> Option<Self::Item> {
- let cloned_time = self.baseline_metrics.elapsed_compute().clone();
- // records time on drop
- let _timer = cloned_time.timer();
+ let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
+ let batch_size = self.batch_size;
+ let projection = self.projection.clone();
+ let pruning_predicate = self.pruning_predicate.clone();
- if self.error || matches!(self.remaining_rows, Some(0)) {
- return None;
- }
+ Box::pin(async move {
+ let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+ let adapted_projections =
+ schema_adapter.map_projections(builder.schema(), &projection)?;
- // TODO: Split this out into separate operators (#2079)
- loop {
- let (reader, file) = match self.reader.as_mut() {
- Some(current) => current,
- None => match self.files.pop_front() {
- None => return None,
- Some(file) => match self.create_reader(&file) {
- Ok(reader) => self.reader.insert((reader, file)),
- Err(e) => {
- self.error = true;
- return Some(Err(ArrowError::ExternalError(Box::new(e))));
- }
- },
- },
- };
+ let mask = ProjectionMask::roots(
+ builder.parquet_schema(),
+ adapted_projections.iter().cloned(),
+ );
- let result = reader.next().map(|result| {
- result
- .and_then(|batch| {
- self.adapter
- .adapt_batch(batch, &self.projection)
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))
- })
- .and_then(|batch| {
- self.projector.project(batch, &file.partition_values)
+ let groups = builder.metadata().row_groups();
+ let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics);
+
+ let stream = builder
+ .with_projection(mask)
+ .with_batch_size(batch_size)
+ .with_row_groups(row_groups)
+ .build()?;
+
+ let adapted = stream
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+ .map(move |maybe_batch| {
+ maybe_batch.and_then(|b| {
+ schema_adapter
+ .adapt_batch(b, &projection)
+ .map_err(Into::into)
})
- });
-
- let result = match result {
- Some(result) => result,
- None => {
- self.reader = None;
- continue;
- }
- };
-
- match (&result, self.remaining_rows.as_mut()) {
- (Ok(batch), Some(remaining_rows)) => {
- *remaining_rows = remaining_rows.saturating_sub(batch.num_rows());
- }
- _ => self.error = result.is_err(),
- }
-
- //record output rows in parquetExec
- if let Ok(batch) = &result {
- self.baseline_metrics.record_output(batch.num_rows());
- }
+ });
- return Some(result);
- }
+ Ok(adapted.boxed())
+ })
}
}
-impl Stream for ParquetExecStream {
- type Item = ArrowResult<RecordBatch>;
+/// Implements [`AsyncFileReader`] for a parquet file in object storage
+struct ParquetFileReader {
+ store: Arc<dyn ObjectStore>,
+ meta: ObjectMeta,
+ metrics: ParquetFileMetrics,
+}
- fn poll_next(
- mut self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let poll = Poll::Ready(Iterator::next(&mut *self));
- self.baseline_metrics.record_poll(poll)
+impl AsyncFileReader for ParquetFileReader {
+ fn get_bytes(
+ &mut self,
+ range: Range<usize>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+ self.metrics.bytes_scanned.add(range.end - range.start);
+
+ self.store
+ .get_range(&self.meta.location, range)
+ .map_err(|e| {
+ ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e))
+ })
+ .boxed()
}
-}
-impl RecordBatchStream for ParquetExecStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
+ fn get_metadata(
+ &mut self,
+ ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ Box::pin(async move {
+ let metadata = fetch_parquet_metadata(self.store.as_ref(), &self.meta)
+ .await
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "AsyncChunkReader::get_metadata error: {}",
+ e
+ ))
+ })?;
+ Ok(Arc::new(metadata))
+ })
}
}
@@ -556,36 +465,47 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
}
}
-fn build_row_group_predicate(
- pruning_predicate: &PruningPredicate,
- metrics: ParquetFileMetrics,
-) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> {
- let pruning_predicate = pruning_predicate.clone();
- Box::new(
- move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool {
- let parquet_schema = pruning_predicate.schema().as_ref();
+fn prune_row_groups(
+ groups: &[RowGroupMetaData],
+ range: Option<FileRange>,
+ predicate: Option<PruningPredicate>,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ // TODO: Columnar pruning
+ let mut filtered = Vec::with_capacity(groups.len());
+ for (idx, metadata) in groups.iter().enumerate() {
+ if let Some(range) = &range {
+ let offset = metadata.column(0).file_offset();
+ if offset < range.start || offset >= range.end {
+ continue;
+ }
+ }
+
+ if let Some(predicate) = &predicate {
let pruning_stats = RowGroupPruningStatistics {
- row_group_metadata,
- parquet_schema,
+ row_group_metadata: metadata,
+ parquet_schema: predicate.schema().as_ref(),
};
- let predicate_values = pruning_predicate.prune(&pruning_stats);
- match predicate_values {
+ match predicate.prune(&pruning_stats) {
Ok(values) => {
// NB: false means don't scan row group
- let num_pruned = values.iter().filter(|&v| !*v).count();
- metrics.row_groups_pruned.add(num_pruned);
- values[0]
+ if !values[0] {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
- true
}
}
- },
- )
+ }
+
+ filtered.push(idx)
+ }
+ filtered
}
/// Executes a query and writes the results to a partitioned Parquet file.
@@ -597,7 +517,7 @@ pub async fn plan_to_parquet(
) -> Result<()> {
let path = path.as_ref();
// create directory to contain the Parquet files (one per partition)
- let fs_path = Path::new(path);
+ let fs_path = std::path::Path::new(path);
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
@@ -635,9 +555,6 @@ pub async fn plan_to_parquet(
mod tests {
use crate::{
assert_batches_sorted_eq, assert_contains,
- datafusion_data_access::{
- object_store::local::LocalFileSystem, FileMeta, SizedFile,
- },
datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};
@@ -645,18 +562,23 @@ mod tests {
use super::*;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
- use crate::datasource::listing::FileRange;
+ use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
+ use crate::test::object_store::local_unpartitioned_file;
use arrow::array::Float32Array;
+ use arrow::record_batch::RecordBatch;
use arrow::{
array::{Int64Array, Int8Array, StringArray},
datatypes::{DataType, Field},
};
- use datafusion_data_access::object_store::local::local_unpartitioned_file;
+ use chrono::{TimeZone, Utc};
use datafusion_expr::{col, lit};
use futures::StreamExt;
+ use object_store::local::LocalFileSystem;
+ use object_store::path::Path;
+ use object_store::ObjectMeta;
use parquet::{
basic::Type as PhysicalType,
file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
@@ -1057,9 +979,9 @@ mod tests {
#[tokio::test]
async fn parquet_exec_with_range() -> Result<()> {
- fn file_range(meta: &FileMeta, start: i64, end: i64) -> PartitionedFile {
+ fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
PartitionedFile {
- file_meta: meta.clone(),
+ object_meta: meta.clone(),
partition_values: vec![],
range: Some(FileRange { start, end }),
}
@@ -1102,7 +1024,7 @@ mod tests {
let meta = local_unpartitioned_file(filename);
- let store = Arc::new(LocalFileSystem {}) as _;
+ let store = Arc::new(LocalFileSystem::new()) as _;
let file_schema = ParquetFormat::default()
.infer_schema(&store, &[meta.clone()])
.await?;
@@ -1156,7 +1078,7 @@ mod tests {
.unwrap();
let partitioned_file = PartitionedFile {
- file_meta: meta,
+ object_meta: meta,
partition_values: vec![
ScalarValue::Utf8(Some("2021".to_owned())),
ScalarValue::Utf8(Some("10".to_owned())),
@@ -1212,13 +1134,15 @@ mod tests {
async fn parquet_exec_with_error() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
+ let location = Path::from_filesystem_path(".")
+ .unwrap()
+ .child("invalid.parquet");
+
let partitioned_file = PartitionedFile {
- file_meta: FileMeta {
- sized_file: SizedFile {
- size: 1337,
- path: "invalid".into(),
- },
- last_modified: None,
+ object_meta: ObjectMeta {
+ location,
+ last_modified: Utc.timestamp_nanos(0),
+ size: 1337,
},
partition_values: vec![],
range: None,
@@ -1240,10 +1164,7 @@ mod tests {
let mut results = parquet_exec.execute(0, task_ctx)?;
let batch = results.next().await.unwrap();
// invalid file should produce an error to that effect
- assert_contains!(
- batch.unwrap_err().to_string(),
- "External error: Parquet error: Arrow: IO error"
- );
+ assert_contains!(batch.unwrap_err().to_string(), "invalid.parquet not found");
assert!(results.next().await.is_none());
Ok(())
@@ -1255,12 +1176,13 @@ mod tests {
}
#[test]
- fn row_group_pruning_predicate_simple_expr() -> Result<()> {
+ fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
- let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?;
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
let rgm1 = get_row_group_meta_data(
@@ -1271,26 +1193,22 @@ mod tests {
&schema_descr,
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
- let row_group_metadata = vec![rgm1, rgm2];
- let mut row_group_predicate =
- build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
- let row_group_filter = row_group_metadata
- .iter()
- .enumerate()
- .map(|(i, g)| row_group_predicate(g, i))
- .collect::<Vec<_>>();
- assert_eq!(row_group_filter, vec![false, true]);
- Ok(())
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
}
#[test]
- fn row_group_pruning_predicate_missing_stats() -> Result<()> {
+ fn row_group_pruning_predicate_missing_stats() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
- let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?;
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
let rgm1 = get_row_group_meta_data(
@@ -1301,23 +1219,17 @@ mod tests {
&schema_descr,
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
- let row_group_metadata = vec![rgm1, rgm2];
- let mut row_group_predicate =
- build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
- let row_group_filter = row_group_metadata
- .iter()
- .enumerate()
- .map(|(i, g)| row_group_predicate(g, i))
- .collect::<Vec<_>>();
+ let metrics = parquet_file_metrics();
// missing statistics for first row group mean that the result from the predicate expression
// is null / undefined so the first row group can't be filtered out
- assert_eq!(row_group_filter, vec![true, true]);
-
- Ok(())
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ vec![0, 1]
+ );
}
#[test]
- fn row_group_pruning_predicate_partial_expr() -> Result<()> {
+ fn row_group_pruning_predicate_partial_expr() {
use datafusion_expr::{col, lit};
// test row group predicate with partially supported expression
// int > 1 and int % 2 => c1_max > 1 and true
@@ -1326,7 +1238,7 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]));
- let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let schema_descr = get_test_schema_descr(vec![
("c1", PhysicalType::INT32),
@@ -1346,32 +1258,27 @@ mod tests {
ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
],
);
- let row_group_metadata = vec![rgm1, rgm2];
- let mut row_group_predicate =
- build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
- let row_group_filter = row_group_metadata
- .iter()
- .enumerate()
- .map(|(i, g)| row_group_predicate(g, i))
- .collect::<Vec<_>>();
+
+ let metrics = parquet_file_metrics();
+ let groups = &[rgm1, rgm2];
// the first row group is still filtered out because the predicate expression can be partially evaluated
// when conditions are joined using AND
- assert_eq!(row_group_filter, vec![false, true]);
+ assert_eq!(
+ prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
- let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
- let mut row_group_predicate =
- build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
- let row_group_filter = row_group_metadata
- .iter()
- .enumerate()
- .map(|(i, g)| row_group_predicate(g, i))
- .collect::<Vec<_>>();
- assert_eq!(row_group_filter, vec![true, true]);
+ let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
- Ok(())
+ // if conditions in predicate are joined with OR and an unsupported expression is used
+ // this bypasses the entire predicate expression and no row groups are filtered out
+ assert_eq!(
+ prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ vec![0, 1]
+ );
}
fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
@@ -1397,7 +1304,7 @@ mod tests {
}
#[test]
- fn row_group_pruning_predicate_null_expr() -> Result<()> {
+ fn row_group_pruning_predicate_null_expr() {
use datafusion_expr::{col, lit};
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
@@ -1405,24 +1312,19 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
- let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
- let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+ let groups = gen_row_group_meta_data_for_pruning_predicate();
- let mut row_group_predicate =
- build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
- let row_group_filter = row_group_metadata
- .iter()
- .enumerate()
- .map(|(i, g)| row_group_predicate(g, i))
- .collect::<Vec<_>>();
+ let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
- assert_eq!(row_group_filter, vec![false, true]);
-
- Ok(())
+ assert_eq!(
+ prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
}
#[test]
- fn row_group_pruning_predicate_eq_null_expr() -> Result<()> {
+ fn row_group_pruning_predicate_eq_null_expr() {
use datafusion_expr::{col, lit};
// test row group predicate with an unknown (Null) expr
//
@@ -1434,22 +1336,16 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
- let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
- let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
-
- let mut row_group_predicate =
- build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
- let row_group_filter = row_group_metadata
- .iter()
- .enumerate()
- .map(|(i, g)| row_group_predicate(g, i))
- .collect::<Vec<_>>();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+ let groups = gen_row_group_meta_data_for_pruning_predicate();
+ let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates. Ideally these should both be false
- assert_eq!(row_group_filter, vec![false, true]);
-
- Ok(())
+ assert_eq!(
+ prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
}
fn get_row_group_meta_data(
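The FileRange handling in prune_row_groups assigns each row group to the scan range that contains the file offset of its first column chunk, so partitions that tile a file never read the same group twice. An illustrative reduction of just that check; groups_in_range and the plain slice of offsets stand in for RowGroupMetaData and are not part of the patch:

// A row group belongs to the range [start, end) iff the offset of its first
// column chunk falls inside that range.
fn groups_in_range(group_offsets: &[i64], start: i64, end: i64) -> Vec<usize> {
    group_offsets
        .iter()
        .enumerate()
        .filter(|(_, offset)| **offset >= start && **offset < end)
        .map(|(idx, _)| idx)
        .collect()
}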
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index c66b8dc4a..691845785 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -288,7 +288,7 @@ pub fn with_new_children_if_necessary(
/// ```
/// use datafusion::prelude::*;
/// use datafusion::physical_plan::displayable;
-/// use std::path::is_separator;
+/// use object_store::path::Path;
///
/// #[tokio::main]
/// async fn main() {
@@ -312,8 +312,8 @@ pub fn with_new_children_if_necessary(
/// let plan_string = format!("{}", displayable_plan.indent());
///
/// let working_directory = std::env::current_dir().unwrap();
-/// let normalized = working_directory.to_string_lossy().replace(is_separator, "/");
-/// let plan_string = plan_string.replace(&normalized, "WORKING_DIR");
+/// let normalized = Path::from_filesystem_path(working_directory).unwrap();
+/// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR");
///
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=4096\
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 304c1b376..598d81879 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -24,12 +24,12 @@ use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::logical_plan::LogicalPlan;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
+use crate::test::object_store::local_unpartitioned_file;
use crate::test_util::aggr_test_schema;
use array::{Array, ArrayRef};
use arrow::array::{self, DecimalBuilder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use datafusion_data_access::object_store::local::local_unpartitioned_file;
use futures::{Future, FutureExt};
use std::fs::File;
use std::io::prelude::*;
@@ -109,9 +109,7 @@ pub fn partitioned_csv_config(
files
.into_iter()
- .map(
- |f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned()).into()],
- )
+ .map(|f| vec![local_unpartitioned_file(f).into()])
.collect::<Vec<_>>()
} else {
vec![vec![local_unpartitioned_file(path).into()]]
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index fdd053346..e61653e88 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -14,113 +14,40 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//! Object store implem used for testing
-
-use std::{
- io,
- io::{Cursor, Read},
- sync::Arc,
-};
-
-use crate::datafusion_data_access::{
- object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
- FileMeta, Result, SizedFile,
-};
+//! Object store implementation used for testing
use crate::prelude::SessionContext;
-use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt};
+use futures::FutureExt;
+use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
+use std::sync::Arc;
/// Registers a test object store with the provided `ctx`
-pub(crate) fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
+pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
ctx.runtime_env()
- .register_object_store("test", TestObjectStore::new_arc(files));
-}
-
-#[derive(Debug)]
-/// An object store implem that is useful for testing.
-/// `ObjectReader`s are filled with zero bytes.
-pub struct TestObjectStore {
- /// The `(path,size)` of the files that "exist" in the store
- files: Vec<(String, u64)>,
-}
-
-impl TestObjectStore {
- pub fn new_arc(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
- Arc::new(Self {
- files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(),
- })
- }
+ .register_object_store("test", "", make_test_store(files));
}
-#[async_trait]
-impl ObjectStore for TestObjectStore {
- async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
- let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
- Ok(Box::pin(
- stream::iter(
- self.files
- .clone()
- .into_iter()
- .filter(move |f| f.0.starts_with(&prefix)),
- )
- .map(|f| {
- Ok(FileMeta {
- sized_file: SizedFile {
- path: f.0.clone(),
- size: f.1,
- },
- last_modified: None,
- })
- }),
- ))
- }
+/// Create a test object store with the provided files
+pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
+ let memory = InMemory::new();
- async fn list_dir(
- &self,
- _prefix: &str,
- _delimiter: Option<String>,
- ) -> Result<ListEntryStream> {
- unimplemented!()
+ for (name, size) in files {
+ memory
+ .put(&Path::from(*name), vec![0; *size as usize].into())
+ .now_or_never()
+ .unwrap()
+ .unwrap();
}
- fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
- match self.files.iter().find(|item| file.path == item.0) {
- Some((_, size)) if *size == file.size => {
- Ok(Arc::new(EmptyObjectReader(*size)))
- }
- Some(_) => Err(io::Error::new(
- io::ErrorKind::NotFound,
- "found in test list but wrong size",
- )),
- None => Err(io::Error::new(
- io::ErrorKind::NotFound,
- "not in provided test list",
- )),
- }
- }
+ Arc::new(memory)
}
-struct EmptyObjectReader(u64);
-
-#[async_trait]
-impl ObjectReader for EmptyObjectReader {
- async fn chunk_reader(
- &self,
- _start: u64,
- _length: usize,
- ) -> Result<Box<dyn AsyncRead>> {
- unimplemented!()
- }
-
- fn sync_chunk_reader(
- &self,
- _start: u64,
- _length: usize,
- ) -> Result<Box<dyn Read + Send + Sync>> {
- Ok(Box::new(Cursor::new(vec![0; self.0 as usize])))
- }
-
- fn length(&self) -> u64 {
- self.0
+/// Helper method to fetch the file size and date at the given path and create an `ObjectMeta`
+pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta {
+ let location = Path::from_filesystem_path(path.as_ref()).unwrap();
+ let metadata = std::fs::metadata(path).expect("Local file metadata");
+ ObjectMeta {
+ location,
+ last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+ size: metadata.len() as usize,
}
}
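These two helpers replace the old TestObjectStore: fixtures become zero-filled objects in an InMemory store registered under the test scheme, and metadata for real files is read via std::fs. A sketch of typical usage from inside the datafusion core crate's own tests; the CSV path is purely illustrative:

use crate::prelude::SessionContext;
use crate::test::object_store::{local_unpartitioned_file, register_test_store};

fn wire_up_test_fixtures() {
    let ctx = SessionContext::new();
    // Zero-filled objects addressable as test:///mock_file1 and test:///mock_file2
    register_test_store(&ctx, &[("mock_file1", 10), ("mock_file2", 20)]);

    // ObjectMeta (location, size, last_modified) derived from a real local file
    let meta = local_unpartitioned_file("tests/example.csv");
    assert!(meta.size > 0);
}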
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index e3da9d986..a88445d78 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -17,19 +17,17 @@
//! Test queries on partitioned datasets
-use std::{fs, io, sync::Arc};
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::ops::Range;
+use std::sync::Arc;
use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{TimeZone, Utc};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::{
assert_batches_sorted_eq,
- datafusion_data_access::{
- object_store::{
- local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
- ObjectStore,
- },
- FileMeta, SizedFile,
- },
datasource::{
file_format::{csv::CsvFormat, parquet::ParquetFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig},
@@ -40,7 +38,9 @@ use datafusion::{
test_util::{self, arrow_test_data, parquet_test_data},
};
use datafusion_common::ScalarValue;
+use futures::stream::BoxStream;
use futures::{stream, StreamExt};
+use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore};
#[tokio::test]
async fn parquet_distinct_partition_col() -> Result<()> {
@@ -183,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "mirror:///mytable",
+ "mirror:///mytable/",
);
let result = ctx
@@ -219,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "mirror:///mytable",
+ "mirror:///mytable/",
);
let result = ctx
@@ -256,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "mirror:///mytable",
+ "mirror:///mytable/",
);
let result = ctx
@@ -417,6 +417,7 @@ fn register_partitioned_aggregate_csv(
let file_schema = test_util::aggr_test_schema();
ctx.runtime_env().register_object_store(
"mirror",
+ "",
MirroringObjectStore::new_arc(csv_file_path, store_paths),
);
@@ -444,6 +445,7 @@ async fn register_partitioned_alltypes_parquet(
let parquet_file_path = format!("{}/{}", testdata, source_file);
ctx.runtime_env().register_object_store(
"mirror",
+ "",
MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
);
@@ -481,9 +483,15 @@ pub struct MirroringObjectStore {
file_size: u64,
}
+impl std::fmt::Display for MirroringObjectStore {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{:?}", self)
+ }
+}
+
impl MirroringObjectStore {
pub fn new_arc(mirrored_file: String, paths: &[&str]) -> Arc<dyn ObjectStore> {
- let metadata = fs::metadata(&mirrored_file).expect("Local file metadata");
+ let metadata = std::fs::metadata(&mirrored_file).expect("Local file metadata");
Arc::new(Self {
files: paths.iter().map(|&f| f.to_owned()).collect(),
mirrored_file,
@@ -494,12 +502,54 @@ impl MirroringObjectStore {
#[async_trait]
impl ObjectStore for MirroringObjectStore {
- async fn list_file(
+ async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+ unimplemented!()
+ }
+
+ async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
+ self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+ let path = std::path::PathBuf::from(&self.mirrored_file);
+ let file = File::open(&path).unwrap();
+ Ok(GetResult::File(file, path))
+ }
+
+ async fn get_range(
+ &self,
+ location: &Path,
+ range: Range<usize>,
+ ) -> object_store::Result<Bytes> {
+ self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+ let path = std::path::PathBuf::from(&self.mirrored_file);
+ let mut file = File::open(&path).unwrap();
+ file.seek(SeekFrom::Start(range.start as u64)).unwrap();
+
+ let to_read = range.end - range.start;
+ let mut data = Vec::with_capacity(to_read);
+ let read = file.take(to_read as u64).read_to_end(&mut data).unwrap();
+ assert_eq!(read, to_read);
+
+ Ok(data.into())
+ }
+
+ async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
+ self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+ Ok(ObjectMeta {
+ location: location.clone(),
+ last_modified: Utc.timestamp_nanos(0),
+ size: self.file_size as usize,
+ })
+ }
+
+ async fn delete(&self, _location: &Path) -> object_store::Result<()> {
+ unimplemented!()
+ }
+
+ async fn list(
&self,
- prefix: &str,
- ) -> datafusion_data_access::Result<FileMetaStream> {
- let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
- let size = self.file_size;
+ prefix: Option<&Path>,
+ ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
+ let prefix = prefix.map(|p| p.as_ref()).unwrap_or("").to_string();
+ let size = self.file_size as usize;
Ok(Box::pin(
stream::iter(
self.files
@@ -508,39 +558,31 @@ impl ObjectStore for MirroringObjectStore {
.filter(move |f| f.starts_with(&prefix)),
)
.map(move |f| {
- Ok(FileMeta {
- sized_file: SizedFile { path: f, size },
- last_modified: None,
+ Ok(ObjectMeta {
+ location: Path::parse(f)?,
+ last_modified: Utc.timestamp_nanos(0),
+ size,
})
}),
))
}
- async fn list_dir(
+ async fn list_with_delimiter(
&self,
- _prefix: &str,
- _delimiter: Option<String>,
- ) -> datafusion_data_access::Result<ListEntryStream> {
+ _prefix: Option<&Path>,
+ ) -> object_store::Result<ListResult> {
unimplemented!()
}
- fn file_reader(
+ async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
+ unimplemented!()
+ }
+
+ async fn copy_if_not_exists(
&self,
- file: SizedFile,
- ) -> datafusion_data_access::Result<Arc<dyn ObjectReader>> {
- assert_eq!(
- self.file_size, file.size,
- "Requested files should have the same size as the mirrored file"
- );
- match self.files.iter().find(|&item| &file.path == item) {
- Some(_) => Ok(LocalFileSystem {}.file_reader(SizedFile {
- path: self.mirrored_file.clone(),
- size: self.file_size,
- })?),
- None => Err(io::Error::new(
- io::ErrorKind::NotFound,
- "not in provided test list",
- )),
- }
+ _from: &Path,
+ _to: &Path,
+ ) -> object_store::Result<()> {
+ unimplemented!()
}
}
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 1de6af06a..2c840321f 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -22,12 +22,10 @@ use datafusion::error::Result;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
-use datafusion_data_access::object_store::local::{
- local_unpartitioned_file, LocalFileSystem,
-};
use datafusion_row::layout::RowType::{Compact, WordAligned};
use datafusion_row::reader::read_as_batch;
use datafusion_row::writer::write_batch_unchecked;
+use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use std::sync::Arc;
#[tokio::test]
@@ -79,12 +77,15 @@ async fn get_exec(
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = datafusion::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, file_name);
- let meta = local_unpartitioned_file(filename);
+
+ let path = Path::from_filesystem_path(filename).unwrap();
let format = ParquetFormat::default();
- let object_store = Arc::new(LocalFileSystem {}) as _;
+ let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
let object_store_url = ObjectStoreUrl::local_filesystem();
+ let meta = object_store.head(&path).await.unwrap();
+
let file_schema = format
.infer_schema(&object_store, &[meta.clone()])
.await
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 0e3e08873..a7f4cabe9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -46,11 +46,11 @@ use datafusion::{
};
use datafusion::{execution::context::SessionContext, physical_plan::displayable};
use datafusion_expr::Volatility;
+use object_store::path::Path;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use tempfile::TempDir;
-use url::Url;
/// A macro to assert that some particular line contains two substrings
///
@@ -897,16 +897,9 @@ impl ExplainNormalizer {
// Push path as is
replacements.push((path.to_string_lossy().to_string(), key.to_string()));
- // Push canonical version of path
- let canonical = path.canonicalize().unwrap();
- replacements.push((canonical.to_string_lossy().to_string(), key.to_string()));
-
- if cfg!(target_family = "windows") {
- // Push URL representation of path, to handle windows
- let url = Url::from_file_path(canonical).unwrap();
- let path = url.path().strip_prefix('/').unwrap();
- replacements.push((path.to_string(), key.to_string()));
- }
+ // Push URL representation of path
+ let path = Path::from_filesystem_path(path).unwrap();
+ replacements.push((path.to_string(), key.to_string()));
};
push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA");
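The ExplainNormalizer simplification works because object_store paths are canonicalized and always '/'-delimited, so a single replacement covers both Unix and Windows. A sketch of the normalization step, assuming Path::from_filesystem_path from object_store 0.3 (which requires the path to exist); normalized_key is an illustrative name:

use object_store::path::Path;

// Turn an existing filesystem path into the canonical, '/'-delimited form
// that appears in EXPLAIN output on every platform.
fn normalized_key(dir: &std::path::Path) -> String {
    Path::from_filesystem_path(dir)
        .expect("path should exist so it can be canonicalized")
        .to_string()
}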