Posted to commits@arrow.apache.org by tu...@apache.org on 2022/06/01 20:58:07 UTC
[arrow-datafusion] branch master updated: Remove ObjectStore from FileScanConfig and ListingTableConfig (#2668)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 585bc3a62 Remove ObjectStore from FileScanConfig and ListingTableConfig (#2668)
585bc3a62 is described below
commit 585bc3a629b92ea7a86ebfe8bf762dbef4155710
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Jun 1 21:58:02 2022 +0100
Remove ObjectStore from FileScanConfig and ListingTableConfig (#2668)
* Remove ObjectStore from FileScanConfig and ListingTableConfig
* Update ballista pin
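In effect, callers no longer thread an `ObjectStore` through `ListingTableConfig`
or `FileScanConfig`; the store is resolved at runtime from the `RuntimeEnv`
using the scheme of the table URL. A minimal sketch of the resulting call
pattern, distilled from the hunks below (the helper name and the URL argument
are illustrative, not part of this commit):

    use std::sync::Arc;
    use datafusion::datasource::listing::{
        ListingTable, ListingTableConfig, ListingTableUrl,
    };
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn load_table(ctx: &SessionContext, url: &str) -> Result<Arc<ListingTable>> {
        // Custom schemes are registered once on the RuntimeEnv, e.g.
        // ctx.runtime_env().register_object_store("test", store);
        let table_path = ListingTableUrl::parse(url)?;

        // No ObjectStore argument: `infer` looks the store up by the URL's
        // scheme, then infers the ListingOptions and the schema.
        let config = ListingTableConfig::new(table_path)
            .infer(&ctx.state())
            .await?;

        Ok(Arc::new(ListingTable::try_new(config)?))
    }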
---
benchmarks/src/bin/tpch.rs | 3 +-
datafusion-examples/examples/flight_server.rs | 4 +-
datafusion/core/benches/sort_limit_query_sql.rs | 3 +-
datafusion/core/src/catalog/schema.rs | 7 +-
datafusion/core/src/datasource/file_format/mod.rs | 1 -
datafusion/core/src/datasource/listing/table.rs | 141 ++++++++++-----------
datafusion/core/src/execution/context.rs | 29 ++---
datafusion/core/src/execution/runtime_env.rs | 2 +-
.../core/src/physical_optimizer/repartition.rs | 2 -
.../core/src/physical_plan/file_format/avro.rs | 9 +-
.../core/src/physical_plan/file_format/csv.rs | 6 +-
.../core/src/physical_plan/file_format/json.rs | 37 +++---
.../core/src/physical_plan/file_format/mod.rs | 6 +-
.../core/src/physical_plan/file_format/parquet.rs | 20 ++-
datafusion/core/src/test/mod.rs | 5 +-
datafusion/core/src/test/object_store.rs | 7 +
datafusion/core/tests/path_partition.rs | 33 +++--
datafusion/core/tests/row.rs | 1 -
datafusion/proto/src/logical_plan.rs | 10 +-
dev/build-arrow-ballista.sh | 2 +-
20 files changed, 152 insertions(+), 176 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 4e49bff09..5d4895698 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -44,7 +44,6 @@ use datafusion::{
};
use datafusion::{
arrow::util::pretty,
- datafusion_data_access::object_store::local::LocalFileSystem,
datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
};
@@ -427,7 +426,7 @@ fn get_table(
};
let table_path = ListingTableUrl::parse(path)?;
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(schema);
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index a3d7c0f56..5dbd694ac 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -19,7 +19,6 @@ use std::pin::Pin;
use std::sync::Arc;
use arrow_flight::SchemaAsIpc;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use futures::Stream;
@@ -71,8 +70,9 @@ impl FlightService for FlightServiceImpl {
let table_path =
ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
+ let ctx = SessionContext::new();
let schema = listing_options
- .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
+ .infer_schema(&ctx.state(), &table_path)
.await
.unwrap();
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs
index 198eb941f..e7aa33bd7 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -18,7 +18,6 @@
#[macro_use]
extern crate criterion;
use criterion::Criterion;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
@@ -71,7 +70,7 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
// create CSV data source
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(schema);
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 6771f3e8d..db25c1edc 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -171,11 +171,10 @@ mod tests {
let ctx = SessionContext::new();
let store = Arc::new(LocalFileSystem {});
- ctx.runtime_env()
- .register_object_store("file", store.clone());
+ ctx.runtime_env().register_object_store("file", store);
- let config = ListingTableConfig::new(store, table_path)
- .infer()
+ let config = ListingTableConfig::new(table_path)
+ .infer(&ctx.state())
.await
.unwrap();
let table = ListingTable::try_new(config).unwrap();
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index eae86fa9c..a15750394 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -115,7 +115,6 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
FileScanConfig {
- object_store: store,
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema,
file_groups,
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index bc7ed2604..29cde3c9f 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -45,14 +45,11 @@ use crate::{
};
use super::PartitionedFile;
-use datafusion_data_access::object_store::ObjectStore;
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
/// Configuration for creating a 'ListingTable'
pub struct ListingTableConfig {
- /// `ObjectStore` that contains the files for the `ListingTable`.
- pub object_store: Arc<dyn ObjectStore>,
/// Path on the `ObjectStore` for creating `ListingTable`.
pub table_path: ListingTableUrl,
/// Optional `SchemaRef` for the to be created `ListingTable`.
@@ -63,9 +60,8 @@ pub struct ListingTableConfig {
impl ListingTableConfig {
/// Creates new `ListingTableConfig`. The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
- pub fn new(object_store: Arc<dyn ObjectStore>, table_path: ListingTableUrl) -> Self {
+ pub fn new(table_path: ListingTableUrl) -> Self {
Self {
- object_store,
table_path,
file_schema: None,
options: None,
@@ -74,7 +70,6 @@ impl ListingTableConfig {
/// Add `schema` to `ListingTableConfig`
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
- object_store: self.object_store,
table_path: self.table_path,
file_schema: Some(schema),
options: self.options,
@@ -84,7 +79,6 @@ impl ListingTableConfig {
/// Add `listing_options` to `ListingTableConfig`
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
Self {
- object_store: self.object_store,
table_path: self.table_path,
file_schema: self.file_schema,
options: Some(listing_options),
@@ -105,10 +99,12 @@ impl ListingTableConfig {
}
/// Infer `ListingOptions` based on `table_path` suffix.
- pub async fn infer_options(self) -> Result<Self> {
+ pub async fn infer_options(self, ctx: &SessionState) -> Result<Self> {
+ let store = ctx.runtime_env.object_store(&self.table_path)?;
+
let file = self
.table_path
- .list_all_files(self.object_store.as_ref(), "")
+ .list_all_files(store.as_ref(), "")
.next()
.await
.ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
@@ -123,12 +119,11 @@ impl ListingTableConfig {
format,
collect_stat: true,
file_extension: file_type.to_string(),
- target_partitions: num_cpus::get(),
+ target_partitions: ctx.config.target_partitions,
table_partition_cols: vec![],
};
Ok(Self {
- object_store: self.object_store,
table_path: self.table_path,
file_schema: self.file_schema,
options: Some(listing_options),
@@ -136,15 +131,12 @@ impl ListingTableConfig {
}
/// Infer `SchemaRef` based on `table_path` suffix. Requires `self.options` to be set prior to using.
- pub async fn infer_schema(self) -> Result<Self> {
+ pub async fn infer_schema(self, ctx: &SessionState) -> Result<Self> {
match self.options {
Some(options) => {
- let schema = options
- .infer_schema(self.object_store.clone(), &self.table_path)
- .await?;
+ let schema = options.infer_schema(ctx, &self.table_path).await?;
Ok(Self {
- object_store: self.object_store,
table_path: self.table_path,
file_schema: Some(schema),
options: Some(options),
@@ -157,8 +149,8 @@ impl ListingTableConfig {
}
/// Convenience wrapper for calling `infer_options` and `infer_schema`
- pub async fn infer(self) -> Result<Self> {
- self.infer_options().await?.infer_schema().await
+ pub async fn infer(self, ctx: &SessionState) -> Result<Self> {
+ self.infer_options(ctx).await?.infer_schema(ctx).await
}
}
@@ -212,11 +204,16 @@ impl ListingOptions {
/// locally or ask a remote service to do it (e.g a scheduler).
pub async fn infer_schema<'a>(
&'a self,
- store: Arc<dyn ObjectStore>,
+ ctx: &SessionState,
table_path: &'a ListingTableUrl,
) -> Result<SchemaRef> {
- let list_stream = table_path.list_all_files(store.as_ref(), &self.file_extension);
- let files: Vec<_> = list_stream.try_collect().await?;
+ let store = ctx.runtime_env.object_store(table_path)?;
+
+ let files: Vec<_> = table_path
+ .list_all_files(store.as_ref(), &self.file_extension)
+ .try_collect()
+ .await?;
+
self.format.infer_schema(&store, &files).await
}
}
@@ -224,7 +221,6 @@ impl ListingOptions {
/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
pub struct ListingTable {
- object_store: Arc<dyn ObjectStore>,
table_path: ListingTableUrl,
/// File fields only
file_schema: SchemaRef,
@@ -261,8 +257,7 @@ impl ListingTable {
}
let table = Self {
- object_store: config.object_store.clone(),
- table_path: config.table_path.clone(),
+ table_path: config.table_path,
file_schema,
table_schema: Arc::new(Schema::new(table_fields)),
options,
@@ -271,11 +266,6 @@ impl ListingTable {
Ok(table)
}
- /// Get object store ref
- pub fn object_store(&self) -> &Arc<dyn ObjectStore> {
- &self.object_store
- }
-
/// Get path ref
pub fn table_path(&self) -> &ListingTableUrl {
&self.table_path
@@ -303,13 +293,13 @@ impl TableProvider for ListingTable {
async fn scan(
&self,
- _ctx: &SessionState,
+ ctx: &SessionState,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (partitioned_file_lists, statistics) =
- self.list_files_for_scan(filters, limit).await?;
+ self.list_files_for_scan(ctx, filters, limit).await?;
// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
@@ -323,7 +313,6 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
FileScanConfig {
- object_store: Arc::clone(&self.object_store),
object_store_url: self.table_path.object_store(),
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
@@ -358,12 +347,14 @@ impl ListingTable {
/// be distributed to different threads / executors.
async fn list_files_for_scan<'a>(
&'a self,
+ ctx: &'a SessionState,
filters: &'a [Expr],
limit: Option<usize>,
) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
+ let store = ctx.runtime_env.object_store(&self.table_path)?;
// list files (with partitions)
let file_list = pruned_partition_list(
- self.object_store.as_ref(),
+ store.as_ref(),
&self.table_path,
filters,
&self.options.file_extension,
@@ -373,25 +364,17 @@ impl ListingTable {
// collect the statistics if required by the config
// TODO: Collect statistics and schema in single-pass
- let object_store = Arc::clone(&self.object_store);
- let files = file_list.then(move |part_file| {
- let object_store = object_store.clone();
- async move {
- let part_file = part_file?;
- let statistics = if self.options.collect_stat {
- self.options
- .format
- .infer_stats(
- &object_store,
- self.file_schema.clone(),
- &part_file.file_meta,
- )
- .await?
- } else {
- Statistics::default()
- };
- Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
- }
+ let files = file_list.then(|part_file| async {
+ let part_file = part_file?;
+ let statistics = if self.options.collect_stat {
+ self.options
+ .format
+ .infer_stats(&store, self.file_schema.clone(), &part_file.file_meta)
+ .await?
+ } else {
+ Statistics::default()
+ };
+ Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
});
let (files, statistics) =
@@ -409,10 +392,9 @@ mod tests {
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::prelude::SessionContext;
use crate::{
- datafusion_data_access::object_store::local::LocalFileSystem,
datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
logical_plan::{col, lit},
- test::{columns, object_store::TestObjectStore},
+ test::{columns, object_store::register_test_store},
};
use arrow::datatypes::DataType;
@@ -422,7 +404,7 @@ mod tests {
async fn read_single_file() -> Result<()> {
let ctx = SessionContext::new();
- let table = load_table("alltypes_plain.parquet").await?;
+ let table = load_table(&ctx, "alltypes_plain.parquet").await?;
let projection = None;
let exec = table
.scan(&ctx.state(), &projection, &[], None)
@@ -444,17 +426,18 @@ mod tests {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();
+
+ let ctx = SessionContext::new();
+ let state = ctx.state();
+
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
- let schema = opt
- .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
- .await?;
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+ let schema = opt.infer_schema(&state, &table_path).await?;
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
let table = ListingTable::try_new(config)?;
- let ctx = SessionContext::new();
- let exec = table.scan(&ctx.state(), &None, &[], None).await?;
+ let exec = table.scan(&state, &None, &[], None).await?;
assert_eq!(exec.statistics().num_rows, Some(8));
assert_eq!(exec.statistics().total_byte_size, Some(671));
@@ -463,8 +446,9 @@ mod tests {
#[tokio::test]
async fn read_empty_table() -> Result<()> {
+ let ctx = SessionContext::new();
let path = String::from("table/p1=v1/file.avro");
- let store = TestObjectStore::new_arc(&[(&path, 100)]);
+ register_test_store(&ctx, &[(&path, 100)]);
let opt = ListingOptions {
file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
@@ -474,10 +458,10 @@ mod tests {
collect_stat: true,
};
- let table_path = ListingTableUrl::parse("file:///table/").unwrap();
+ let table_path = ListingTableUrl::parse("test:///table/").unwrap();
let file_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
- let config = ListingTableConfig::new(store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(file_schema);
let table = ListingTable::try_new(config)?;
@@ -490,7 +474,6 @@ mod tests {
// this will filter out the only file in the store
let filter = Expr::not_eq(col("p1"), lit("v1"));
- let ctx = SessionContext::new();
let scan = table
.scan(&ctx.state(), &None, &[filter], None)
.await
@@ -516,7 +499,7 @@ mod tests {
"bucket/key-prefix/file3",
"bucket/key-prefix/file4",
],
- "file:///bucket/key-prefix/",
+ "test:///bucket/key-prefix/",
12,
5,
)
@@ -530,7 +513,7 @@ mod tests {
"bucket/key-prefix/file2",
"bucket/key-prefix/file3",
],
- "file:///bucket/key-prefix/",
+ "test:///bucket/key-prefix/",
4,
4,
)
@@ -545,14 +528,14 @@ mod tests {
"bucket/key-prefix/file3",
"bucket/key-prefix/file4",
],
- "file:///bucket/key-prefix/",
+ "test:///bucket/key-prefix/",
2,
2,
)
.await?;
// no files => no groups
- assert_list_files_for_scan_grouping(&[], "file:///bucket/key-prefix/", 2, 0)
+ assert_list_files_for_scan_grouping(&[], "test:///bucket/key-prefix/", 2, 0)
.await?;
// files that don't match the prefix
@@ -562,7 +545,7 @@ mod tests {
"bucket/key-prefix/file1",
"bucket/other-prefix/roguefile",
],
- "file:///bucket/key-prefix/",
+ "test:///bucket/key-prefix/",
10,
2,
)
@@ -570,12 +553,16 @@ mod tests {
Ok(())
}
- async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
+ async fn load_table(
+ ctx: &SessionContext,
+ name: &str,
+ ) -> Result<Arc<dyn TableProvider>> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let table_path = ListingTableUrl::parse(filename).unwrap();
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
- .infer()
+
+ let config = ListingTableConfig::new(table_path)
+ .infer(&ctx.state())
.await?;
let table = ListingTable::try_new(config)?;
Ok(Arc::new(table))
@@ -589,8 +576,8 @@ mod tests {
target_partitions: usize,
output_partitioning: usize,
) -> Result<()> {
- let mock_store =
- TestObjectStore::new_arc(&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
+ let ctx = SessionContext::new();
+ register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
let format = AvroFormat {};
@@ -605,13 +592,13 @@ mod tests {
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let table_path = ListingTableUrl::parse(table_prefix).unwrap();
- let config = ListingTableConfig::new(mock_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(Arc::new(schema));
let table = ListingTable::try_new(config)?;
- let (file_list, _) = table.list_files_for_scan(&[], None).await?;
+ let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert_eq!(file_list.len(), output_partitioning);
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 29049fcdf..d5b372c36 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -521,7 +521,6 @@ impl SessionContext {
options: AvroReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
let table_path = ListingTableUrl::parse(table_path)?;
- let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
let listing_options = options.to_listing_options(target_partitions);
@@ -530,12 +529,12 @@ impl SessionContext {
Some(s) => s,
None => {
listing_options
- .infer_schema(Arc::clone(&object_store), &table_path)
+ .infer_schema(&self.state(), &table_path)
.await?
}
};
- let config = ListingTableConfig::new(object_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
@@ -549,7 +548,6 @@ impl SessionContext {
options: NdJsonReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
let table_path = ListingTableUrl::parse(table_path)?;
- let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
let listing_options = options.to_listing_options(target_partitions);
@@ -558,11 +556,11 @@ impl SessionContext {
Some(s) => s,
None => {
listing_options
- .infer_schema(Arc::clone(&object_store), &table_path)
+ .infer_schema(&self.state(), &table_path)
.await?
}
};
- let config = ListingTableConfig::new(object_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
@@ -585,18 +583,17 @@ impl SessionContext {
options: CsvReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
let table_path = ListingTableUrl::parse(table_path)?;
- let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
let listing_options = options.to_listing_options(target_partitions);
let resolved_schema = match options.schema {
Some(s) => Arc::new(s.to_owned()),
None => {
listing_options
- .infer_schema(Arc::clone(&object_store), &table_path)
+ .infer_schema(&self.state(), &table_path)
.await?
}
};
- let config = ListingTableConfig::new(object_store, table_path.clone())
+ let config = ListingTableConfig::new(table_path.clone())
.with_listing_options(listing_options)
.with_schema(resolved_schema);
@@ -611,17 +608,16 @@ impl SessionContext {
options: ParquetReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
let table_path = ListingTableUrl::parse(table_path)?;
- let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
let listing_options = options.to_listing_options(target_partitions);
// with parquet we resolve the schema in all cases
let resolved_schema = listing_options
- .infer_schema(Arc::clone(&object_store), &table_path)
+ .infer_schema(&self.state(), &table_path)
.await?;
- let config = ListingTableConfig::new(object_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
@@ -649,16 +645,11 @@ impl SessionContext {
provided_schema: Option<SchemaRef>,
) -> Result<()> {
let table_path = ListingTableUrl::parse(table_path)?;
- let object_store = self.runtime_env().object_store(&table_path)?;
let resolved_schema = match provided_schema {
- None => {
- options
- .infer_schema(Arc::clone(&object_store), &table_path)
- .await?
- }
+ None => options.infer_schema(&self.state(), &table_path).await?,
Some(s) => s,
};
- let config = ListingTableConfig::new(object_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(resolved_schema);
let table = ListingTable::try_new(config)?;
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 26d1471a1..2f134990f 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -100,7 +100,7 @@ impl RuntimeEnv {
.register_store(scheme, object_store)
}
- /// Retrieves a `ObjectStore` instance by scheme
+ /// Retrieves an `ObjectStore` instance for a URL
pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
self.object_store_registry
.get_by_url(url)
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index f24e1e4e6..bdb01e205 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -252,7 +252,6 @@ mod tests {
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, Statistics};
- use crate::test::object_store::TestObjectStore;
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
@@ -261,7 +260,6 @@ mod tests {
fn parquet_exec() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
- object_store: TestObjectStore::new_arc(&[("x", 100)]),
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema(),
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 8f4d30be0..ae1efb297 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -130,8 +130,12 @@ impl ExecutionPlan for AvroExec {
}
};
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
+
Ok(Box::pin(FileStream::new(
- Arc::clone(&self.base_config.object_store),
+ object_store,
self.base_config.file_groups[partition].clone(),
fun,
Arc::clone(&self.projected_schema),
@@ -188,7 +192,6 @@ mod tests {
let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
let avro_exec = AvroExec::new(FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![meta.into()]],
file_schema,
@@ -258,7 +261,6 @@ mod tests {
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
let avro_exec = AvroExec::new(FileScanConfig {
- object_store,
object_store_url,
file_groups: vec![vec![meta.into()]],
file_schema,
@@ -328,7 +330,6 @@ mod tests {
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
- object_store,
object_store_url,
file_groups: vec![vec![partitioned_file]],
file_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index b78662e46..7dddb70e9 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -137,8 +137,12 @@ impl ExecutionPlan for CsvExec {
)) as BatchIter
};
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
+
Ok(Box::pin(FileStream::new(
- Arc::clone(&self.base_config.object_store),
+ object_store,
self.base_config.file_groups[partition].clone(),
fun,
Arc::clone(&self.projected_schema),
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 3a179a7a2..397fee5fe 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -116,8 +116,12 @@ impl ExecutionPlan for NdJsonExec {
as BatchIter
};
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
+
Ok(Box::pin(FileStream::new(
- Arc::clone(&self.base_config.object_store),
+ object_store,
self.base_config.file_groups[partition].clone(),
fun,
Arc::clone(&self.projected_schema),
@@ -192,28 +196,24 @@ mod tests {
use arrow::datatypes::{Field, Schema};
use futures::StreamExt;
- use crate::datafusion_data_access::object_store::local::LocalFileSystem;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
use datafusion_data_access::object_store::local::local_unpartitioned_file;
- use datafusion_data_access::object_store::ObjectStore;
use tempfile::TempDir;
use super::*;
const TEST_DATA_BASE: &str = "tests/jsons";
- async fn prepare_store() -> (
- Arc<dyn ObjectStore>,
- ObjectStoreUrl,
- Vec<Vec<PartitionedFile>>,
- SchemaRef,
- ) {
- let store = Arc::new(LocalFileSystem {}) as _;
+ async fn prepare_store(
+ ctx: &SessionContext,
+ ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
let store_url = ObjectStoreUrl::local_filesystem();
+ let store = ctx.runtime_env().object_store(&store_url).unwrap();
+
let path = format!("{}/1.json", TEST_DATA_BASE);
let meta = local_unpartitioned_file(path);
let schema = JsonFormat::default()
@@ -221,7 +221,7 @@ mod tests {
.await
.unwrap();
- (store, store_url, vec![vec![meta.into()]], schema)
+ (store_url, vec![vec![meta.into()]], schema)
}
#[tokio::test]
@@ -230,11 +230,10 @@ mod tests {
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
- let (object_store, object_store_url, file_groups, file_schema) =
- prepare_store().await;
+ let (object_store_url, file_groups, file_schema) =
+ prepare_store(&session_ctx).await;
let exec = NdJsonExec::new(FileScanConfig {
- object_store,
object_store_url,
file_groups,
file_schema,
@@ -289,8 +288,8 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
- let (object_store, object_store_url, file_groups, actual_schema) =
- prepare_store().await;
+ let (object_store_url, file_groups, actual_schema) =
+ prepare_store(&session_ctx).await;
let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -299,7 +298,6 @@ mod tests {
let file_schema = Arc::new(Schema::new(fields));
let exec = NdJsonExec::new(FileScanConfig {
- object_store,
object_store_url,
file_groups,
file_schema,
@@ -330,11 +328,10 @@ mod tests {
async fn nd_json_exec_file_projection() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- let (object_store, object_store_url, file_groups, file_schema) =
- prepare_store().await;
+ let (object_store_url, file_groups, file_schema) =
+ prepare_store(&session_ctx).await;
let exec = NdJsonExec::new(FileScanConfig {
- object_store,
object_store_url,
file_groups,
file_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 0fed497d3..86015c472 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -45,7 +45,6 @@ use crate::{
};
use arrow::array::{new_null_array, UInt16BufferBuilder};
use arrow::record_batch::RecordBatchOptions;
-use datafusion_data_access::object_store::ObjectStore;
use lazy_static::lazy_static;
use log::info;
use std::{
@@ -66,8 +65,6 @@ lazy_static! {
/// any given file format.
#[derive(Debug, Clone)]
pub struct FileScanConfig {
- /// Store from which the `files` should be fetched
- pub object_store: Arc<dyn ObjectStore>,
/// Object store URL
pub object_store_url: ObjectStoreUrl,
/// Schema before projection. It contains the columns that are expected
@@ -402,7 +399,7 @@ fn create_dict_array(
#[cfg(test)]
mod tests {
use crate::{
- test::{build_table_i32, columns, object_store::TestObjectStore},
+ test::{build_table_i32, columns},
test_util::aggr_test_schema,
};
@@ -659,7 +656,6 @@ mod tests {
file_schema,
file_groups: vec![vec![]],
limit: None,
- object_store: TestObjectStore::new_arc(&[]),
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
projection,
statistics,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 81c4dd427..b1e629be5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -215,11 +215,15 @@ impl ExecutionPlan for ParquetExec {
&self.base_config.table_partition_cols,
);
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
+
let stream = ParquetExecStream {
error: false,
partition_index,
metrics: self.metrics.clone(),
- object_store: self.base_config.object_store.clone(),
+ object_store,
pruning_predicate: self.pruning_predicate.clone(),
batch_size: context.session_config().batch_size,
schema: self.projected_schema.clone(),
@@ -686,7 +690,6 @@ mod tests {
// prepare the scan
let parquet_exec = ParquetExec::new(
FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
file_schema,
@@ -1073,7 +1076,6 @@ mod tests {
) -> Result<()> {
let parquet_exec = ParquetExec::new(
FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups,
file_schema,
@@ -1139,9 +1141,15 @@ mod tests {
async fn parquet_exec_with_partition() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
+
+ let object_store_url = ObjectStoreUrl::local_filesystem();
+ let store = session_ctx
+ .runtime_env()
+ .object_store(&object_store_url)
+ .unwrap();
+
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
- let store = Arc::new(LocalFileSystem {}) as _;
let meta = local_unpartitioned_file(filename);
@@ -1162,8 +1170,7 @@ mod tests {
let parquet_exec = ParquetExec::new(
FileScanConfig {
- object_store: store,
- object_store_url: ObjectStoreUrl::local_filesystem(),
+ object_store_url,
file_groups: vec![vec![partitioned_file]],
file_schema: schema,
statistics: Statistics::default(),
@@ -1222,7 +1229,6 @@ mod tests {
let parquet_exec = ParquetExec::new(
FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![partitioned_file]],
file_schema: Arc::new(Schema::empty()),
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 19a9db9a4..dd00a5028 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -29,9 +29,7 @@ use array::{Array, ArrayRef};
use arrow::array::{self, DecimalBuilder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use datafusion_data_access::object_store::local::{
- local_unpartitioned_file, LocalFileSystem,
-};
+use datafusion_data_access::object_store::local::local_unpartitioned_file;
use futures::{Future, FutureExt};
use std::fs::File;
use std::io::prelude::*;
@@ -120,7 +118,6 @@ pub fn partitioned_csv_config(
};
Ok(FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema,
file_groups,
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index 95a6f16ed..fdd053346 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -26,9 +26,16 @@ use crate::datafusion_data_access::{
object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
FileMeta, Result, SizedFile,
};
+use crate::prelude::SessionContext;
use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
/// Registers a test object store with the provided `ctx`
+pub(crate) fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
+ ctx.runtime_env()
+ .register_object_store("test", TestObjectStore::new_arc(files));
+}
+
#[derive(Debug)]
/// An object store implem that is useful for testing.
/// `ObjectReader`s are filled with zero bytes.
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index a7e6ea452..e3da9d986 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -54,7 +54,7 @@ async fn parquet_distinct_partition_col() -> Result<()> {
"year=2021/month=10/day=28/file.parquet",
],
&["year", "month", "day"],
- "",
+ "mirror:///",
"alltypes_plain.parquet",
)
.await;
@@ -183,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "file:///mytable",
+ "mirror:///mytable",
);
let result = ctx
@@ -219,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "file:///mytable",
+ "mirror:///mytable",
);
let result = ctx
@@ -256,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "file:///mytable",
+ "mirror:///mytable",
);
let result = ctx
@@ -290,7 +290,7 @@ async fn parquet_multiple_partitions() -> Result<()> {
"year=2021/month=10/day=28/file.parquet",
],
&["year", "month", "day"],
- "",
+ "mirror:///",
"alltypes_plain.parquet",
)
.await;
@@ -332,7 +332,7 @@ async fn parquet_statistics() -> Result<()> {
"year=2021/month=10/day=28/file.parquet",
],
&["year", "month", "day"],
- "",
+ "mirror:///",
// This is the only file we found in the test set with
// actual stats. It has 1 column / 1 row.
"single_nan.parquet",
@@ -392,7 +392,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
"id=3/file.parquet",
],
&["id"],
- "",
+ "mirror:///",
"alltypes_plain.parquet",
)
.await;
@@ -415,13 +415,16 @@ fn register_partitioned_aggregate_csv(
let testdata = arrow_test_data();
let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
let file_schema = test_util::aggr_test_schema();
- let object_store = MirroringObjectStore::new_arc(csv_file_path, store_paths);
+ ctx.runtime_env().register_object_store(
+ "mirror",
+ MirroringObjectStore::new_arc(csv_file_path, store_paths),
+ );
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
let table_path = ListingTableUrl::parse(table_path).unwrap();
- let config = ListingTableConfig::new(object_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(file_schema);
let table = ListingTable::try_new(config).unwrap();
@@ -439,23 +442,25 @@ async fn register_partitioned_alltypes_parquet(
) {
let testdata = parquet_test_data();
let parquet_file_path = format!("{}/{}", testdata, source_file);
- let object_store =
- MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths);
+ ctx.runtime_env().register_object_store(
+ "mirror",
+ MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
+ );
let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
options.collect_stat = true;
- let table_path = ListingTableUrl::parse(format!("mirror:///{}", table_path)).unwrap();
+ let table_path = ListingTableUrl::parse(table_path).unwrap();
let store_path =
ListingTableUrl::parse(format!("mirror:///{}", store_paths[0])).unwrap();
let file_schema = options
- .infer_schema(Arc::clone(&object_store), &store_path)
+ .infer_schema(&ctx.state(), &store_path)
.await
.expect("Parquet schema inference failed");
- let config = ListingTableConfig::new(object_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(file_schema);
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 947ebc699..1de6af06a 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -97,7 +97,6 @@ async fn get_exec(
let exec = format
.create_physical_plan(
FileScanConfig {
- object_store,
object_store_url,
file_schema,
file_groups,
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 4eff00e01..c6ab11458 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -420,15 +420,7 @@ impl AsLogicalPlan for LogicalPlanNode {
target_partitions: scan.target_partitions as usize,
};
- let object_store = ctx.runtime_env().object_store(&table_path)?;
-
- println!(
- "Found object store {:?} for path {}",
- object_store,
- scan.path.as_str()
- );
-
- let config = ListingTableConfig::new(object_store, table_path)
+ let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(Arc::new(schema));
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 1d287c460..82c0be109 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
# clone the repo
# TODO make repo/branch configurable
-git clone https://github.com/tustvold/arrow-ballista -b session-state-table-provider
+git clone https://github.com/tustvold/arrow-ballista -b remove-object-store-plans
# update dependencies to local crates
python ./dev/make-ballista-deps-local.py