You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/05/24 16:25:33 UTC

[arrow-datafusion] branch master updated: Decouple FileFormat from datafusion_data_access (#2572)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ea7dc603 Decouple FileFormat from datafusion_data_access (#2572)
9ea7dc603 is described below

commit 9ea7dc6036a7b1d28c7450db4f26720b732a50de
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 24 17:25:26 2022 +0100

    Decouple FileFormat from datafusion_data_access (#2572)
    
    * Decouple FileFormat from datafusion_data_access
    
    * Review feedback
    
    * Update ballista pin
---
 datafusion/core/src/datasource/file_format/avro.rs |  85 ++++------
 datafusion/core/src/datasource/file_format/csv.rs  |  68 +++-----
 datafusion/core/src/datasource/file_format/json.rs |  65 +++----
 datafusion/core/src/datasource/file_format/mod.rs  |  61 ++++++-
 .../core/src/datasource/file_format/parquet.rs     | 186 +++++++++------------
 datafusion/core/src/datasource/listing/mod.rs      |  15 +-
 datafusion/core/src/datasource/listing/table.rs    |  22 +--
 .../core/src/physical_plan/file_format/avro.rs     |  40 ++---
 .../core/src/physical_plan/file_format/json.rs     |  51 +++---
 .../core/src/physical_plan/file_format/parquet.rs  | 140 ++++++----------
 datafusion/core/src/test/mod.rs                    |  12 +-
 datafusion/core/tests/row.rs                       |  16 +-
 dev/build-arrow-ballista.sh                        |   3 +-
 13 files changed, 348 insertions(+), 416 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 882066206..63781da52 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use futures::StreamExt;
+use datafusion_data_access::FileMeta;
 
 use super::FileFormat;
 use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::ObjectStore;
 
 /// The default file extension of avro files
 pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -46,10 +46,14 @@ impl FileFormat for AvroFormat {
         self
     }
 
-    async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
+    async fn infer_schema(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        files: &[FileMeta],
+    ) -> Result<SchemaRef> {
         let mut schemas = vec![];
-        while let Some(obj_reader) = readers.next().await {
-            let mut reader = obj_reader?.sync_reader()?;
+        for file in files {
+            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
             let schema = read_avro_schema_from_reader(&mut reader)?;
             schemas.push(schema);
         }
@@ -59,8 +63,9 @@ impl FileFormat for AvroFormat {
 
     async fn infer_stats(
         &self,
-        _reader: Arc<dyn ObjectReader>,
+        _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
+        _file: &FileMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
@@ -78,15 +83,9 @@ impl FileFormat for AvroFormat {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-    use crate::{
-        datafusion_data_access::object_store::local::{
-            local_object_reader, local_object_reader_stream, LocalFileSystem,
-        },
-        physical_plan::collect,
-    };
-
     use super::*;
-    use crate::datasource::listing::local_unpartitioned_file;
+    use crate::datasource::file_format::test_util::scan_format;
+    use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -100,7 +99,7 @@ mod tests {
         let ctx = SessionContext::with_config(config);
         let task_ctx = ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
         let stream = exec.execute(0, task_ctx)?;
 
         let tt_batches = stream
@@ -122,7 +121,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, Some(1)).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, Some(1)).await?;
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
         assert_eq!(11, batches[0].num_columns());
@@ -136,7 +135,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -188,7 +187,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(batches.len(), 1);
@@ -218,7 +217,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(batches.len(), 1);
@@ -245,7 +244,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(batches.len(), 1);
@@ -272,7 +271,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(batches.len(), 1);
@@ -302,7 +301,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(batches.len(), 1);
@@ -332,7 +331,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.avro", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(batches.len(), 1);
@@ -359,36 +358,13 @@ mod tests {
 
     async fn get_exec(
         file_name: &str,
-        projection: &Option<Vec<usize>>,
+        projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::arrow_test_data();
-        let filename = format!("{}/avro/{}", testdata, file_name);
+        let store_root = format!("{}/avro", testdata);
         let format = AvroFormat {};
-        let file_schema = format
-            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
-            .await
-            .expect("Schema inference");
-        let statistics = format
-            .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
-            .await
-            .expect("Stats inference");
-        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
-        let exec = format
-            .create_physical_plan(
-                FileScanConfig {
-                    object_store: Arc::new(LocalFileSystem {}),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection: projection.clone(),
-                    limit,
-                    table_partition_cols: vec![],
-                },
-                &[],
-            )
-            .await?;
-        Ok(exec)
+        scan_format(&format, &store_root, file_name, projection, limit).await
     }
 }
 
@@ -397,18 +373,17 @@ mod tests {
 mod tests {
     use super::*;
 
-    use crate::datafusion_data_access::object_store::local::local_object_reader_stream;
+    use super::super::test_util::scan_format;
     use crate::error::DataFusionError;
 
     #[tokio::test]
     async fn test() -> Result<()> {
+        let format = AvroFormat {};
         let testdata = crate::test_util::arrow_test_data();
-        let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let schema_result = AvroFormat {}
-            .infer_schema(local_object_reader_stream(vec![filename]))
-            .await;
+        let filename = "avro/alltypes_plain.avro";
+        let result = scan_format(&format, &testdata, filename, None, None).await;
         assert!(matches!(
-            schema_result,
+            result,
             Err(DataFusionError::NotImplemented(msg))
             if msg == *"cannot read avro schema without the 'avro' feature enabled"
         ));
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index cbc2adca1..0d665e9ef 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use futures::StreamExt;
+use datafusion_data_access::FileMeta;
 
 use super::FileFormat;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::ObjectStore;
 
 /// The default file extension of csv files
 pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -93,13 +93,17 @@ impl FileFormat for CsvFormat {
         self
     }
 
-    async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
+    async fn infer_schema(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        files: &[FileMeta],
+    ) -> Result<SchemaRef> {
         let mut schemas = vec![];
 
-        let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);
+        let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
 
-        while let Some(obj_reader) = readers.next().await {
-            let mut reader = obj_reader?.sync_reader()?;
+        for file in files {
+            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
             let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
                 &mut reader,
                 self.delimiter,
@@ -122,8 +126,9 @@ impl FileFormat for CsvFormat {
 
     async fn infer_stats(
         &self,
-        _reader: Arc<dyn ObjectReader>,
+        _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
+        _file: &FileMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
@@ -142,16 +147,11 @@ impl FileFormat for CsvFormat {
 mod tests {
     use arrow::array::StringArray;
 
+    use super::super::test_util::scan_format;
     use super::*;
-    use crate::datasource::listing::local_unpartitioned_file;
+    use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
-    use crate::{
-        datafusion_data_access::object_store::local::{
-            local_object_reader, local_object_reader_stream, LocalFileSystem,
-        },
-        datasource::file_format::FileScanConfig,
-        physical_plan::collect,
-    };
+    use futures::StreamExt;
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -159,7 +159,7 @@ mod tests {
         let ctx = SessionContext::with_config(config);
         // skip column 9 that overflows the automatically discovered column type of i64 (u64 would work)
         let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
+        let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx)?;
 
@@ -186,7 +186,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0, 1, 2, 3]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, Some(1)).await?;
+        let exec = get_exec("aggregate_test_100.csv", projection, Some(1)).await?;
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
         assert_eq!(4, batches[0].num_columns());
@@ -198,7 +198,7 @@ mod tests {
     #[tokio::test]
     async fn infer_schema() -> Result<()> {
         let projection = None;
-        let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
+        let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -233,7 +233,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
+        let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await.expect("Collect batches");
 
@@ -258,35 +258,11 @@ mod tests {
 
     async fn get_exec(
         file_name: &str,
-        projection: &Option<Vec<usize>>,
+        projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let testdata = crate::test_util::arrow_test_data();
-        let filename = format!("{}/csv/{}", testdata, file_name);
+        let root = format!("{}/csv", crate::test_util::arrow_test_data());
         let format = CsvFormat::default();
-        let file_schema = format
-            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
-            .await
-            .expect("Schema inference");
-        let statistics = format
-            .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
-            .await
-            .expect("Stats inference");
-        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
-        let exec = format
-            .create_physical_plan(
-                FileScanConfig {
-                    object_store: Arc::new(LocalFileSystem),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection: projection.clone(),
-                    limit,
-                    table_partition_cols: vec![],
-                },
-                &[],
-            )
-            .await?;
-        Ok(exec)
+        scan_format(&format, &root, file_name, projection, limit).await
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index cd4fd5810..e9b49c42d 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::json::reader::infer_json_schema_from_iterator;
 use arrow::json::reader::ValueIter;
 use async_trait::async_trait;
-use futures::StreamExt;
+use datafusion_data_access::{object_store::ObjectStore, FileMeta};
 
 use super::FileFormat;
 use super::FileScanConfig;
@@ -36,7 +36,6 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
 
 /// The default file extension of json files
 pub const DEFAULT_JSON_EXTENSION: &str = ".json";
@@ -69,11 +68,16 @@ impl FileFormat for JsonFormat {
         self
     }
 
-    async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
+    async fn infer_schema(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        files: &[FileMeta],
+    ) -> Result<SchemaRef> {
         let mut schemas = Vec::new();
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
-        while let Some(obj_reader) = readers.next().await {
-            let mut reader = BufReader::new(obj_reader?.sync_reader()?);
+        for file in files {
+            let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
+            let mut reader = BufReader::new(reader);
             let iter = ValueIter::new(&mut reader, None);
             let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
                 let should_take = records_to_read > 0;
@@ -94,8 +98,9 @@ impl FileFormat for JsonFormat {
 
     async fn infer_stats(
         &self,
-        _reader: Arc<dyn ObjectReader>,
+        _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
+        _file: &FileMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
@@ -112,15 +117,16 @@ impl FileFormat for JsonFormat {
 
 #[cfg(test)]
 mod tests {
+    use super::super::test_util::scan_format;
     use arrow::array::Int64Array;
+    use futures::StreamExt;
 
     use super::*;
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::{
         datafusion_data_access::object_store::local::{
-            local_object_reader, local_object_reader_stream, LocalFileSystem,
+            local_unpartitioned_file, LocalFileSystem,
         },
-        datasource::{file_format::FileScanConfig, listing::local_unpartitioned_file},
         physical_plan::collect,
     };
 
@@ -129,7 +135,7 @@ mod tests {
         let config = SessionConfig::new().with_batch_size(2);
         let ctx = SessionContext::with_config(config);
         let projection = None;
-        let exec = get_exec(&projection, None).await?;
+        let exec = get_exec(projection, None).await?;
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx)?;
 
@@ -156,7 +162,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec(&projection, Some(1)).await?;
+        let exec = get_exec(projection, Some(1)).await?;
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
         assert_eq!(4, batches[0].num_columns());
@@ -168,7 +174,7 @@ mod tests {
     #[tokio::test]
     async fn infer_schema() -> Result<()> {
         let projection = None;
-        let exec = get_exec(&projection, None).await?;
+        let exec = get_exec(projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -186,7 +192,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec(&projection, None).await?;
+        let exec = get_exec(projection, None).await?;
 
         let batches = collect(exec, task_ctx).await.expect("Collect batches");
 
@@ -213,48 +219,25 @@ mod tests {
     }
 
     async fn get_exec(
-        projection: &Option<Vec<usize>>,
+        projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filename = "tests/jsons/2.json";
         let format = JsonFormat::default();
-        let file_schema = format
-            .infer_schema(local_object_reader_stream(vec![filename.to_owned()]))
-            .await
-            .expect("Schema inference");
-        let statistics = format
-            .infer_stats(
-                local_object_reader(filename.to_owned()),
-                file_schema.clone(),
-            )
-            .await
-            .expect("Stats inference");
-        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
-        let exec = format
-            .create_physical_plan(
-                FileScanConfig {
-                    object_store: Arc::new(LocalFileSystem {}),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection: projection.clone(),
-                    limit,
-                    table_partition_cols: vec![],
-                },
-                &[],
-            )
-            .await?;
-        Ok(exec)
+        scan_format(&format, ".", filename, projection, limit).await
     }
 
     #[tokio::test]
     async fn infer_schema_with_limit() {
+        let store = Arc::new(LocalFileSystem {}) as _;
         let filename = "tests/jsons/schema_infer_limit.json";
         let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));
+
         let file_schema = format
-            .infer_schema(local_object_reader_stream(vec![filename.to_owned()]))
+            .infer_schema(&store, &[local_unpartitioned_file(filename.to_string())])
             .await
             .expect("Schema inference");
+
         let fields = file_schema
             .fields()
             .iter()
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index bb37b7557..669ed0efd 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,8 +36,8 @@ use crate::physical_plan::file_format::FileScanConfig;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
 use async_trait::async_trait;
-
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
 
 /// This trait abstracts all the file format specific implementations
 /// from the `TableProvider`. This helps code re-utilization across
@@ -52,7 +52,11 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// be analysed up to a given number of records or files (as specified in the
     /// format config) then give the estimated common schema. This might fail if
     /// the files have schemas that cannot be merged.
-    async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef>;
+    async fn infer_schema(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        files: &[FileMeta],
+    ) -> Result<SchemaRef>;
 
     /// Infer the statistics for the provided object. The cost and accuracy of the
     /// estimated statistics might vary greatly between file formats.
@@ -63,8 +67,9 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// TODO: should the file source return statistics for only columns referred to in the table schema?
     async fn infer_stats(
         &self,
-        reader: Arc<dyn ObjectReader>,
+        store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
+        file: &FileMeta,
     ) -> Result<Statistics>;
 
     /// Take a list of files and convert it to the appropriate executor
@@ -75,3 +80,51 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>>;
 }
+
+#[cfg(test)]
+pub(crate) mod test_util {
+    use super::*;
+    use crate::datasource::listing::PartitionedFile;
+    use datafusion_data_access::object_store::local::{
+        local_unpartitioned_file, LocalFileSystem,
+    };
+
+    pub async fn scan_format(
+        format: &dyn FileFormat,
+        store_root: &str,
+        file_name: &str,
+        projection: Option<Vec<usize>>,
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let store = Arc::new(LocalFileSystem {}) as _;
+        let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name));
+
+        let file_schema = format.infer_schema(&store, &[meta.clone()]).await?;
+
+        let statistics = format
+            .infer_stats(&store, file_schema.clone(), &meta)
+            .await?;
+
+        let file_groups = vec![vec![PartitionedFile {
+            file_meta: meta,
+            partition_values: vec![],
+            range: None,
+        }]];
+
+        let exec = format
+            .create_physical_plan(
+                FileScanConfig {
+                    object_store: store,
+                    file_schema,
+                    file_groups,
+                    statistics,
+                    projection,
+                    limit,
+                    table_partition_cols: vec![],
+                },
+                &[],
+            )
+            .await?;
+        Ok(exec)
+    }
+}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index a9a0c788b..42bff573c 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use futures::TryStreamExt;
+use datafusion_data_access::FileMeta;
 use hashbrown::HashMap;
 use parquet::arrow::ArrowReader;
 use parquet::arrow::ParquetFileArrowReader;
@@ -50,9 +50,9 @@ use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
 use crate::physical_plan::{metrics, ExecutionPlan};
 use crate::physical_plan::{Accumulator, Statistics};
-use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
+use datafusion_data_access::object_store::{ObjectReader, ObjectStore};
 
-/// The default file exetension of parquet files
+/// The default file extension of parquet files
 pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
 
 /// The Apache Parquet `FileFormat` implementation
@@ -88,24 +88,27 @@ impl FileFormat for ParquetFormat {
         self
     }
 
-    async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef> {
-        let merged_schema = readers
-            .map_err(DataFusionError::IoError)
-            .try_fold(Schema::empty(), |acc, reader| async {
-                let next_schema = fetch_schema(reader);
-                Schema::try_merge([acc, next_schema?])
-                    .map_err(DataFusionError::ArrowError)
-            })
-            .await?;
-        Ok(Arc::new(merged_schema))
+    async fn infer_schema(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        files: &[FileMeta],
+    ) -> Result<SchemaRef> {
+        let mut schemas = Vec::with_capacity(files.len());
+        for file in files {
+            let schema = fetch_schema(store.as_ref(), file)?;
+            schemas.push(schema)
+        }
+        let schema = Schema::try_merge(schemas)?;
+        Ok(Arc::new(schema))
     }
 
     async fn infer_stats(
         &self,
-        reader: Arc<dyn ObjectReader>,
+        store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
+        file: &FileMeta,
     ) -> Result<Statistics> {
-        let stats = fetch_statistics(reader, table_schema)?;
+        let stats = fetch_statistics(store.as_ref(), table_schema, file)?;
         Ok(stats)
     }
 
@@ -274,7 +277,8 @@ fn summarize_min_max(
 }
 
 /// Read and parse the schema of the Parquet file at location `path`
-fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
+fn fetch_schema(store: &dyn ObjectStore, file: &FileMeta) -> Result<Schema> {
+    let object_reader = store.file_reader(file.sized_file.clone())?;
     let obj_reader = ChunkObjectReader {
         object_reader,
         bytes_scanned: None,
@@ -288,9 +292,11 @@ fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
 
 /// Read and parse the statistics of the Parquet file at location `path`
 fn fetch_statistics(
-    object_reader: Arc<dyn ObjectReader>,
+    store: &dyn ObjectStore,
     table_schema: SchemaRef,
+    file: &FileMeta,
 ) -> Result<Statistics> {
+    let object_reader = store.file_reader(file.sized_file.clone())?;
     let obj_reader = ChunkObjectReader {
         object_reader,
         bytes_scanned: None,
@@ -396,56 +402,17 @@ impl ChunkReader for ChunkObjectReader {
 }
 
 #[cfg(test)]
-mod tests {
-    use crate::datasource::listing::local_unpartitioned_file;
-    use crate::physical_plan::collect;
-    use datafusion_data_access::object_store::local::{
-        local_object_reader, local_object_reader_stream, LocalFileSystem,
-    };
-
+pub(crate) mod test_util {
     use super::*;
-
-    use crate::physical_plan::metrics::MetricValue;
-    use crate::prelude::{SessionConfig, SessionContext};
-    use arrow::array::{
-        ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-        StringArray, TimestampNanosecondArray,
-    };
     use arrow::record_batch::RecordBatch;
-    use datafusion_common::ScalarValue;
-    use futures::StreamExt;
+    use datafusion_data_access::object_store::local::local_unpartitioned_file;
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
     use tempfile::NamedTempFile;
 
-    // Add a new column with the specified field name to the RecordBatch
-    fn add_to_batch(
-        batch: &RecordBatch,
-        field_name: &str,
-        array: ArrayRef,
-    ) -> RecordBatch {
-        let mut fields = batch.schema().fields().clone();
-        fields.push(Field::new(field_name, array.data_type().clone(), true));
-        let schema = Arc::new(Schema::new(fields));
-
-        let mut columns = batch.columns().to_vec();
-        columns.push(array);
-        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
-    }
-
-    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
-        columns.into_iter().fold(
-            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
-            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
-        )
-    }
-
-    async fn create_table(
+    pub async fn store_parquet(
         batches: Vec<RecordBatch>,
-    ) -> Result<(Vec<NamedTempFile>, Schema)> {
-        let merged_schema =
-            Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))?;
-
+    ) -> Result<(Vec<FileMeta>, Vec<NamedTempFile>)> {
         let files: Vec<_> = batches
             .into_iter()
             .map(|batch| {
@@ -464,8 +431,33 @@ mod tests {
             })
             .collect();
 
-        Ok((files, merged_schema))
+        let meta: Vec<_> = files
+            .iter()
+            .map(|f| local_unpartitioned_file(f.path().to_string_lossy().to_string()))
+            .collect();
+
+        Ok((meta, files))
     }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::super::test_util::scan_format;
+    use crate::physical_plan::collect;
+    use datafusion_data_access::object_store::local::LocalFileSystem;
+
+    use super::*;
+
+    use crate::datasource::file_format::parquet::test_util::store_parquet;
+    use crate::physical_plan::metrics::MetricValue;
+    use crate::prelude::{SessionConfig, SessionContext};
+    use arrow::array::{
+        ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        StringArray, TimestampNanosecondArray,
+    };
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::ScalarValue;
+    use futures::StreamExt;
 
     #[tokio::test]
     async fn read_merged_batches() -> Result<()> {
@@ -474,16 +466,16 @@ mod tests {
 
         let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
 
-        let batch1 = create_batch(vec![("c1", c1.clone())]);
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
-        let batch2 = create_batch(vec![("c2", c2)]);
+        let store = Arc::new(LocalFileSystem {}) as _;
+        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
 
-        let (files, schema) = create_table(vec![batch1, batch2]).await?;
-        let table_schema = Arc::new(schema);
-
-        let reader = local_object_reader(files[0].path().to_string_lossy().to_string());
+        let format = ParquetFormat::default();
+        let schema = format.infer_schema(&store, &meta).await.unwrap();
 
-        let stats = fetch_statistics(reader, table_schema.clone())?;
+        let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0])?;
 
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
@@ -491,9 +483,7 @@ mod tests {
         assert_eq!(c1_stats.null_count, Some(1));
         assert_eq!(c2_stats.null_count, Some(3));
 
-        let reader = local_object_reader(files[1].path().to_string_lossy().to_string());
-
-        let stats = fetch_statistics(reader, table_schema)?;
+        let stats = fetch_statistics(store.as_ref(), schema, &meta[1])?;
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
         let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -510,7 +500,7 @@ mod tests {
         let config = SessionConfig::new().with_batch_size(2);
         let ctx = SessionContext::with_config(config);
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx)?;
 
@@ -536,14 +526,14 @@ mod tests {
     async fn capture_bytes_scanned_metric() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);
         let ctx = SessionContext::with_config(config);
-        let projection = None;
 
         // Read the full file
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let projection = None;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         // Read only one column. This should scan less data.
-        let exec_projected =
-            get_exec("alltypes_plain.parquet", &Some(vec![0]), None).await?;
+        let projection = Some(vec![0]);
+        let exec_projected = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let task_ctx = ctx.task_ctx();
 
@@ -561,7 +551,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, Some(1)).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, Some(1)).await?;
 
         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics().num_rows, Some(8));
@@ -580,7 +570,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -618,7 +608,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -648,7 +638,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -675,7 +665,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -702,7 +692,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -732,7 +722,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -762,7 +752,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -800,35 +790,11 @@ mod tests {
 
     async fn get_exec(
         file_name: &str,
-        projection: &Option<Vec<usize>>,
+        projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
-        let filename = format!("{}/{}", testdata, file_name);
         let format = ParquetFormat::default();
-        let file_schema = format
-            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
-            .await
-            .expect("Schema inference");
-        let statistics = format
-            .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
-            .await
-            .expect("Stats inference");
-        let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
-        let exec = format
-            .create_physical_plan(
-                FileScanConfig {
-                    object_store: Arc::new(LocalFileSystem {}),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection: projection.clone(),
-                    limit,
-                    table_partition_cols: vec![],
-                },
-                &[],
-            )
-            .await?;
-        Ok(exec)
+        scan_format(&format, &testdata, file_name, projection, limit).await
     }
 }
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index d7932b38f..0f0a7d20e 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -22,7 +22,7 @@ mod helpers;
 mod table;
 
 use datafusion_common::ScalarValue;
-use datafusion_data_access::{object_store::local, FileMeta, Result, SizedFile};
+use datafusion_data_access::{FileMeta, Result, SizedFile};
 use futures::Stream;
 use std::pin::Pin;
 
@@ -88,11 +88,12 @@ impl std::fmt::Display for PartitionedFile {
     }
 }
 
-/// Helper method to fetch the file size and date at given path and create a `FileMeta`
-pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
-    PartitionedFile {
-        file_meta: local::local_unpartitioned_file(file),
-        partition_values: vec![],
-        range: None,
+impl From<FileMeta> for PartitionedFile {
+    fn from(file_meta: FileMeta) -> Self {
+        PartitionedFile {
+            file_meta,
+            partition_values: vec![],
+            range: None,
+        }
     }
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index bde88b659..1dceb8b35 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use async_trait::async_trait;
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
 
 use crate::datasource::{
     file_format::{
@@ -212,15 +212,13 @@ impl ListingOptions {
     /// locally or ask a remote service to do it (e.g a scheduler).
     pub async fn infer_schema<'a>(
         &'a self,
-        object_store: Arc<dyn ObjectStore>,
+        store: Arc<dyn ObjectStore>,
         path: &'a str,
     ) -> Result<SchemaRef> {
-        let file_stream = object_store
-            .glob_file_with_suffix(path, &self.file_extension)
-            .await?
-            .map(move |file_meta| object_store.file_reader(file_meta?.sized_file));
-        let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?;
-        Ok(file_schema)
+        let extension = &self.file_extension;
+        let list_stream = store.glob_file_with_suffix(path, extension).await?;
+        let files: Vec<_> = list_stream.try_collect().await?;
+        self.format.infer_schema(&store, &files).await
     }
 }
 
@@ -377,11 +375,13 @@ impl ListingTable {
             async move {
                 let part_file = part_file?;
                 let statistics = if self.options.collect_stat {
-                    let object_reader = object_store
-                        .file_reader(part_file.file_meta.sized_file.clone())?;
                     self.options
                         .format
-                        .infer_stats(object_reader, self.file_schema.clone())
+                        .infer_stats(
+                            &object_store,
+                            self.file_schema.clone(),
+                            &part_file.file_meta,
+                        )
                         .await?
                 } else {
                     Statistics::default()
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index a25ad60e1..fc56ce1d8 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -165,15 +165,13 @@ impl ExecutionPlan for AvroExec {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-    use crate::datasource::{
-        file_format::{avro::AvroFormat, FileFormat},
-        listing::local_unpartitioned_file,
-    };
+    use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
+    use crate::datasource::listing::PartitionedFile;
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_data_access::object_store::local::{
-        local_object_reader_stream, LocalFileSystem,
+        local_unpartitioned_file, LocalFileSystem,
     };
     use futures::StreamExt;
 
@@ -183,12 +181,15 @@ mod tests {
     async fn avro_exec_without_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+        let store = Arc::new(LocalFileSystem {}) as _;
+        let meta = local_unpartitioned_file(filename);
+
+        let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
+
         let avro_exec = AvroExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
-            file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
-            file_schema: AvroFormat {}
-                .infer_schema(local_object_reader_stream(vec![filename]))
-                .await?,
+            file_groups: vec![vec![meta.into()]],
+            file_schema,
             statistics: Statistics::default(),
             projection: Some(vec![0, 1, 2]),
             limit: None,
@@ -240,9 +241,9 @@ mod tests {
     async fn avro_exec_missing_column() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let actual_schema = AvroFormat {}
-            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
-            .await?;
+        let store = Arc::new(LocalFileSystem {}) as _;
+        let meta = local_unpartitioned_file(filename);
+        let actual_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
 
         let mut fields = actual_schema.fields().clone();
         fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -252,8 +253,8 @@ mod tests {
         let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
 
         let avro_exec = AvroExec::new(FileScanConfig {
-            object_store: Arc::new(LocalFileSystem {}),
-            file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
+            object_store: store,
+            file_groups: vec![vec![meta.into()]],
             file_schema,
             statistics: Statistics::default(),
             projection,
@@ -306,18 +307,19 @@ mod tests {
     async fn avro_exec_with_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let mut partitioned_file = local_unpartitioned_file(filename.clone());
+        let store = Arc::new(LocalFileSystem {}) as _;
+        let meta = local_unpartitioned_file(filename);
+        let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
+
+        let mut partitioned_file = PartitionedFile::from(meta);
         partitioned_file.partition_values =
             vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
-        let file_schema = AvroFormat {}
-            .infer_schema(local_object_reader_stream(vec![filename]))
-            .await?;
 
         let avro_exec = AvroExec::new(FileScanConfig {
             // select specific columns of the files as well as the partitioning
             // column which is supposed to be the last column in the table schema.
             projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
-            object_store: Arc::new(LocalFileSystem {}),
+            object_store: store,
             file_groups: vec![vec![partitioned_file]],
             file_schema,
             statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 818496d13..5470f6d57 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -192,25 +192,30 @@ mod tests {
     use arrow::datatypes::{Field, Schema};
     use futures::StreamExt;
 
-    use crate::datafusion_data_access::object_store::local::{
-        local_object_reader_stream, LocalFileSystem,
-    };
-    use crate::datasource::{
-        file_format::{json::JsonFormat, FileFormat},
-        listing::local_unpartitioned_file,
-    };
+    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
+    use crate::datasource::file_format::{json::JsonFormat, FileFormat};
+    use crate::datasource::listing::PartitionedFile;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
+    use datafusion_data_access::object_store::local::local_unpartitioned_file;
+    use datafusion_data_access::object_store::ObjectStore;
     use tempfile::TempDir;
 
     use super::*;
 
     const TEST_DATA_BASE: &str = "tests/jsons";
 
-    async fn infer_schema(path: String) -> Result<SchemaRef> {
-        JsonFormat::default()
-            .infer_schema(local_object_reader_stream(vec![path]))
+    async fn prepare_store(
+    ) -> (Arc<dyn ObjectStore>, Vec<Vec<PartitionedFile>>, SchemaRef) {
+        let store = Arc::new(LocalFileSystem {}) as _;
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+        let meta = local_unpartitioned_file(path);
+        let schema = JsonFormat::default()
+            .infer_schema(&store, &[meta.clone()])
             .await
+            .unwrap();
+
+        (store, vec![vec![meta.into()]], schema)
     }
 
     #[tokio::test]
@@ -218,11 +223,12 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
-        let path = format!("{}/1.json", TEST_DATA_BASE);
+        let (object_store, file_groups, file_schema) = prepare_store().await;
+
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store: Arc::new(LocalFileSystem {}),
-            file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
-            file_schema: infer_schema(path).await?,
+            object_store,
+            file_groups,
+            file_schema,
             statistics: Statistics::default(),
             projection: None,
             limit: Some(3),
@@ -274,9 +280,7 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
-        let path = format!("{}/1.json", TEST_DATA_BASE);
-
-        let actual_schema = infer_schema(path.clone()).await?;
+        let (object_store, file_groups, actual_schema) = prepare_store().await;
 
         let mut fields = actual_schema.fields().clone();
         fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -285,8 +289,8 @@ mod tests {
         let file_schema = Arc::new(Schema::new(fields));
 
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store: Arc::new(LocalFileSystem {}),
-            file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
+            object_store,
+            file_groups,
             file_schema,
             statistics: Statistics::default(),
             projection: None,
@@ -315,11 +319,12 @@ mod tests {
     async fn nd_json_exec_file_projection() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let path = format!("{}/1.json", TEST_DATA_BASE);
+        let (object_store, file_groups, file_schema) = prepare_store().await;
+
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store: Arc::new(LocalFileSystem {}),
-            file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
-            file_schema: infer_schema(path).await?,
+            object_store,
+            file_groups,
+            file_schema,
             statistics: Statistics::default(),
             projection: Some(vec![0, 2]),
             limit: None,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index ab4ab3fbe..0931484ef 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -630,17 +630,15 @@ mod tests {
     use crate::{
         assert_batches_sorted_eq, assert_contains,
         datafusion_data_access::{
-            object_store::local::{local_object_reader_stream, LocalFileSystem},
-            FileMeta, SizedFile,
-        },
-        datasource::{
-            file_format::{parquet::ParquetFormat, FileFormat},
-            listing::local_unpartitioned_file,
+            object_store::local::LocalFileSystem, FileMeta, SizedFile,
         },
+        datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
 
     use super::*;
+    use crate::datasource::file_format::parquet::test_util::store_parquet;
+    use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::FileRange;
     use crate::execution::options::CsvReadOptions;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
@@ -649,16 +647,12 @@ mod tests {
         array::{Int64Array, Int8Array, StringArray},
         datatypes::{DataType, Field},
     };
-    use datafusion_data_access::object_store::local;
+    use datafusion_data_access::object_store::local::local_unpartitioned_file;
     use datafusion_expr::{col, lit};
     use futures::StreamExt;
     use parquet::{
-        arrow::ArrowWriter,
         basic::Type as PhysicalType,
-        file::{
-            metadata::RowGroupMetaData, properties::WriterProperties,
-            statistics::Statistics as ParquetStatistics,
-        },
+        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
         schema::types::SchemaDescPtr,
     };
     use std::fs::File;
@@ -673,45 +667,16 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
     ) -> Result<Vec<RecordBatch>> {
-        // When vec is dropped, temp files are deleted
-        let files: Vec<_> = batches
-            .into_iter()
-            .map(|batch| {
-                let output = tempfile::NamedTempFile::new().expect("creating temp file");
-
-                let props = WriterProperties::builder().build();
-                let file: std::fs::File = (*output.as_file())
-                    .try_clone()
-                    .expect("cloning file descriptor");
-                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
-                    .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-                output
-            })
-            .collect();
-
-        let file_names: Vec<_> = files
-            .iter()
-            .map(|t| t.path().to_string_lossy().to_string())
-            .collect();
-
-        // Now, read the files back in
-        let file_groups: Vec<_> = file_names
-            .iter()
-            .map(|name| local_unpartitioned_file(name.clone()))
-            .collect();
-
-        // Infer the schema (if not provided)
         let file_schema = match schema {
-            Some(provided_schema) => provided_schema,
-            None => ParquetFormat::default()
-                .infer_schema(local_object_reader_stream(file_names))
-                .await
-                .expect("inferring schema"),
+            Some(schema) => schema,
+            None => Arc::new(Schema::try_merge(
+                batches.iter().map(|b| b.schema().as_ref().clone()),
+            )?),
         };
 
+        let (meta, _files) = store_parquet(batches).await?;
+        let file_groups = meta.into_iter().map(Into::into).collect();
+
         // prepare the scan
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
@@ -1050,26 +1015,16 @@ mod tests {
 
     #[tokio::test]
     async fn parquet_exec_with_projection() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let testdata = crate::test_util::parquet_test_data();
-        let filename = format!("{}/alltypes_plain.parquet", testdata);
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
-                file_schema: ParquetFormat::default()
-                    .infer_schema(local_object_reader_stream(vec![filename]))
-                    .await?,
-                statistics: Statistics::default(),
-                projection: Some(vec![0, 1, 2]),
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            None,
-        );
+        let filename = "alltypes_plain.parquet";
+        let format = ParquetFormat::default();
+        let parquet_exec =
+            scan_format(&format, &testdata, filename, Some(vec![0, 1, 2]), None)
+                .await
+                .unwrap();
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
 
+        let task_ctx = SessionContext::new().task_ctx();
         let mut results = parquet_exec.execute(0, task_ctx)?;
         let batch = results.next().await.unwrap()?;
 
@@ -1095,9 +1050,9 @@ mod tests {
 
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
-        fn file_range(file: String, start: i64, end: i64) -> PartitionedFile {
+        fn file_range(meta: &FileMeta, start: i64, end: i64) -> PartitionedFile {
             PartitionedFile {
-                file_meta: local::local_unpartitioned_file(file),
+                file_meta: meta.clone(),
                 partition_values: vec![],
                 range: Some(FileRange { start, end }),
             }
@@ -1137,15 +1092,19 @@ mod tests {
         let session_ctx = SessionContext::new();
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
+
+        let meta = local_unpartitioned_file(filename);
+
+        let store = Arc::new(LocalFileSystem {}) as _;
         let file_schema = ParquetFormat::default()
-            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+            .infer_schema(&store, &[meta.clone()])
             .await?;
 
-        let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]];
-        let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]];
+        let group_empty = vec![vec![file_range(&meta, 0, 5)]];
+        let group_contain = vec![vec![file_range(&meta, 5, i64::MAX)]];
         let group_all = vec![vec![
-            file_range(filename.clone(), 0, 5),
-            file_range(filename.clone(), 5, i64::MAX),
+            file_range(&meta, 0, 5),
+            file_range(&meta, 5, i64::MAX),
         ]];
 
         assert_parquet_read(
@@ -1174,19 +1133,30 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
-        let mut partitioned_file = local_unpartitioned_file(filename.clone());
-        partitioned_file.partition_values = vec![
-            ScalarValue::Utf8(Some("2021".to_owned())),
-            ScalarValue::Utf8(Some("10".to_owned())),
-            ScalarValue::Utf8(Some("26".to_owned())),
-        ];
+        let store = Arc::new(LocalFileSystem {}) as _;
+
+        let meta = local_unpartitioned_file(filename);
+
+        let schema = ParquetFormat::default()
+            .infer_schema(&store, &[meta.clone()])
+            .await
+            .unwrap();
+
+        let partitioned_file = PartitionedFile {
+            file_meta: meta,
+            partition_values: vec![
+                ScalarValue::Utf8(Some("2021".to_owned())),
+                ScalarValue::Utf8(Some("10".to_owned())),
+                ScalarValue::Utf8(Some("26".to_owned())),
+            ],
+            range: None,
+        };
+
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
+                object_store: store,
                 file_groups: vec![vec![partitioned_file]],
-                file_schema: ParquetFormat::default()
-                    .infer_schema(local_object_reader_stream(vec![filename]))
-                    .await?,
+                file_schema: schema,
                 statistics: Statistics::default(),
                 // file has 10 cols so index 12 should be month
                 projection: Some(vec![0, 1, 2, 12]),
@@ -1229,8 +1199,6 @@ mod tests {
     async fn parquet_exec_with_error() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let testdata = crate::test_util::parquet_test_data();
-        let filename = format!("{}/alltypes_plain.parquet", testdata);
         let partitioned_file = PartitionedFile {
             file_meta: FileMeta {
                 sized_file: SizedFile {
@@ -1247,9 +1215,7 @@ mod tests {
             FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_groups: vec![vec![partitioned_file]],
-                file_schema: ParquetFormat::default()
-                    .infer_schema(local_object_reader_stream(vec![filename]))
-                    .await?,
+                file_schema: Arc::new(Schema::empty()),
                 statistics: Statistics::default(),
                 projection: None,
                 limit: None,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 1798232b3..815379de4 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,7 +18,7 @@
 //! Common unit test utility methods
 
 use crate::arrow::array::UInt32Array;
-use crate::datasource::{listing::local_unpartitioned_file, MemTable, TableProvider};
+use crate::datasource::{MemTable, TableProvider};
 use crate::error::Result;
 use crate::from_slice::FromSlice;
 use crate::logical_plan::LogicalPlan;
@@ -28,7 +28,9 @@ use array::{Array, ArrayRef};
 use arrow::array::{self, DecimalBuilder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_data_access::object_store::local::LocalFileSystem;
+use datafusion_data_access::object_store::local::{
+    local_unpartitioned_file, LocalFileSystem,
+};
 use futures::{Future, FutureExt};
 use std::fs::File;
 use std::io::prelude::*;
@@ -108,10 +110,12 @@ pub fn partitioned_csv_config(
 
         files
             .into_iter()
-            .map(|f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned())])
+            .map(
+                |f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned()).into()],
+            )
             .collect::<Vec<_>>()
     } else {
-        vec![vec![local_unpartitioned_file(path)]]
+        vec![vec![local_unpartitioned_file(path).into()]]
     };
 
     Ok(FileScanConfig {
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index bda107f13..51ddff2fb 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -17,14 +17,12 @@
 
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::local_unpartitioned_file;
 use datafusion::error::Result;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::SessionContext;
-use datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion_data_access::object_store::local::{
-    local_object_reader, local_object_reader_stream,
+    local_unpartitioned_file, LocalFileSystem,
 };
 use datafusion_row::layout::RowType::{Compact, WordAligned};
 use datafusion_row::reader::read_as_batch;
@@ -80,20 +78,24 @@ async fn get_exec(
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let testdata = datafusion::test_util::parquet_test_data();
     let filename = format!("{}/{}", testdata, file_name);
+    let meta = local_unpartitioned_file(filename);
+
     let format = ParquetFormat::default();
+    let store = Arc::new(LocalFileSystem {}) as _;
+
     let file_schema = format
-        .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+        .infer_schema(&store, &[meta.clone()])
         .await
         .expect("Schema inference");
     let statistics = format
-        .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
+        .infer_stats(&store, file_schema.clone(), &meta)
         .await
         .expect("Stats inference");
-    let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
+    let file_groups = vec![vec![meta.into()]];
     let exec = format
         .create_physical_plan(
             FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
+                object_store: store,
                 file_schema,
                 file_groups,
                 statistics,
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 0c3922627..ebc5fd736 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -23,8 +23,7 @@ set -e
 rm -rf arrow-ballista 2>/dev/null
 
 # clone the repo
-# TODO make repo/branch configurable
-git clone https://github.com/apache/arrow-ballista
+git clone https://github.com/tustvold/arrow-ballista -b fix-file-format
 
 # update dependencies to local crates
 python ./dev/make-ballista-deps-local.py