Posted to commits@arrow.apache.org by tu...@apache.org on 2022/06/01 20:58:07 UTC

[arrow-datafusion] branch master updated: Remove ObjectStore from FileScanConfig and ListingTableConfig (#2668)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 585bc3a62 Remove ObjectStore from FileScanConfig and ListingTableConfig (#2668)
585bc3a62 is described below

commit 585bc3a629b92ea7a86ebfe8bf762dbef4155710
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Jun 1 21:58:02 2022 +0100

    Remove ObjectStore from FileScanConfig and ListingTableConfig (#2668)
    
    * Remove ObjectStore from FileScanConfig and ListingTableConfig
    
    * Update ballista pin
---
 benchmarks/src/bin/tpch.rs                         |   3 +-
 datafusion-examples/examples/flight_server.rs      |   4 +-
 datafusion/core/benches/sort_limit_query_sql.rs    |   3 +-
 datafusion/core/src/catalog/schema.rs              |   7 +-
 datafusion/core/src/datasource/file_format/mod.rs  |   1 -
 datafusion/core/src/datasource/listing/table.rs    | 141 ++++++++++-----------
 datafusion/core/src/execution/context.rs           |  29 ++---
 datafusion/core/src/execution/runtime_env.rs       |   2 +-
 .../core/src/physical_optimizer/repartition.rs     |   2 -
 .../core/src/physical_plan/file_format/avro.rs     |   9 +-
 .../core/src/physical_plan/file_format/csv.rs      |   6 +-
 .../core/src/physical_plan/file_format/json.rs     |  37 +++---
 .../core/src/physical_plan/file_format/mod.rs      |   6 +-
 .../core/src/physical_plan/file_format/parquet.rs  |  20 ++-
 datafusion/core/src/test/mod.rs                    |   5 +-
 datafusion/core/src/test/object_store.rs           |   7 +
 datafusion/core/tests/path_partition.rs            |  33 +++--
 datafusion/core/tests/row.rs                       |   1 -
 datafusion/proto/src/logical_plan.rs               |  10 +-
 dev/build-arrow-ballista.sh                        |   2 +-
 20 files changed, 152 insertions(+), 176 deletions(-)
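
For context, a minimal sketch of the API change this commit makes, using only
calls visible in the diff below (the table path and table name are
illustrative): a `ListingTable` is now built from a `ListingTableUrl` alone,
and the `ObjectStore` is resolved from the `RuntimeEnv` by URL scheme at scan
time rather than captured in the config.

    use std::sync::Arc;
    use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn register_listing_table() -> Result<()> {
        let ctx = SessionContext::new();

        // No ObjectStore handle is passed any more; the "file" scheme
        // resolves to the local filesystem store in the RuntimeEnv.
        let table_path = ListingTableUrl::parse("file:///path/to/table/")?;

        // Options and schema are inferred against the SessionState
        // instead of an explicit Arc<dyn ObjectStore>.
        let config = ListingTableConfig::new(table_path)
            .infer(&ctx.state())
            .await?;

        let table = ListingTable::try_new(config)?;
        let _ = ctx.register_table("example", Arc::new(table))?;
        Ok(())
    }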

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 4e49bff09..5d4895698 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -44,7 +44,6 @@ use datafusion::{
 };
 use datafusion::{
     arrow::util::pretty,
-    datafusion_data_access::object_store::local::LocalFileSystem,
     datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
 };
 
@@ -427,7 +426,7 @@ fn get_table(
     };
 
     let table_path = ListingTableUrl::parse(path)?;
-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(options)
         .with_schema(schema);
 
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index a3d7c0f56..5dbd694ac 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -19,7 +19,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use arrow_flight::SchemaAsIpc;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
 use futures::Stream;
@@ -71,8 +70,9 @@ impl FlightService for FlightServiceImpl {
         let table_path =
             ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
 
+        let ctx = SessionContext::new();
         let schema = listing_options
-            .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
+            .infer_schema(&ctx.state(), &table_path)
             .await
             .unwrap();
 
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs
index 198eb941f..e7aa33bd7 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -18,7 +18,6 @@
 #[macro_use]
 extern crate criterion;
 use criterion::Criterion;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
@@ -71,7 +70,7 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
     // create CSV data source
     let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
 
-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(listing_options)
         .with_schema(schema);
 
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 6771f3e8d..db25c1edc 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -171,11 +171,10 @@ mod tests {
 
         let ctx = SessionContext::new();
         let store = Arc::new(LocalFileSystem {});
-        ctx.runtime_env()
-            .register_object_store("file", store.clone());
+        ctx.runtime_env().register_object_store("file", store);
 
-        let config = ListingTableConfig::new(store, table_path)
-            .infer()
+        let config = ListingTableConfig::new(table_path)
+            .infer(&ctx.state())
             .await
             .unwrap();
         let table = ListingTable::try_new(config).unwrap();
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index eae86fa9c..a15750394 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -115,7 +115,6 @@ pub(crate) mod test_util {
         let exec = format
             .create_physical_plan(
                 FileScanConfig {
-                    object_store: store,
                     object_store_url: ObjectStoreUrl::local_filesystem(),
                     file_schema,
                     file_groups,
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index bc7ed2604..29cde3c9f 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -45,14 +45,11 @@ use crate::{
 };
 
 use super::PartitionedFile;
-use datafusion_data_access::object_store::ObjectStore;
 
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
 
 /// Configuration for creating a 'ListingTable'  
 pub struct ListingTableConfig {
-    /// `ObjectStore` that contains the files for the `ListingTable`.
-    pub object_store: Arc<dyn ObjectStore>,
     /// Path on the `ObjectStore` for creating `ListingTable`.
     pub table_path: ListingTableUrl,
     /// Optional `SchemaRef` for the to be created `ListingTable`.
@@ -63,9 +60,8 @@ pub struct ListingTableConfig {
 
 impl ListingTableConfig {
     /// Creates new `ListingTableConfig`.  The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
-    pub fn new(object_store: Arc<dyn ObjectStore>, table_path: ListingTableUrl) -> Self {
+    pub fn new(table_path: ListingTableUrl) -> Self {
         Self {
-            object_store,
             table_path,
             file_schema: None,
             options: None,
@@ -74,7 +70,6 @@ impl ListingTableConfig {
     /// Add `schema` to `ListingTableConfig`
     pub fn with_schema(self, schema: SchemaRef) -> Self {
         Self {
-            object_store: self.object_store,
             table_path: self.table_path,
             file_schema: Some(schema),
             options: self.options,
@@ -84,7 +79,6 @@ impl ListingTableConfig {
     /// Add `listing_options` to `ListingTableConfig`
     pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
         Self {
-            object_store: self.object_store,
             table_path: self.table_path,
             file_schema: self.file_schema,
             options: Some(listing_options),
@@ -105,10 +99,12 @@ impl ListingTableConfig {
     }
 
     /// Infer `ListingOptions` based on `table_path` suffix.
-    pub async fn infer_options(self) -> Result<Self> {
+    pub async fn infer_options(self, ctx: &SessionState) -> Result<Self> {
+        let store = ctx.runtime_env.object_store(&self.table_path)?;
+
         let file = self
             .table_path
-            .list_all_files(self.object_store.as_ref(), "")
+            .list_all_files(store.as_ref(), "")
             .next()
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
@@ -123,12 +119,11 @@ impl ListingTableConfig {
             format,
             collect_stat: true,
             file_extension: file_type.to_string(),
-            target_partitions: num_cpus::get(),
+            target_partitions: ctx.config.target_partitions,
             table_partition_cols: vec![],
         };
 
         Ok(Self {
-            object_store: self.object_store,
             table_path: self.table_path,
             file_schema: self.file_schema,
             options: Some(listing_options),
@@ -136,15 +131,12 @@ impl ListingTableConfig {
     }
 
     /// Infer `SchemaRef` based on `table_path` suffix.  Requires `self.options` to be set prior to using.
-    pub async fn infer_schema(self) -> Result<Self> {
+    pub async fn infer_schema(self, ctx: &SessionState) -> Result<Self> {
         match self.options {
             Some(options) => {
-                let schema = options
-                    .infer_schema(self.object_store.clone(), &self.table_path)
-                    .await?;
+                let schema = options.infer_schema(ctx, &self.table_path).await?;
 
                 Ok(Self {
-                    object_store: self.object_store,
                     table_path: self.table_path,
                     file_schema: Some(schema),
                     options: Some(options),
@@ -157,8 +149,8 @@ impl ListingTableConfig {
     }
 
     /// Convenience wrapper for calling `infer_options` and `infer_schema`
-    pub async fn infer(self) -> Result<Self> {
-        self.infer_options().await?.infer_schema().await
+    pub async fn infer(self, ctx: &SessionState) -> Result<Self> {
+        self.infer_options(ctx).await?.infer_schema(ctx).await
     }
 }
 
@@ -212,11 +204,16 @@ impl ListingOptions {
     /// locally or ask a remote service to do it (e.g a scheduler).
     pub async fn infer_schema<'a>(
         &'a self,
-        store: Arc<dyn ObjectStore>,
+        ctx: &SessionState,
         table_path: &'a ListingTableUrl,
     ) -> Result<SchemaRef> {
-        let list_stream = table_path.list_all_files(store.as_ref(), &self.file_extension);
-        let files: Vec<_> = list_stream.try_collect().await?;
+        let store = ctx.runtime_env.object_store(table_path)?;
+
+        let files: Vec<_> = table_path
+            .list_all_files(store.as_ref(), &self.file_extension)
+            .try_collect()
+            .await?;
+
         self.format.infer_schema(&store, &files).await
     }
 }
@@ -224,7 +221,6 @@ impl ListingOptions {
 /// An implementation of `TableProvider` that uses the object store
 /// or file system listing capability to get the list of files.
 pub struct ListingTable {
-    object_store: Arc<dyn ObjectStore>,
     table_path: ListingTableUrl,
     /// File fields only
     file_schema: SchemaRef,
@@ -261,8 +257,7 @@ impl ListingTable {
         }
 
         let table = Self {
-            object_store: config.object_store.clone(),
-            table_path: config.table_path.clone(),
+            table_path: config.table_path,
             file_schema,
             table_schema: Arc::new(Schema::new(table_fields)),
             options,
@@ -271,11 +266,6 @@ impl ListingTable {
         Ok(table)
     }
 
-    /// Get object store ref
-    pub fn object_store(&self) -> &Arc<dyn ObjectStore> {
-        &self.object_store
-    }
-
     /// Get path ref
     pub fn table_path(&self) -> &ListingTableUrl {
         &self.table_path
@@ -303,13 +293,13 @@ impl TableProvider for ListingTable {
 
     async fn scan(
         &self,
-        _ctx: &SessionState,
+        ctx: &SessionState,
         projection: &Option<Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let (partitioned_file_lists, statistics) =
-            self.list_files_for_scan(filters, limit).await?;
+            self.list_files_for_scan(ctx, filters, limit).await?;
 
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
@@ -323,7 +313,6 @@ impl TableProvider for ListingTable {
             .format
             .create_physical_plan(
                 FileScanConfig {
-                    object_store: Arc::clone(&self.object_store),
                     object_store_url: self.table_path.object_store(),
                     file_schema: Arc::clone(&self.file_schema),
                     file_groups: partitioned_file_lists,
@@ -358,12 +347,14 @@ impl ListingTable {
     /// be distributed to different threads / executors.
     async fn list_files_for_scan<'a>(
         &'a self,
+        ctx: &'a SessionState,
         filters: &'a [Expr],
         limit: Option<usize>,
     ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
+        let store = ctx.runtime_env.object_store(&self.table_path)?;
         // list files (with partitions)
         let file_list = pruned_partition_list(
-            self.object_store.as_ref(),
+            store.as_ref(),
             &self.table_path,
             filters,
             &self.options.file_extension,
@@ -373,25 +364,17 @@ impl ListingTable {
 
         // collect the statistics if required by the config
         // TODO: Collect statistics and schema in single-pass
-        let object_store = Arc::clone(&self.object_store);
-        let files = file_list.then(move |part_file| {
-            let object_store = object_store.clone();
-            async move {
-                let part_file = part_file?;
-                let statistics = if self.options.collect_stat {
-                    self.options
-                        .format
-                        .infer_stats(
-                            &object_store,
-                            self.file_schema.clone(),
-                            &part_file.file_meta,
-                        )
-                        .await?
-                } else {
-                    Statistics::default()
-                };
-                Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
-            }
+        let files = file_list.then(|part_file| async {
+            let part_file = part_file?;
+            let statistics = if self.options.collect_stat {
+                self.options
+                    .format
+                    .infer_stats(&store, self.file_schema.clone(), &part_file.file_meta)
+                    .await?
+            } else {
+                Statistics::default()
+            };
+            Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
         });
 
         let (files, statistics) =
@@ -409,10 +392,9 @@ mod tests {
     use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
     use crate::prelude::SessionContext;
     use crate::{
-        datafusion_data_access::object_store::local::LocalFileSystem,
         datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
         logical_plan::{col, lit},
-        test::{columns, object_store::TestObjectStore},
+        test::{columns, object_store::register_test_store},
     };
     use arrow::datatypes::DataType;
 
@@ -422,7 +404,7 @@ mod tests {
     async fn read_single_file() -> Result<()> {
         let ctx = SessionContext::new();
 
-        let table = load_table("alltypes_plain.parquet").await?;
+        let table = load_table(&ctx, "alltypes_plain.parquet").await?;
         let projection = None;
         let exec = table
             .scan(&ctx.state(), &projection, &[], None)
@@ -444,17 +426,18 @@ mod tests {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
         let table_path = ListingTableUrl::parse(filename).unwrap();
+
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+
         let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
-        let schema = opt
-            .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
-            .await?;
-        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+        let schema = opt.infer_schema(&state, &table_path).await?;
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
             .with_schema(schema);
         let table = ListingTable::try_new(config)?;
 
-        let ctx = SessionContext::new();
-        let exec = table.scan(&ctx.state(), &None, &[], None).await?;
+        let exec = table.scan(&state, &None, &[], None).await?;
         assert_eq!(exec.statistics().num_rows, Some(8));
         assert_eq!(exec.statistics().total_byte_size, Some(671));
 
@@ -463,8 +446,9 @@ mod tests {
 
     #[tokio::test]
     async fn read_empty_table() -> Result<()> {
+        let ctx = SessionContext::new();
         let path = String::from("table/p1=v1/file.avro");
-        let store = TestObjectStore::new_arc(&[(&path, 100)]);
+        register_test_store(&ctx, &[(&path, 100)]);
 
         let opt = ListingOptions {
             file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
@@ -474,10 +458,10 @@ mod tests {
             collect_stat: true,
         };
 
-        let table_path = ListingTableUrl::parse("file:///table/").unwrap();
+        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
         let file_schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
-        let config = ListingTableConfig::new(store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
             .with_schema(file_schema);
         let table = ListingTable::try_new(config)?;
@@ -490,7 +474,6 @@ mod tests {
         // this will filter out the only file in the store
         let filter = Expr::not_eq(col("p1"), lit("v1"));
 
-        let ctx = SessionContext::new();
         let scan = table
             .scan(&ctx.state(), &None, &[filter], None)
             .await
@@ -516,7 +499,7 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "file:///bucket/key-prefix/",
+            "test:///bucket/key-prefix/",
             12,
             5,
         )
@@ -530,7 +513,7 @@ mod tests {
                 "bucket/key-prefix/file2",
                 "bucket/key-prefix/file3",
             ],
-            "file:///bucket/key-prefix/",
+            "test:///bucket/key-prefix/",
             4,
             4,
         )
@@ -545,14 +528,14 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "file:///bucket/key-prefix/",
+            "test:///bucket/key-prefix/",
             2,
             2,
         )
         .await?;
 
         // no files => no groups
-        assert_list_files_for_scan_grouping(&[], "file:///bucket/key-prefix/", 2, 0)
+        assert_list_files_for_scan_grouping(&[], "test:///bucket/key-prefix/", 2, 0)
             .await?;
 
         // files that don't match the prefix
@@ -562,7 +545,7 @@ mod tests {
                 "bucket/key-prefix/file1",
                 "bucket/other-prefix/roguefile",
             ],
-            "file:///bucket/key-prefix/",
+            "test:///bucket/key-prefix/",
             10,
             2,
         )
@@ -570,12 +553,16 @@ mod tests {
         Ok(())
     }
 
-    async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
+    async fn load_table(
+        ctx: &SessionContext,
+        name: &str,
+    ) -> Result<Arc<dyn TableProvider>> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, name);
         let table_path = ListingTableUrl::parse(filename).unwrap();
-        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
-            .infer()
+
+        let config = ListingTableConfig::new(table_path)
+            .infer(&ctx.state())
             .await?;
         let table = ListingTable::try_new(config)?;
         Ok(Arc::new(table))
@@ -589,8 +576,8 @@ mod tests {
         target_partitions: usize,
         output_partitioning: usize,
     ) -> Result<()> {
-        let mock_store =
-            TestObjectStore::new_arc(&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
+        let ctx = SessionContext::new();
+        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
 
         let format = AvroFormat {};
 
@@ -605,13 +592,13 @@ mod tests {
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
 
         let table_path = ListingTableUrl::parse(table_prefix).unwrap();
-        let config = ListingTableConfig::new(mock_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
             .with_schema(Arc::new(schema));
 
         let table = ListingTable::try_new(config)?;
 
-        let (file_list, _) = table.list_files_for_scan(&[], None).await?;
+        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
 
         assert_eq!(file_list.len(), output_partitioning);
 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 29049fcdf..d5b372c36 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -521,7 +521,6 @@ impl SessionContext {
         options: AvroReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
@@ -530,12 +529,12 @@ impl SessionContext {
             Some(s) => s,
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
+                    .infer_schema(&self.state(), &table_path)
                     .await?
             }
         };
 
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?;
@@ -549,7 +548,6 @@ impl SessionContext {
         options: NdJsonReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
@@ -558,11 +556,11 @@ impl SessionContext {
             Some(s) => s,
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
+                    .infer_schema(&self.state(), &table_path)
                     .await?
             }
         };
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?;
@@ -585,18 +583,17 @@ impl SessionContext {
         options: CsvReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
         let listing_options = options.to_listing_options(target_partitions);
         let resolved_schema = match options.schema {
             Some(s) => Arc::new(s.to_owned()),
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
+                    .infer_schema(&self.state(), &table_path)
                     .await?
             }
         };
-        let config = ListingTableConfig::new(object_store, table_path.clone())
+        let config = ListingTableConfig::new(table_path.clone())
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
 
@@ -611,17 +608,16 @@ impl SessionContext {
         options: ParquetReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
 
         // with parquet we resolve the schema in all cases
         let resolved_schema = listing_options
-            .infer_schema(Arc::clone(&object_store), &table_path)
+            .infer_schema(&self.state(), &table_path)
             .await?;
 
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
 
@@ -649,16 +645,11 @@ impl SessionContext {
         provided_schema: Option<SchemaRef>,
     ) -> Result<()> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let resolved_schema = match provided_schema {
-            None => {
-                options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
-                    .await?
-            }
+            None => options.infer_schema(&self.state(), &table_path).await?,
             Some(s) => s,
         };
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
             .with_schema(resolved_schema);
         let table = ListingTable::try_new(config)?;
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 26d1471a1..2f134990f 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -100,7 +100,7 @@ impl RuntimeEnv {
             .register_store(scheme, object_store)
     }
 
-    /// Retrieves a `ObjectStore` instance by scheme
+    /// Retrieves an `ObjectStore` instance for a URL
     pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
         self.object_store_registry
             .get_by_url(url)
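
For illustration, a minimal sketch of the registry lookup this enables (the
path is hypothetical; `LocalFileSystem` is the store this commit removes from
the configs elsewhere in the diff):

    use std::sync::Arc;
    use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    fn resolve_store() -> Result<()> {
        let ctx = SessionContext::new();
        // Register a store under the "file" scheme...
        ctx.runtime_env()
            .register_object_store("file", Arc::new(LocalFileSystem {}));
        // ...and later resolve it from any URL carrying that scheme.
        let url = ListingTableUrl::parse("file:///data/table/")?;
        let _store = ctx.runtime_env().object_store(&url)?;
        Ok(())
    }
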
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index f24e1e4e6..bdb01e205 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -252,7 +252,6 @@ mod tests {
     use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::union::UnionExec;
     use crate::physical_plan::{displayable, Statistics};
-    use crate::test::object_store::TestObjectStore;
 
     fn schema() -> SchemaRef {
         Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
@@ -261,7 +260,6 @@ mod tests {
     fn parquet_exec() -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
             FileScanConfig {
-                object_store: TestObjectStore::new_arc(&[("x", 100)]),
                 object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
                 file_schema: schema(),
                 file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 8f4d30be0..ae1efb297 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -130,8 +130,12 @@ impl ExecutionPlan for AvroExec {
             }
         };
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         Ok(Box::pin(FileStream::new(
-            Arc::clone(&self.base_config.object_store),
+            object_store,
             self.base_config.file_groups[partition].clone(),
             fun,
             Arc::clone(&self.projected_schema),
@@ -188,7 +192,6 @@ mod tests {
         let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
 
         let avro_exec = AvroExec::new(FileScanConfig {
-            object_store: Arc::new(LocalFileSystem {}),
             object_store_url: ObjectStoreUrl::local_filesystem(),
             file_groups: vec![vec![meta.into()]],
             file_schema,
@@ -258,7 +261,6 @@ mod tests {
         let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
 
         let avro_exec = AvroExec::new(FileScanConfig {
-            object_store,
             object_store_url,
             file_groups: vec![vec![meta.into()]],
             file_schema,
@@ -328,7 +330,6 @@ mod tests {
             // select specific columns of the files as well as the partitioning
             // column which is supposed to be the last column in the table schema.
             projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
-            object_store,
             object_store_url,
             file_groups: vec![vec![partitioned_file]],
             file_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index b78662e46..7dddb70e9 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -137,8 +137,12 @@ impl ExecutionPlan for CsvExec {
             )) as BatchIter
         };
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         Ok(Box::pin(FileStream::new(
-            Arc::clone(&self.base_config.object_store),
+            object_store,
             self.base_config.file_groups[partition].clone(),
             fun,
             Arc::clone(&self.projected_schema),
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 3a179a7a2..397fee5fe 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -116,8 +116,12 @@ impl ExecutionPlan for NdJsonExec {
                 as BatchIter
         };
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         Ok(Box::pin(FileStream::new(
-            Arc::clone(&self.base_config.object_store),
+            object_store,
             self.base_config.file_groups[partition].clone(),
             fun,
             Arc::clone(&self.projected_schema),
@@ -192,28 +196,24 @@ mod tests {
     use arrow::datatypes::{Field, Schema};
     use futures::StreamExt;
 
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
     use datafusion_data_access::object_store::local::local_unpartitioned_file;
-    use datafusion_data_access::object_store::ObjectStore;
     use tempfile::TempDir;
 
     use super::*;
 
     const TEST_DATA_BASE: &str = "tests/jsons";
 
-    async fn prepare_store() -> (
-        Arc<dyn ObjectStore>,
-        ObjectStoreUrl,
-        Vec<Vec<PartitionedFile>>,
-        SchemaRef,
-    ) {
-        let store = Arc::new(LocalFileSystem {}) as _;
+    async fn prepare_store(
+        ctx: &SessionContext,
+    ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
         let store_url = ObjectStoreUrl::local_filesystem();
+        let store = ctx.runtime_env().object_store(&store_url).unwrap();
+
         let path = format!("{}/1.json", TEST_DATA_BASE);
         let meta = local_unpartitioned_file(path);
         let schema = JsonFormat::default()
@@ -221,7 +221,7 @@ mod tests {
             .await
             .unwrap();
 
-        (store, store_url, vec![vec![meta.into()]], schema)
+        (store_url, vec![vec![meta.into()]], schema)
     }
 
     #[tokio::test]
@@ -230,11 +230,10 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
 
-        let (object_store, object_store_url, file_groups, file_schema) =
-            prepare_store().await;
+        let (object_store_url, file_groups, file_schema) =
+            prepare_store(&session_ctx).await;
 
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store,
             object_store_url,
             file_groups,
             file_schema,
@@ -289,8 +288,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
-        let (object_store, object_store_url, file_groups, actual_schema) =
-            prepare_store().await;
+        let (object_store_url, file_groups, actual_schema) =
+            prepare_store(&session_ctx).await;
 
         let mut fields = actual_schema.fields().clone();
         fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -299,7 +298,6 @@ mod tests {
         let file_schema = Arc::new(Schema::new(fields));
 
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store,
             object_store_url,
             file_groups,
             file_schema,
@@ -330,11 +328,10 @@ mod tests {
     async fn nd_json_exec_file_projection() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let (object_store, object_store_url, file_groups, file_schema) =
-            prepare_store().await;
+        let (object_store_url, file_groups, file_schema) =
+            prepare_store(&session_ctx).await;
 
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store,
             object_store_url,
             file_groups,
             file_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 0fed497d3..86015c472 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -45,7 +45,6 @@ use crate::{
 };
 use arrow::array::{new_null_array, UInt16BufferBuilder};
 use arrow::record_batch::RecordBatchOptions;
-use datafusion_data_access::object_store::ObjectStore;
 use lazy_static::lazy_static;
 use log::info;
 use std::{
@@ -66,8 +65,6 @@ lazy_static! {
 /// any given file format.
 #[derive(Debug, Clone)]
 pub struct FileScanConfig {
-    /// Store from which the `files` should be fetched
-    pub object_store: Arc<dyn ObjectStore>,
     /// Object store URL
     pub object_store_url: ObjectStoreUrl,
     /// Schema before projection. It contains the columns that are expected
@@ -402,7 +399,7 @@ fn create_dict_array(
 #[cfg(test)]
 mod tests {
     use crate::{
-        test::{build_table_i32, columns, object_store::TestObjectStore},
+        test::{build_table_i32, columns},
         test_util::aggr_test_schema,
     };
 
@@ -659,7 +656,6 @@ mod tests {
             file_schema,
             file_groups: vec![vec![]],
             limit: None,
-            object_store: TestObjectStore::new_arc(&[]),
             object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
             projection,
             statistics,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 81c4dd427..b1e629be5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -215,11 +215,15 @@ impl ExecutionPlan for ParquetExec {
             &self.base_config.table_partition_cols,
         );
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         let stream = ParquetExecStream {
             error: false,
             partition_index,
             metrics: self.metrics.clone(),
-            object_store: self.base_config.object_store.clone(),
+            object_store,
             pruning_predicate: self.pruning_predicate.clone(),
             batch_size: context.session_config().batch_size,
             schema: self.projected_schema.clone(),
@@ -686,7 +690,6 @@ mod tests {
         // prepare the scan
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
                 object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![file_groups],
                 file_schema,
@@ -1073,7 +1076,6 @@ mod tests {
         ) -> Result<()> {
             let parquet_exec = ParquetExec::new(
                 FileScanConfig {
-                    object_store: Arc::new(LocalFileSystem {}),
                     object_store_url: ObjectStoreUrl::local_filesystem(),
                     file_groups,
                     file_schema,
@@ -1139,9 +1141,15 @@ mod tests {
     async fn parquet_exec_with_partition() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
+
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let store = session_ctx
+            .runtime_env()
+            .object_store(&object_store_url)
+            .unwrap();
+
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
-        let store = Arc::new(LocalFileSystem {}) as _;
 
         let meta = local_unpartitioned_file(filename);
 
@@ -1162,8 +1170,7 @@ mod tests {
 
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: store,
-                object_store_url: ObjectStoreUrl::local_filesystem(),
+                object_store_url,
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: schema,
                 statistics: Statistics::default(),
@@ -1222,7 +1229,6 @@ mod tests {
 
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
                 object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: Arc::new(Schema::empty()),
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 19a9db9a4..dd00a5028 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -29,9 +29,7 @@ use array::{Array, ArrayRef};
 use arrow::array::{self, DecimalBuilder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_data_access::object_store::local::{
-    local_unpartitioned_file, LocalFileSystem,
-};
+use datafusion_data_access::object_store::local::local_unpartitioned_file;
 use futures::{Future, FutureExt};
 use std::fs::File;
 use std::io::prelude::*;
@@ -120,7 +118,6 @@ pub fn partitioned_csv_config(
     };
 
     Ok(FileScanConfig {
-        object_store: Arc::new(LocalFileSystem {}),
         object_store_url: ObjectStoreUrl::local_filesystem(),
         file_schema: schema,
         file_groups,
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index 95a6f16ed..fdd053346 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -26,9 +26,16 @@ use crate::datafusion_data_access::{
     object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
     FileMeta, Result, SizedFile,
 };
+use crate::prelude::SessionContext;
 use async_trait::async_trait;
 use futures::{stream, AsyncRead, StreamExt};
 
+/// Registers a test object store containing the provided `files` with `ctx`
+pub(crate) fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
+    ctx.runtime_env()
+        .register_object_store("test", TestObjectStore::new_arc(files));
+}
+
 #[derive(Debug)]
 /// An object store implem that is useful for testing.
 /// `ObjectReader`s are filled with zero bytes.
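
In tests, this pairs with the new helper above: register fake files under the
`test` scheme, then address them through `test:///` URLs, as the updated tests
in listing/table.rs do (file sizes here are arbitrary):

    let ctx = SessionContext::new();
    register_test_store(&ctx, &[("table/file1", 10), ("table/file2", 10)]);
    let table_path = ListingTableUrl::parse("test:///table/").unwrap();
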
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index a7e6ea452..e3da9d986 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -54,7 +54,7 @@ async fn parquet_distinct_partition_col() -> Result<()> {
             "year=2021/month=10/day=28/file.parquet",
         ],
         &["year", "month", "day"],
-        "",
+        "mirror:///",
         "alltypes_plain.parquet",
     )
     .await;
@@ -183,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "file:///mytable",
+        "mirror:///mytable",
     );
 
     let result = ctx
@@ -219,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "file:///mytable",
+        "mirror:///mytable",
     );
 
     let result = ctx
@@ -256,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "file:///mytable",
+        "mirror:///mytable",
     );
 
     let result = ctx
@@ -290,7 +290,7 @@ async fn parquet_multiple_partitions() -> Result<()> {
             "year=2021/month=10/day=28/file.parquet",
         ],
         &["year", "month", "day"],
-        "",
+        "mirror:///",
         "alltypes_plain.parquet",
     )
     .await;
@@ -332,7 +332,7 @@ async fn parquet_statistics() -> Result<()> {
             "year=2021/month=10/day=28/file.parquet",
         ],
         &["year", "month", "day"],
-        "",
+        "mirror:///",
         // This is the only file we found in the test set with
         // actual stats. It has 1 column / 1 row.
         "single_nan.parquet",
@@ -392,7 +392,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
             "id=3/file.parquet",
         ],
         &["id"],
-        "",
+        "mirror:///",
         "alltypes_plain.parquet",
     )
     .await;
@@ -415,13 +415,16 @@ fn register_partitioned_aggregate_csv(
     let testdata = arrow_test_data();
     let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
     let file_schema = test_util::aggr_test_schema();
-    let object_store = MirroringObjectStore::new_arc(csv_file_path, store_paths);
+    ctx.runtime_env().register_object_store(
+        "mirror",
+        MirroringObjectStore::new_arc(csv_file_path, store_paths),
+    );
 
     let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
 
     let table_path = ListingTableUrl::parse(table_path).unwrap();
-    let config = ListingTableConfig::new(object_store, table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(options)
         .with_schema(file_schema);
     let table = ListingTable::try_new(config).unwrap();
@@ -439,23 +442,25 @@ async fn register_partitioned_alltypes_parquet(
 ) {
     let testdata = parquet_test_data();
     let parquet_file_path = format!("{}/{}", testdata, source_file);
-    let object_store =
-        MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths);
+    ctx.runtime_env().register_object_store(
+        "mirror",
+        MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
+    );
 
     let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
     options.collect_stat = true;
 
-    let table_path = ListingTableUrl::parse(format!("mirror:///{}", table_path)).unwrap();
+    let table_path = ListingTableUrl::parse(table_path).unwrap();
     let store_path =
         ListingTableUrl::parse(format!("mirror:///{}", store_paths[0])).unwrap();
 
     let file_schema = options
-        .infer_schema(Arc::clone(&object_store), &store_path)
+        .infer_schema(&ctx.state(), &store_path)
         .await
         .expect("Parquet schema inference failed");
 
-    let config = ListingTableConfig::new(object_store, table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(options)
         .with_schema(file_schema);
 
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 947ebc699..1de6af06a 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -97,7 +97,6 @@ async fn get_exec(
     let exec = format
         .create_physical_plan(
             FileScanConfig {
-                object_store,
                 object_store_url,
                 file_schema,
                 file_groups,
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 4eff00e01..c6ab11458 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -420,15 +420,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     target_partitions: scan.target_partitions as usize,
                 };
 
-                let object_store = ctx.runtime_env().object_store(&table_path)?;
-
-                println!(
-                    "Found object store {:?} for path {}",
-                    object_store,
-                    scan.path.as_str()
-                );
-
-                let config = ListingTableConfig::new(object_store, table_path)
+                let config = ListingTableConfig::new(table_path)
                     .with_listing_options(options)
                     .with_schema(Arc::new(schema));
 
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 1d287c460..82c0be109 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
 
 # clone the repo
 # TODO make repo/branch configurable
-git clone https://github.com/tustvold/arrow-ballista -b session-state-table-provider
+git clone https://github.com/tustvold/arrow-ballista -b remove-object-store-plans
 
 # update dependencies to local crates
 python ./dev/make-ballista-deps-local.py