Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/04 10:14:45 UTC

[arrow-datafusion] branch master updated: Switch to object_store crate (#2489) (#2677)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bf7564f62 Switch to object_store crate (#2489) (#2677)
bf7564f62 is described below

commit bf7564f62117487a9a397c25326fcda1ed22d08d
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Jul 4 11:14:39 2022 +0100

    Switch to object_store crate (#2489) (#2677)
    
    * Switch to object_store crate (#2489)
    
    * Test fixes
    
    * Update to object_store 0.2.0
    
    * More windows pacification
    
    * Fix windows test
    
    * Fix windows test_prefix_path
    
    * More windows fixes
    
    * Simplify ListingTableUrl::strip_prefix
    
    * Review feedback
    
    * Update to latest arrow-rs
    
    * Use ParquetRecordBatchStream
    
    * Simplify predicate pruning
    
    * Add host to ObjectStoreRegistry
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/common/Cargo.toml                       |   1 +
 datafusion/common/src/error.rs                     |  21 +
 datafusion/core/Cargo.toml                         |   7 +-
 datafusion/core/src/catalog/schema.rs              |   3 -
 datafusion/core/src/datasource/file_format/avro.rs |  19 +-
 datafusion/core/src/datasource/file_format/csv.rs  |  20 +-
 datafusion/core/src/datasource/file_format/json.rs |  43 +-
 datafusion/core/src/datasource/file_format/mod.rs  |  16 +-
 .../core/src/datasource/file_format/parquet.rs     | 160 +++----
 datafusion/core/src/datasource/listing/helpers.rs  | 173 +++----
 datafusion/core/src/datasource/listing/mod.rs      |  32 +-
 datafusion/core/src/datasource/listing/table.rs    |   4 +-
 datafusion/core/src/datasource/listing/url.rs      | 134 +++---
 datafusion/core/src/datasource/object_store.rs     |  34 +-
 datafusion/core/src/execution/runtime_env.rs       |   8 +-
 datafusion/core/src/lib.rs                         |   1 -
 .../core/src/physical_plan/file_format/avro.rs     | 119 +++--
 .../core/src/physical_plan/file_format/csv.rs      | 103 ++--
 .../src/physical_plan/file_format/file_stream.rs   | 314 ++++++++-----
 .../core/src/physical_plan/file_format/json.rs     |  81 ++--
 .../core/src/physical_plan/file_format/mod.rs      |   2 +-
 .../core/src/physical_plan/file_format/parquet.rs  | 522 +++++++++------------
 datafusion/core/src/physical_plan/mod.rs           |   6 +-
 datafusion/core/src/test/mod.rs                    |   6 +-
 datafusion/core/src/test/object_store.rs           | 121 +----
 datafusion/core/tests/path_partition.rs            | 124 +++--
 datafusion/core/tests/row.rs                       |  11 +-
 datafusion/core/tests/sql/mod.rs                   |  15 +-
 28 files changed, 1045 insertions(+), 1055 deletions(-)

diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 367a8fb70..03809bcf5 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -41,6 +41,7 @@ pyarrow = ["pyo3"]
 arrow = { version = "17.0.0", features = ["prettyprint"] }
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 cranelift-module = { version = "0.85.0", optional = true }
+object_store = { version = "0.3", optional = true }
 ordered-float = "3.0"
 parquet = { version = "17.0.0", features = ["arrow"], optional = true }
 pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 014034672..c1d0f29b1 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -49,6 +49,9 @@ pub enum DataFusionError {
     /// Wraps an error from the Avro crate
     #[cfg(feature = "avro")]
     AvroError(AvroError),
+    /// Wraps an error from the object_store crate
+    #[cfg(feature = "object_store")]
+    ObjectStore(object_store::Error),
     /// Error associated to I/O operations and associated traits.
     IoError(io::Error),
     /// Error returned when SQL is syntactically incorrect.
@@ -203,6 +206,20 @@ impl From<AvroError> for DataFusionError {
     }
 }
 
+#[cfg(feature = "object_store")]
+impl From<object_store::Error> for DataFusionError {
+    fn from(e: object_store::Error) -> Self {
+        DataFusionError::ObjectStore(e)
+    }
+}
+
+#[cfg(feature = "object_store")]
+impl From<object_store::path::Error> for DataFusionError {
+    fn from(e: object_store::path::Error) -> Self {
+        DataFusionError::ObjectStore(e.into())
+    }
+}
+
 impl From<ParserError> for DataFusionError {
     fn from(e: ParserError) -> Self {
         DataFusionError::SQL(e)
@@ -264,6 +281,10 @@ impl Display for DataFusionError {
             DataFusionError::JITError(ref desc) => {
                 write!(f, "JIT error: {}", desc)
             }
+            #[cfg(feature = "object_store")]
+            DataFusionError::ObjectStore(ref desc) => {
+                write!(f, "Object Store error: {}", desc)
+            }
         }
     }
 }
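
With these conversions in place, code that talks to an object store can propagate errors with `?` instead of mapping them by hand. A minimal sketch of that pattern (not part of the commit; the helper name is made up, and it assumes the `object_store` feature is enabled, as datafusion-core now does):

    use datafusion_common::DataFusionError;
    use object_store::{path::Path, ObjectStore};

    // Returns the size of an object. The `?` converts object_store::Error into
    // DataFusionError::ObjectStore via the From impl added above.
    async fn object_size(
        store: &dyn ObjectStore,
        location: &Path,
    ) -> Result<usize, DataFusionError> {
        let meta = store.head(location).await?;
        Ok(meta.size)
    }
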
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index af49d862f..c6cf32bd7 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -58,9 +58,9 @@ ahash = { version = "0.7", default-features = false }
 arrow = { version = "17.0.0", features = ["prettyprint"] }
 async-trait = "0.1.41"
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+bytes = "1.1"
 chrono = { version = "0.4", default-features = false }
-datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet"] }
-datafusion-data-access = { path = "../data-access", version = "9.0.0" }
+datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet", "object_store"] }
 datafusion-expr = { path = "../expr", version = "9.0.0" }
 datafusion-jit = { path = "../jit", version = "9.0.0", optional = true }
 datafusion-optimizer = { path = "../optimizer", version = "9.0.0" }
@@ -75,9 +75,10 @@ lazy_static = { version = "^1.4.0" }
 log = "^0.4"
 num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
+object_store = "0.3.0"
 ordered-float = "3.0"
 parking_lot = "0.12"
-parquet = { version = "17.0.0", features = ["arrow"] }
+parquet = { version = "17.0.0", features = ["arrow", "async"] }
 paste = "^1.0"
 pin-project-lite = "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index db25c1edc..7634328f3 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -132,7 +132,6 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::datatypes::Schema;
-    use datafusion_data_access::object_store::local::LocalFileSystem;
 
     use crate::assert_batches_eq;
     use crate::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
@@ -170,8 +169,6 @@ mod tests {
         let schema = MemorySchemaProvider::new();
 
         let ctx = SessionContext::new();
-        let store = Arc::new(LocalFileSystem {});
-        ctx.runtime_env().register_object_store("file", store);
 
         let config = ListingTableConfig::new(table_path)
             .infer(&ctx.state())
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 63781da52..dec368808 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,6 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::ObjectStore;
 
 /// The default file extension of avro files
 pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -49,12 +48,18 @@ impl FileFormat for AvroFormat {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
         let mut schemas = vec![];
-        for file in files {
-            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
-            let schema = read_avro_schema_from_reader(&mut reader)?;
+        for object in objects {
+            let schema = match store.get(&object.location).await? {
+                GetResult::File(mut file, _) => read_avro_schema_from_reader(&mut file)?,
+                r @ GetResult::Stream(_) => {
+                    // TODO: Fetching entire file to get schema is potentially wasteful
+                    let data = r.bytes().await?;
+                    read_avro_schema_from_reader(&mut data.as_ref())?
+                }
+            };
             schemas.push(schema);
         }
         let merged_schema = Schema::try_merge(schemas)?;
@@ -65,7 +70,7 @@ impl FileFormat for AvroFormat {
         &self,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
-        _file: &FileMeta,
+        _object: &ObjectMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
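
The `GetResult` match above is the general shape for code that needs a whole object: a local store can hand back a `std::fs::File` for synchronous reads, while remote stores return a byte stream. A hedged sketch of that pattern in isolation (the helper name and error type are chosen here for illustration):

    use std::io::Read;
    use std::sync::Arc;

    use object_store::{path::Path, GetResult, ObjectStore};

    // Fetch an object's contents, reading a local file directly when possible
    // and collecting the async stream otherwise.
    async fn fetch_bytes(
        store: &Arc<dyn ObjectStore>,
        location: &Path,
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        Ok(match store.get(location).await? {
            GetResult::File(mut file, _path) => {
                let mut buf = Vec::new();
                file.read_to_end(&mut buf)?;
                buf
            }
            r @ GetResult::Stream(_) => r.bytes().await?.to_vec(),
        })
    }
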
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 0d665e9ef..d72a6e767 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use datafusion_common::DataFusionError;
+use futures::TryFutureExt;
+use object_store::{ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +34,6 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
-use datafusion_data_access::object_store::ObjectStore;
 
 /// The default file extension of csv files
 pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -96,16 +97,21 @@ impl FileFormat for CsvFormat {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
         let mut schemas = vec![];
 
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
 
-        for file in files {
-            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
+        for object in objects {
+            let data = store
+                .get(&object.location)
+                .and_then(|r| r.bytes())
+                .await
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
             let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
-                &mut reader,
+                &mut data.as_ref(),
                 self.delimiter,
                 Some(records_to_read),
                 self.has_header,
@@ -128,7 +134,7 @@ impl FileFormat for CsvFormat {
         &self,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
-        _file: &FileMeta,
+        _object: &ObjectMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
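
Where a format simply needs the full body, the GET and the body collection can be chained into one fallible future, which is what the CSV inference loop above does with `TryFutureExt::and_then`. A standalone sketch under the same assumptions (helper name invented here):

    use bytes::Bytes;
    use datafusion_common::DataFusionError;
    use futures::TryFutureExt;
    use object_store::{path::Path, ObjectStore};

    // Download an object in full, mapping the store error into DataFusionError.
    async fn read_object(
        store: &dyn ObjectStore,
        location: &Path,
    ) -> Result<Bytes, DataFusionError> {
        store
            .get(location)
            .and_then(|r| r.bytes())
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))
    }
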
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index e9b49c42d..9a889ab4c 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::json::reader::infer_json_schema_from_iterator;
 use arrow::json::reader::ValueIter;
 use async_trait::async_trait;
-use datafusion_data_access::{object_store::ObjectStore, FileMeta};
+use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use super::FileScanConfig;
@@ -71,21 +71,33 @@ impl FileFormat for JsonFormat {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
         let mut schemas = Vec::new();
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
-        for file in files {
-            let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
-            let mut reader = BufReader::new(reader);
-            let iter = ValueIter::new(&mut reader, None);
-            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+        for object in objects {
+            let mut take_while = || {
                 let should_take = records_to_read > 0;
                 if should_take {
                     records_to_read -= 1;
                 }
                 should_take
-            }))?;
+            };
+
+            let schema = match store.get(&object.location).await? {
+                GetResult::File(file, _) => {
+                    let mut reader = BufReader::new(file);
+                    let iter = ValueIter::new(&mut reader, None);
+                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+                }
+                r @ GetResult::Stream(_) => {
+                    let data = r.bytes().await?;
+                    let mut reader = BufReader::new(data.as_ref());
+                    let iter = ValueIter::new(&mut reader, None);
+                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+                }
+            };
+
             schemas.push(schema);
             if records_to_read == 0 {
                 break;
@@ -100,7 +112,7 @@ impl FileFormat for JsonFormat {
         &self,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
-        _file: &FileMeta,
+        _object: &ObjectMeta,
     ) -> Result<Statistics> {
         Ok(Statistics::default())
     }
@@ -120,15 +132,12 @@ mod tests {
     use super::super::test_util::scan_format;
     use arrow::array::Int64Array;
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
 
     use super::*;
+    use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
-    use crate::{
-        datafusion_data_access::object_store::local::{
-            local_unpartitioned_file, LocalFileSystem,
-        },
-        physical_plan::collect,
-    };
+    use crate::test::object_store::local_unpartitioned_file;
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -229,12 +238,12 @@ mod tests {
 
     #[tokio::test]
     async fn infer_schema_with_limit() {
-        let store = Arc::new(LocalFileSystem {}) as _;
+        let store = Arc::new(LocalFileSystem::new()) as _;
         let filename = "tests/jsons/schema_infer_limit.json";
         let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));
 
         let file_schema = format
-            .infer_schema(&store, &[local_unpartitioned_file(filename.to_string())])
+            .infer_schema(&store, &[local_unpartitioned_file(filename)])
             .await
             .expect("Schema inference");
 
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index a15750394..8a0d5b97e 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,8 +36,7 @@ use crate::physical_plan::file_format::FileScanConfig;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
 use async_trait::async_trait;
-use datafusion_data_access::object_store::ObjectStore;
-use datafusion_data_access::FileMeta;
+use object_store::{ObjectMeta, ObjectStore};
 
 /// This trait abstracts all the file format specific implementations
 /// from the `TableProvider`. This helps code re-utilization across
@@ -55,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef>;
 
     /// Infer the statistics for the provided object. The cost and accuracy of the
@@ -69,7 +68,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         &self,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
-        file: &FileMeta,
+        object: &ObjectMeta,
     ) -> Result<Statistics>;
 
     /// Take a list of files and convert it to the appropriate executor
@@ -86,9 +85,8 @@ pub(crate) mod test_util {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
-    use datafusion_data_access::object_store::local::{
-        local_unpartitioned_file, LocalFileSystem,
-    };
+    use crate::test::object_store::local_unpartitioned_file;
+    use object_store::local::LocalFileSystem;
 
     pub async fn scan_format(
         format: &dyn FileFormat,
@@ -97,7 +95,7 @@ pub(crate) mod test_util {
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let store = Arc::new(LocalFileSystem {}) as _;
+        let store = Arc::new(LocalFileSystem::new()) as _;
         let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name));
 
         let file_schema = format.infer_schema(&store, &[meta.clone()]).await?;
@@ -107,7 +105,7 @@ pub(crate) mod test_util {
             .await?;
 
         let file_groups = vec![vec![PartitionedFile {
-            file_meta: meta,
+            object_meta: meta,
             partition_values: vec![],
             range: None,
         }]];
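
FileFormat implementations now consume `ObjectMeta` obtained from the store itself rather than the old `FileMeta`, so schema inference can be driven by a `head` request plus the format-specific reads. A rough usage sketch, assuming the public module paths as laid out in this repository; the file path is purely illustrative:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::file_format::csv::CsvFormat;
    use datafusion::datasource::file_format::FileFormat;
    use datafusion::error::Result;
    use object_store::local::LocalFileSystem;
    use object_store::{path::Path, ObjectStore};

    // Look up the object's metadata, then let the format infer its schema.
    async fn infer_csv_schema(path: &str) -> Result<SchemaRef> {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let meta = store.head(&Path::from(path)).await?;
        CsvFormat::default().infer_schema(&store, &[meta]).await
    }
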
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 8825fe7e0..19794f6ca 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -18,21 +18,17 @@
 //! Parquet format abstractions
 
 use std::any::Any;
-use std::io::Read;
 use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use datafusion_data_access::FileMeta;
+use datafusion_common::DataFusionError;
 use hashbrown::HashMap;
-use parquet::arrow::ArrowReader;
-use parquet::arrow::ParquetFileArrowReader;
-use parquet::errors::ParquetError;
-use parquet::errors::Result as ParquetResult;
-use parquet::file::reader::ChunkReader;
-use parquet::file::reader::Length;
-use parquet::file::serialized_reader::SerializedFileReader;
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::parquet_to_arrow_schema;
+use parquet::file::footer::{decode_footer, decode_metadata};
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 
 use super::FileFormat;
@@ -42,15 +38,12 @@ use crate::arrow::array::{
 };
 use crate::arrow::datatypes::{DataType, Field};
 use crate::datasource::{create_max_min_accs, get_col_stats};
-use crate::error::DataFusionError;
 use crate::error::Result;
 use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
-use crate::physical_plan::{metrics, ExecutionPlan};
-use crate::physical_plan::{Accumulator, Statistics};
-use datafusion_data_access::object_store::{ObjectReader, ObjectStore};
+use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
 
 /// The default file extension of parquet files
 pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
@@ -91,11 +84,11 @@ impl FileFormat for ParquetFormat {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
-        let mut schemas = Vec::with_capacity(files.len());
-        for file in files {
-            let schema = fetch_schema(store.as_ref(), file)?;
+        let mut schemas = Vec::with_capacity(objects.len());
+        for object in objects {
+            let schema = fetch_schema(store.as_ref(), object).await?;
             schemas.push(schema)
         }
         let schema = Schema::try_merge(schemas)?;
@@ -106,9 +99,9 @@ impl FileFormat for ParquetFormat {
         &self,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
-        file: &FileMeta,
+        object: &ObjectMeta,
     ) -> Result<Statistics> {
-        let stats = fetch_statistics(store.as_ref(), table_schema, file)?;
+        let stats = fetch_statistics(store.as_ref(), table_schema, object).await?;
         Ok(stats)
     }
 
@@ -294,37 +287,70 @@ fn summarize_min_max(
     }
 }
 
-/// Read and parse the schema of the Parquet file at location `path`
-fn fetch_schema(store: &dyn ObjectStore, file: &FileMeta) -> Result<Schema> {
-    let object_reader = store.file_reader(file.sized_file.clone())?;
-    let obj_reader = ChunkObjectReader {
-        object_reader,
-        bytes_scanned: None,
-    };
-    let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
-    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-    let schema = arrow_reader.get_schema()?;
+pub(crate) async fn fetch_parquet_metadata(
+    store: &dyn ObjectStore,
+    meta: &ObjectMeta,
+) -> Result<ParquetMetaData> {
+    if meta.size < 8 {
+        return Err(DataFusionError::Execution(format!(
+            "file size of {} is less than footer",
+            meta.size
+        )));
+    }
 
+    let footer_start = meta.size - 8;
+    let suffix = store
+        .get_range(&meta.location, footer_start..meta.size)
+        .await?;
+
+    let mut footer = [0; 8];
+    footer.copy_from_slice(suffix.as_ref());
+
+    let length = decode_footer(&footer)?;
+
+    if meta.size < length + 8 {
+        return Err(DataFusionError::Execution(format!(
+            "file size of {} is less than footer + metadata {}",
+            meta.size,
+            length + 8
+        )));
+    }
+
+    let metadata_start = meta.size - length - 8;
+    let metadata = store
+        .get_range(&meta.location, metadata_start..footer_start)
+        .await?;
+
+    Ok(decode_metadata(metadata.as_ref())?)
+}
+
+/// Read and parse the schema of the Parquet file at location `path`
+async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
+    let metadata = fetch_parquet_metadata(store, file).await?;
+    let file_metadata = metadata.file_metadata();
+    let schema = parquet_to_arrow_schema(
+        file_metadata.schema_descr(),
+        file_metadata.key_value_metadata(),
+    )?;
     Ok(schema)
 }
 
 /// Read and parse the statistics of the Parquet file at location `path`
-fn fetch_statistics(
+async fn fetch_statistics(
     store: &dyn ObjectStore,
     table_schema: SchemaRef,
-    file: &FileMeta,
+    file: &ObjectMeta,
 ) -> Result<Statistics> {
-    let object_reader = store.file_reader(file.sized_file.clone())?;
-    let obj_reader = ChunkObjectReader {
-        object_reader,
-        bytes_scanned: None,
-    };
-    let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
-    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-    let file_schema = arrow_reader.get_schema()?;
+    let metadata = fetch_parquet_metadata(store, file).await?;
+    let file_metadata = metadata.file_metadata();
+
+    let file_schema = parquet_to_arrow_schema(
+        file_metadata.schema_descr(),
+        file_metadata.key_value_metadata(),
+    )?;
+
     let num_fields = table_schema.fields().len();
     let fields = table_schema.fields().to_vec();
-    let meta_data = arrow_reader.metadata();
 
     let mut num_rows = 0;
     let mut total_byte_size = 0;
@@ -335,7 +361,7 @@ fn fetch_statistics(
 
     let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
 
-    for row_group_meta in meta_data.row_groups() {
+    for row_group_meta in metadata.row_groups() {
         num_rows += row_group_meta.num_rows();
         total_byte_size += row_group_meta.total_byte_size();
 
@@ -395,46 +421,18 @@ fn fetch_statistics(
     Ok(statistics)
 }
 
-/// A wrapper around the object reader to make it implement `ChunkReader`
-pub struct ChunkObjectReader {
-    /// The underlying object reader
-    pub object_reader: Arc<dyn ObjectReader>,
-    /// Optional counter which will track total number of bytes scanned
-    pub bytes_scanned: Option<metrics::Count>,
-}
-
-impl Length for ChunkObjectReader {
-    fn len(&self) -> u64 {
-        self.object_reader.length()
-    }
-}
-
-impl ChunkReader for ChunkObjectReader {
-    type T = Box<dyn Read + Send + Sync>;
-
-    fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
-        if let Some(m) = self.bytes_scanned.as_ref() {
-            m.add(length)
-        }
-        self.object_reader
-            .sync_chunk_reader(start, length)
-            .map_err(DataFusionError::IoError)
-            .map_err(|e| ParquetError::ArrowError(e.to_string()))
-    }
-}
-
 #[cfg(test)]
 pub(crate) mod test_util {
     use super::*;
+    use crate::test::object_store::local_unpartitioned_file;
     use arrow::record_batch::RecordBatch;
-    use datafusion_data_access::object_store::local::local_unpartitioned_file;
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
     use tempfile::NamedTempFile;
 
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
-    ) -> Result<(Vec<FileMeta>, Vec<NamedTempFile>)> {
+    ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
         let files: Vec<_> = batches
             .into_iter()
             .map(|batch| {
@@ -451,11 +449,7 @@ pub(crate) mod test_util {
             })
             .collect();
 
-        let meta: Vec<_> = files
-            .iter()
-            .map(|f| local_unpartitioned_file(f.path().to_string_lossy().to_string()))
-            .collect();
-
+        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
         Ok((meta, files))
     }
 }
@@ -464,7 +458,6 @@ pub(crate) mod test_util {
 mod tests {
     use super::super::test_util::scan_format;
     use crate::physical_plan::collect;
-    use datafusion_data_access::object_store::local::LocalFileSystem;
 
     use super::*;
 
@@ -478,6 +471,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use datafusion_common::ScalarValue;
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
 
     #[tokio::test]
     async fn read_merged_batches() -> Result<()> {
@@ -489,13 +483,13 @@ mod tests {
         let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
         let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
-        let store = Arc::new(LocalFileSystem {}) as _;
+        let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
 
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
 
-        let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0])?;
+        let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?;
 
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
@@ -503,7 +497,7 @@ mod tests {
         assert_eq!(c1_stats.null_count, Some(1));
         assert_eq!(c2_stats.null_count, Some(3));
 
-        let stats = fetch_statistics(store.as_ref(), schema, &meta[1])?;
+        let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?;
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
         let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -560,8 +554,8 @@ mod tests {
         let _ = collect(exec.clone(), task_ctx.clone()).await?;
         let _ = collect(exec_projected.clone(), task_ctx).await?;
 
-        assert_bytes_scanned(exec, 1409);
-        assert_bytes_scanned(exec_projected, 811);
+        assert_bytes_scanned(exec, 671);
+        assert_bytes_scanned(exec_projected, 73);
 
         Ok(())
     }
@@ -580,7 +574,7 @@ mod tests {
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
         assert_eq!(11, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
+        assert_eq!(1, batches[0].num_rows());
 
         Ok(())
     }
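
The new metadata path drops `ChunkReader` entirely: two ranged GETs (footer, then metadata) are enough, which suits remote stores where random access is a request rather than a seek. A condensed sketch of the ranged footer read, assuming the file is at least 8 bytes as checked above:

    use object_store::{path::Path, ObjectStore};
    use parquet::file::footer::decode_footer;

    // Read only the last 8 bytes of the file to learn the metadata length.
    async fn metadata_length(
        store: &dyn ObjectStore,
        location: &Path,
        file_size: usize,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        let suffix = store.get_range(location, file_size - 8..file_size).await?;
        let mut footer = [0u8; 8];
        footer.copy_from_slice(suffix.as_ref());
        Ok(decode_footer(&footer)?)
    }
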
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 4b482d968..873d005b4 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -41,8 +41,10 @@ use crate::{
 
 use super::PartitionedFile;
 use crate::datasource::listing::ListingTableUrl;
-use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile};
+use datafusion_common::DataFusionError;
 use datafusion_expr::Volatility;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
 
 const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
 const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
@@ -184,7 +186,7 @@ pub async fn pruned_partition_list<'a>(
         Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
             let parsed_path = parse_partitions_for_path(
                 table_path,
-                file_meta.path(),
+                &file_meta.location,
                 table_partition_cols,
             )
             .map(|p| {
@@ -195,7 +197,7 @@ pub async fn pruned_partition_list<'a>(
 
             Ok(parsed_path.map(|partition_values| PartitionedFile {
                 partition_values,
-                file_meta,
+                object_meta: file_meta,
                 range: None,
             }))
         })))
@@ -214,10 +216,9 @@ pub async fn pruned_partition_list<'a>(
             df = df.filter(filter.clone())?;
         }
         let filtered_batches = df.collect().await?;
+        let paths = batches_to_paths(&filtered_batches)?;
 
-        Ok(Box::pin(futures::stream::iter(
-            batches_to_paths(&filtered_batches).into_iter().map(Ok),
-        )))
+        Ok(Box::pin(futures::stream::iter(paths.into_iter().map(Ok))))
     }
 }
 
@@ -231,7 +232,7 @@ pub async fn pruned_partition_list<'a>(
 fn paths_to_batch(
     table_partition_cols: &[String],
     table_path: &ListingTableUrl,
-    metas: &[FileMeta],
+    metas: &[ObjectMeta],
 ) -> Result<RecordBatch> {
     let mut key_builder = StringBuilder::new(metas.len());
     let mut length_builder = UInt64Builder::new(metas.len());
@@ -241,20 +242,19 @@ fn paths_to_batch(
         .map(|_| StringBuilder::new(metas.len()))
         .collect::<Vec<_>>();
     for file_meta in metas {
-        if let Some(partition_values) =
-            parse_partitions_for_path(table_path, file_meta.path(), table_partition_cols)
-        {
-            key_builder.append_value(file_meta.path())?;
-            length_builder.append_value(file_meta.size())?;
-            match file_meta.last_modified {
-                Some(lm) => modified_builder.append_value(lm.timestamp_millis())?,
-                None => modified_builder.append_null()?,
-            }
+        if let Some(partition_values) = parse_partitions_for_path(
+            table_path,
+            &file_meta.location,
+            table_partition_cols,
+        ) {
+            key_builder.append_value(file_meta.location.as_ref())?;
+            length_builder.append_value(file_meta.size as u64)?;
+            modified_builder.append_value(file_meta.last_modified.timestamp_millis())?;
             for (i, part_val) in partition_values.iter().enumerate() {
                 partition_builders[i].append_value(part_val)?;
             }
         } else {
-            debug!("No partitioning for path {}", file_meta.path());
+            debug!("No partitioning for path {}", file_meta.location);
         }
     }
 
@@ -283,7 +283,7 @@ fn paths_to_batch(
 }
 
 /// convert a set of record batches created by `paths_to_batch()` back to partitioned files.
-fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
+fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
     batches
         .iter()
         .flat_map(|batch| {
@@ -303,23 +303,21 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
                 .downcast_ref::<Date64Array>()
                 .unwrap();
 
-            (0..batch.num_rows()).map(move |row| PartitionedFile {
-                file_meta: FileMeta {
-                    last_modified: match modified_array.is_null(row) {
-                        false => Some(Utc.timestamp_millis(modified_array.value(row))),
-                        true => None,
-                    },
-                    sized_file: SizedFile {
-                        path: key_array.value(row).to_owned(),
-                        size: length_array.value(row),
+            (0..batch.num_rows()).map(move |row| {
+                Ok(PartitionedFile {
+                    object_meta: ObjectMeta {
+                        location: Path::parse(key_array.value(row))
+                            .map_err(|e| DataFusionError::External(Box::new(e)))?,
+                        last_modified: Utc.timestamp_millis(modified_array.value(row)),
+                        size: length_array.value(row) as usize,
                     },
-                },
-                partition_values: (3..batch.columns().len())
-                    .map(|col| {
-                        ScalarValue::try_from_array(batch.column(col), row).unwrap()
-                    })
-                    .collect(),
-                range: None,
+                    partition_values: (3..batch.columns().len())
+                        .map(|col| {
+                            ScalarValue::try_from_array(batch.column(col), row).unwrap()
+                        })
+                        .collect(),
+                    range: None,
+                })
             })
         })
         .collect()
@@ -329,7 +327,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
 /// associated to the partitions defined by `table_partition_cols`
 fn parse_partitions_for_path<'a>(
     table_path: &ListingTableUrl,
-    file_path: &'a str,
+    file_path: &'a Path,
     table_partition_cols: &[String],
 ) -> Option<Vec<&'a str>> {
     let subpath = table_path.strip_prefix(file_path)?;
@@ -346,10 +344,8 @@ fn parse_partitions_for_path<'a>(
 
 #[cfg(test)]
 mod tests {
-    use crate::{
-        logical_plan::{case, col, lit},
-        test::object_store::TestObjectStore,
-    };
+    use crate::logical_plan::{case, col, lit};
+    use crate::test::object_store::make_test_store;
     use futures::StreamExt;
 
     use super::*;
@@ -396,7 +392,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_pruned_partition_list_empty() {
-        let store = TestObjectStore::new_arc(&[
+        let store = make_test_store(&[
             ("tablepath/mypartition=val1/notparquetfile", 100),
             ("tablepath/file.parquet", 100),
         ]);
@@ -418,7 +414,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_pruned_partition_list() {
-        let store = TestObjectStore::new_arc(&[
+        let store = make_test_store(&[
             ("tablepath/mypartition=val1/file.parquet", 100),
             ("tablepath/mypartition=val2/file.parquet", 100),
             ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
@@ -440,7 +436,7 @@ mod tests {
         assert_eq!(pruned.len(), 2);
         let f1 = &pruned[0];
         assert_eq!(
-            f1.file_meta.path(),
+            f1.object_meta.location.as_ref(),
             "tablepath/mypartition=val1/file.parquet"
         );
         assert_eq!(
@@ -449,7 +445,7 @@ mod tests {
         );
         let f2 = &pruned[1];
         assert_eq!(
-            f2.file_meta.path(),
+            f2.object_meta.location.as_ref(),
             "tablepath/mypartition=val1/other=val3/file.parquet"
         );
         assert_eq!(
@@ -460,7 +456,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_pruned_partition_list_multi() {
-        let store = TestObjectStore::new_arc(&[
+        let store = make_test_store(&[
             ("tablepath/part1=p1v1/file.parquet", 100),
             ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
             ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
@@ -487,7 +483,7 @@ mod tests {
         assert_eq!(pruned.len(), 2);
         let f1 = &pruned[0];
         assert_eq!(
-            f1.file_meta.path(),
+            f1.object_meta.location.as_ref(),
             "tablepath/part1=p1v2/part2=p2v1/file1.parquet"
         );
         assert_eq!(
@@ -499,7 +495,7 @@ mod tests {
         );
         let f2 = &pruned[1];
         assert_eq!(
-            f2.file_meta.path(),
+            f2.object_meta.location.as_ref(),
             "tablepath/part1=p1v2/part2=p2v1/file2.parquet"
         );
         assert_eq!(
@@ -517,7 +513,7 @@ mod tests {
             Some(vec![]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket/mytable/file.csv",
+                &Path::from("bucket/mytable/file.csv"),
                 &[]
             )
         );
@@ -525,7 +521,7 @@ mod tests {
             None,
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
-                "bucket/mytable/file.csv",
+                &Path::from("bucket/mytable/file.csv"),
                 &[]
             )
         );
@@ -533,7 +529,7 @@ mod tests {
             None,
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket/mytable/file.csv",
+                &Path::from("bucket/mytable/file.csv"),
                 &[String::from("mypartition")]
             )
         );
@@ -541,7 +537,7 @@ mod tests {
             Some(vec!["v1"]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket/mytable/mypartition=v1/file.csv",
+                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                 &[String::from("mypartition")]
             )
         );
@@ -549,7 +545,7 @@ mod tests {
             Some(vec!["v1"]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
-                "bucket/mytable/mypartition=v1/file.csv",
+                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                 &[String::from("mypartition")]
             )
         );
@@ -558,7 +554,7 @@ mod tests {
             None,
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket/mytable/v1/file.csv",
+                &Path::from("bucket/mytable/v1/file.csv"),
                 &[String::from("mypartition")]
             )
         );
@@ -566,7 +562,7 @@ mod tests {
             Some(vec!["v1", "v2"]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
+                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                 &[String::from("mypartition"), String::from("otherpartition")]
             )
         );
@@ -574,49 +570,24 @@ mod tests {
             Some(vec!["v1"]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
-                &[String::from("mypartition")]
-            )
-        );
-    }
-
-    #[cfg(target_os = "windows")]
-    #[test]
-    fn test_parse_partitions_for_path_windows() {
-        assert_eq!(
-            Some(vec!["v1"]),
-            parse_partitions_for_path(
-                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket\\mytable\\mypartition=v1\\file.csv",
+                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                 &[String::from("mypartition")]
             )
         );
-        assert_eq!(
-            Some(vec!["v1", "v2"]),
-            parse_partitions_for_path(
-                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
-                "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv",
-                &[String::from("mypartition"), String::from("otherpartition")]
-            )
-        );
     }
 
     #[test]
     fn test_path_batch_roundtrip_no_partiton() {
         let files = vec![
-            FileMeta {
-                sized_file: SizedFile {
-                    path: String::from("mybucket/tablepath/part1=val1/file.parquet"),
-                    size: 100,
-                },
-                last_modified: Some(Utc.timestamp_millis(1634722979123)),
+            ObjectMeta {
+                location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
+                last_modified: Utc.timestamp_millis(1634722979123),
+                size: 100,
             },
-            FileMeta {
-                sized_file: SizedFile {
-                    path: String::from("mybucket/tablepath/part1=val2/file.parquet"),
-                    size: 100,
-                },
-                last_modified: None,
+            ObjectMeta {
+                location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
+                last_modified: Utc.timestamp_millis(0),
+                size: 100,
             },
         ];
 
@@ -624,14 +595,14 @@ mod tests {
         let batches = paths_to_batch(&[], &table_path, &files)
             .expect("Serialization of file list to batch failed");
 
-        let parsed_files = batches_to_paths(&[batches]);
+        let parsed_files = batches_to_paths(&[batches]).unwrap();
         assert_eq!(parsed_files.len(), 2);
         assert_eq!(&parsed_files[0].partition_values, &[]);
         assert_eq!(&parsed_files[1].partition_values, &[]);
 
         let parsed_metas = parsed_files
             .into_iter()
-            .map(|pf| pf.file_meta)
+            .map(|pf| pf.object_meta)
             .collect::<Vec<_>>();
         assert_eq!(parsed_metas, files);
     }
@@ -639,19 +610,15 @@ mod tests {
     #[test]
     fn test_path_batch_roundtrip_with_partition() {
         let files = vec![
-            FileMeta {
-                sized_file: SizedFile {
-                    path: String::from("mybucket/tablepath/part1=val1/file.parquet"),
-                    size: 100,
-                },
-                last_modified: Some(Utc.timestamp_millis(1634722979123)),
+            ObjectMeta {
+                location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
+                last_modified: Utc.timestamp_millis(1634722979123),
+                size: 100,
             },
-            FileMeta {
-                sized_file: SizedFile {
-                    path: String::from("mybucket/tablepath/part1=val2/file.parquet"),
-                    size: 100,
-                },
-                last_modified: None,
+            ObjectMeta {
+                location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
+                last_modified: Utc.timestamp_millis(0),
+                size: 100,
             },
         ];
 
@@ -662,7 +629,7 @@ mod tests {
         )
         .expect("Serialization of file list to batch failed");
 
-        let parsed_files = batches_to_paths(&[batches]);
+        let parsed_files = batches_to_paths(&[batches]).unwrap();
         assert_eq!(parsed_files.len(), 2);
         assert_eq!(
             &parsed_files[0].partition_values,
@@ -675,7 +642,7 @@ mod tests {
 
         let parsed_metas = parsed_files
             .into_iter()
-            .map(|pf| pf.file_meta)
+            .map(|pf| pf.object_meta)
             .collect::<Vec<_>>();
         assert_eq!(parsed_metas, files);
     }
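
Partition values are still parsed hive-style from the listed paths, only now against `object_store::path::Path`, which always uses `/` regardless of platform (hence the removal of the Windows-specific test above). A small sketch of that extraction for a known prefix (the helper below is written here for illustration, not taken from the crate):

    use object_store::path::Path;

    // Extract "col=value" segments that follow the table prefix.
    fn partition_values<'a>(
        prefix: &str,
        path: &'a Path,
        cols: &[&str],
    ) -> Option<Vec<&'a str>> {
        let rel = path.as_ref().strip_prefix(prefix)?.trim_start_matches('/');
        cols.iter()
            .zip(rel.split('/'))
            .map(|(col, segment)| segment.strip_prefix(&format!("{}=", col)))
            .collect()
    }

    fn example() {
        let path = Path::from("tablepath/part1=v1/part2=v2/file.parquet");
        let vals = partition_values("tablepath", &path, &["part1", "part2"]).unwrap();
        assert_eq!(vals, vec!["v1", "v2"]);
    }
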
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index c11de5f80..85d4b6f7d 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -22,9 +22,11 @@ mod helpers;
 mod table;
 mod url;
 
+use crate::error::Result;
+use chrono::TimeZone;
 use datafusion_common::ScalarValue;
-use datafusion_data_access::{FileMeta, Result, SizedFile};
 use futures::Stream;
+use object_store::{path::Path, ObjectMeta};
 use std::pin::Pin;
 
 pub use self::url::ListingTableUrl;
@@ -51,7 +53,7 @@ pub struct FileRange {
 /// and partition column values that need to be appended to each row.
 pub struct PartitionedFile {
     /// Path for the file (e.g. URL, filesystem path, etc)
-    pub file_meta: FileMeta,
+    pub object_meta: ObjectMeta,
     /// Values of partition columns to be appended to each row
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
@@ -62,9 +64,10 @@ impl PartitionedFile {
     /// Create a simple file without metadata or partition
     pub fn new(path: String, size: u64) -> Self {
         Self {
-            file_meta: FileMeta {
-                sized_file: SizedFile { path, size },
-                last_modified: None,
+            object_meta: ObjectMeta {
+                location: Path::from(path),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: size as usize,
             },
             partition_values: vec![],
             range: None,
@@ -74,9 +77,10 @@ impl PartitionedFile {
     /// Create a file range without metadata or partition
     pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
         Self {
-            file_meta: FileMeta {
-                sized_file: SizedFile { path, size },
-                last_modified: None,
+            object_meta: ObjectMeta {
+                location: Path::from(path),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: size as usize,
             },
             partition_values: vec![],
             range: Some(FileRange { start, end }),
@@ -84,16 +88,10 @@ impl PartitionedFile {
     }
 }
 
-impl std::fmt::Display for PartitionedFile {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{}", self.file_meta)
-    }
-}
-
-impl From<FileMeta> for PartitionedFile {
-    fn from(file_meta: FileMeta) -> Self {
+impl From<ObjectMeta> for PartitionedFile {
+    fn from(object_meta: ObjectMeta) -> Self {
         PartitionedFile {
-            file_meta,
+            object_meta,
             partition_values: vec![],
             range: None,
         }
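
PartitionedFile now wraps an `ObjectMeta` directly; since `ObjectMeta::last_modified` is no longer optional, the convenience constructors fall back to the Unix epoch. A quick usage sketch (file name illustrative):

    use datafusion::datasource::listing::PartitionedFile;

    fn example() {
        // Size is given in bytes; the location becomes an object_store Path.
        let file = PartitionedFile::new("data/part-0.parquet".to_string(), 1024);
        assert_eq!(file.object_meta.location.as_ref(), "data/part-0.parquet");
        assert_eq!(file.object_meta.size, 1024);
    }
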
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 1b0807085..abf5394fb 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -126,7 +126,7 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
 
-        let file_type = file.path().rsplit('.').next().ok_or_else(|| {
+        let file_type = file.location.as_ref().rsplit('.').next().ok_or_else(|| {
             DataFusionError::Internal("Unable to infer file suffix".into())
         })?;
 
@@ -394,7 +394,7 @@ impl ListingTable {
             let statistics = if self.options.collect_stat {
                 self.options
                     .format
-                    .infer_stats(&store, self.file_schema.clone(), &part_file.file_meta)
+                    .infer_stats(&store, self.file_schema.clone(), &part_file.object_meta)
                     .await?
             } else {
                 Statistics::default()
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 041a6ab7f..8676f2118 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -17,13 +17,12 @@
 
 use crate::datasource::object_store::ObjectStoreUrl;
 use datafusion_common::{DataFusionError, Result};
-use datafusion_data_access::object_store::ObjectStore;
-use datafusion_data_access::FileMeta;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use glob::Pattern;
 use itertools::Itertools;
-use std::path::is_separator;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
 use url::Url;
 
 /// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
@@ -32,6 +31,8 @@ use url::Url;
 pub struct ListingTableUrl {
     /// A URL that identifies a file or directory to list files from
     url: Url,
+    /// The path prefix
+    prefix: Path,
     /// An optional glob expression used to filter files
     glob: Option<Pattern>,
 }
@@ -79,7 +80,7 @@ impl ListingTableUrl {
         }
 
         match Url::parse(s) {
-            Ok(url) => Ok(Self { url, glob: None }),
+            Ok(url) => Ok(Self::new(url, None)),
             Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
             Err(e) => Err(DataFusionError::External(Box::new(e))),
         }
@@ -102,7 +103,13 @@ impl ListingTableUrl {
             false => Url::from_directory_path(path).unwrap(),
         };
 
-        Ok(Self { url, glob })
+        Ok(Self::new(url, glob))
+    }
+
+    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
+    fn new(url: Url, glob: Option<Pattern>) -> Self {
+        let prefix = Path::parse(url.path()).expect("should be URL safe");
+        Self { url, prefix, glob }
     }
 
     /// Returns the URL scheme
@@ -110,46 +117,19 @@ impl ListingTableUrl {
         self.url.scheme()
     }
 
-    /// Returns the path as expected by [`ObjectStore`]
-    ///
-    /// In particular for file scheme URLs, this is an absolute
-    /// on the local filesystem in the OS-specific path representation
-    ///
-    /// For other URLs, this is a the host and path of the URL,
-    /// delimited by `/`, and with no leading `/`
-    ///
-    /// TODO: Handle paths consistently (#2489)
-    fn prefix(&self) -> &str {
-        match self.scheme() {
-            "file" => match cfg!(target_family = "windows") {
-                true => self.url.path().strip_prefix('/').unwrap(),
-                false => self.url.path(),
-            },
-            _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
-        }
-    }
-
     /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
     /// an iterator of the remaining path segments
-    ///
-    /// TODO: Handle paths consistently (#2489)
     pub(crate) fn strip_prefix<'a, 'b: 'a>(
         &'a self,
-        path: &'b str,
+        path: &'b Path,
     ) -> Option<impl Iterator<Item = &'b str> + 'a> {
-        let prefix = self.prefix();
-        // Ignore empty path segments
-        let diff = itertools::diff_with(
-            path.split(is_separator).filter(|s| !s.is_empty()),
-            prefix.split(is_separator).filter(|s| !s.is_empty()),
-            |a, b| a == b,
-        );
-
-        match diff {
-            // Match with remaining
-            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
-            _ => None,
-        }
+        use object_store::path::DELIMITER;
+        let path: &str = path.as_ref();
+        let stripped = match self.prefix.as_ref() {
+            "" => path,
+            p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?,
+        };
+        Some(stripped.split(DELIMITER))
     }
 
     /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
@@ -157,31 +137,34 @@ impl ListingTableUrl {
         &'a self,
         store: &'a dyn ObjectStore,
         file_extension: &'a str,
-    ) -> BoxStream<'a, Result<FileMeta>> {
-        futures::stream::once(async move {
-            let prefix = self.prefix();
-            store.list_file(prefix.as_ref()).await
-        })
-        .try_flatten()
-        .map_err(DataFusionError::IoError)
-        .try_filter(move |meta| {
-            let path = meta.path();
-
-            let extension_match = path.ends_with(file_extension);
-            let glob_match = match &self.glob {
-                Some(glob) => match self.strip_prefix(path) {
-                    Some(mut segments) => {
-                        let stripped = segments.join("/");
-                        glob.matches(&stripped)
-                    }
-                    None => false,
-                },
-                None => true,
-            };
-
-            futures::future::ready(extension_match && glob_match)
-        })
-        .boxed()
+    ) -> BoxStream<'a, Result<ObjectMeta>> {
+        // If the prefix is a file, use a head request, otherwise list
+        let is_dir = self.url.as_str().ends_with('/');
+        let list = match is_dir {
+            true => futures::stream::once(store.list(Some(&self.prefix)))
+                .try_flatten()
+                .boxed(),
+            false => futures::stream::once(store.head(&self.prefix)).boxed(),
+        };
+
+        list.map_err(Into::into)
+            .try_filter(move |meta| {
+                let path = &meta.location;
+                let extension_match = path.as_ref().ends_with(file_extension);
+                let glob_match = match &self.glob {
+                    Some(glob) => match self.strip_prefix(path) {
+                        Some(mut segments) => {
+                            let stripped = segments.join("/");
+                            glob.matches(&stripped)
+                        }
+                        None => false,
+                    },
+                    None => true,
+                };
+
+                futures::future::ready(extension_match && glob_match)
+            })
+            .boxed()
     }
 
     /// Returns this [`ListingTableUrl`] as a string
@@ -250,23 +233,32 @@ mod tests {
         let root = root.to_string_lossy();
 
         let url = ListingTableUrl::parse(&root).unwrap();
-        let child = format!("{}/partition/file", root);
+        let child = url.prefix.child("partition").child("file");
 
         let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
         assert_eq!(prefix, vec!["partition", "file"]);
+
+        let url = ListingTableUrl::parse("file:///").unwrap();
+        let child = Path::parse("/foo/bar").unwrap();
+        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+        assert_eq!(prefix, vec!["foo", "bar"]);
+
+        let url = ListingTableUrl::parse("file:///foo").unwrap();
+        let child = Path::parse("/foob/bar").unwrap();
+        assert!(url.strip_prefix(&child).is_none());
     }
 
     #[test]
     fn test_prefix_s3() {
         let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
-        assert_eq!(url.prefix(), "bucket/foo/bar");
+        assert_eq!(url.prefix.as_ref(), "foo/bar");
 
-        let path = "bucket/foo/bar/partition/foo.parquet";
-        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        let path = Path::from("foo/bar/partition/foo.parquet");
+        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
         assert_eq!(prefix, vec!["partition", "foo.parquet"]);
 
-        let path = "other-bucket/foo/bar/partition/foo.parquet";
-        assert!(url.strip_prefix(path).is_none());
+        let path = Path::from("other/bar/partition/foo.parquet");
+        assert!(url.strip_prefix(&path).is_none());
     }
 
     #[test]
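
The rewritten listing logic above distinguishes directory-style URLs from single files. Below is a minimal
standalone sketch of the same head-vs-list split using the object_store API directly; the paths are
hypothetical, and with `LocalFileSystem::new()` an object path such as `tmp/data` resolves against the
filesystem root.

    use futures::TryStreamExt;
    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

    #[tokio::main]
    async fn main() -> object_store::Result<()> {
        let store = LocalFileSystem::new();

        // A trailing '/' in the table URL means "list everything under this prefix".
        let prefix = Path::from("tmp/data");
        let files: Vec<_> = store.list(Some(&prefix)).await?.try_collect().await?;
        println!("{} objects under {}", files.len(), prefix);

        // A plain file URL is resolved with a single HEAD request instead.
        let meta = store.head(&Path::from("tmp/data/part-0.csv")).await?;
        println!("{} is {} bytes", meta.location, meta.size);
        Ok(())
    }
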
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
index ac7e1a847..aca5b0ca4 100644
--- a/datafusion/core/src/datasource/object_store.rs
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -20,8 +20,8 @@
 //! and query data inside these systems.
 
 use datafusion_common::{DataFusionError, Result};
-use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
-use datafusion_data_access::object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+use object_store::ObjectStore;
 use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -84,7 +84,7 @@ impl std::fmt::Display for ObjectStoreUrl {
 /// Object store registry
 pub struct ObjectStoreRegistry {
     /// A map from scheme to object store that serve list / read operations for the store
-    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+    object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
 }
 
 impl std::fmt::Debug for ObjectStoreRegistry {
@@ -109,8 +109,7 @@ impl ObjectStoreRegistry {
     /// ['LocalFileSystem'] store is registered by default to support reading local files natively.
     pub fn new() -> Self {
         let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
-        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
-
+        map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
         Self {
             object_stores: RwLock::new(map),
         }
@@ -120,34 +119,32 @@ impl ObjectStoreRegistry {
     /// If a store of the same prefix existed before, it is replaced in the registry and returned.
     pub fn register_store(
         &self,
-        scheme: String,
+        scheme: impl AsRef<str>,
+        host: impl AsRef<str>,
         store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
         let mut stores = self.object_stores.write();
-        stores.insert(scheme, store)
-    }
-
-    /// Get the store registered for scheme
-    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
-        let stores = self.object_stores.read();
-        stores.get(scheme).cloned()
+        let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+        stores.insert(s, store)
     }
 
     /// Get a suitable store for the provided URL. For example:
     ///
-    /// - URL with scheme `file://` or no schema will return the default LocalFS store
-    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    /// - URL with scheme `file:///` or no scheme will return the default LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
     ///
     pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
         let url = url.as_ref();
-        let store = self.get(url.scheme()).ok_or_else(|| {
+        let s = &url[url::Position::BeforeScheme..url::Position::AfterHost];
+        let stores = self.object_stores.read();
+        let store = stores.get(s).ok_or_else(|| {
             DataFusionError::Internal(format!(
                 "No suitable object store found for {}",
                 url
             ))
         })?;
 
-        Ok(store)
+        Ok(store.clone())
     }
 }
 
@@ -155,7 +152,6 @@ impl ObjectStoreRegistry {
 mod tests {
     use super::*;
     use crate::datasource::listing::ListingTableUrl;
-    use datafusion_data_access::object_store::local::LocalFileSystem;
     use std::sync::Arc;
 
     #[test]
@@ -197,7 +193,7 @@ mod tests {
     #[test]
     fn test_get_by_url_s3() {
         let sut = ObjectStoreRegistry::default();
-        sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));
+        sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
         let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
         sut.get_by_url(&url).unwrap();
     }
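
For reference, a short sketch of how the reworked registry is meant to be used. It mirrors the test above;
the bucket names are made up, `LocalFileSystem` merely stands in for a real remote store, and the
`datafusion::datasource::*` import paths assume this crate layout.

    use std::sync::Arc;
    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::datasource::object_store::ObjectStoreRegistry;
    use object_store::local::LocalFileSystem;

    fn main() -> datafusion::error::Result<()> {
        // Stores are now keyed by "scheme://host", so different buckets can be
        // backed by different ObjectStore instances.
        let registry = ObjectStoreRegistry::new();
        registry.register_store("s3", "bucket-a", Arc::new(LocalFileSystem::new()));
        registry.register_store("s3", "bucket-b", Arc::new(LocalFileSystem::new()));

        // get_by_url matches everything up to and including the host.
        let url = ListingTableUrl::parse("s3://bucket-a/data/file.parquet")?;
        let _store = registry.get_by_url(&url)?;
        Ok(())
    }
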
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 2f134990f..d810c882f 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -28,7 +28,7 @@ use crate::{
 
 use crate::datasource::object_store::ObjectStoreRegistry;
 use datafusion_common::DataFusionError;
-use datafusion_data_access::object_store::ObjectStore;
+use object_store::ObjectStore;
 use std::fmt::{Debug, Formatter};
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -92,12 +92,12 @@ impl RuntimeEnv {
     /// Returns the `ObjectStore` previously registered for this scheme, if any
     pub fn register_object_store(
         &self,
-        scheme: impl Into<String>,
+        scheme: impl AsRef<str>,
+        host: impl AsRef<str>,
         object_store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
-        let scheme = scheme.into();
         self.object_store_registry
-            .register_store(scheme, object_store)
+            .register_store(scheme, host, object_store)
     }
 
     /// Retrieves a `ObjectStore` instance for a url
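
The session-level counterpart, assuming `SessionContext::runtime_env()` as exposed in this version; the
bucket name is hypothetical, and a real deployment would register an S3 or GCS implementation instead of
`LocalFileSystem`.

    use std::sync::Arc;
    use datafusion::prelude::SessionContext;
    use object_store::local::LocalFileSystem;

    fn main() {
        let ctx = SessionContext::new();
        // register_object_store now takes the (scheme, host) pair instead of a
        // scheme-only key.
        ctx.runtime_env()
            .register_object_store("s3", "my-bucket", Arc::new(LocalFileSystem::new()));
    }
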
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 33501a10f..c100f4224 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -228,7 +228,6 @@ pub use parquet;
 
 // re-export DataFusion crates
 pub use datafusion_common as common;
-pub use datafusion_data_access;
 pub use datafusion_expr as logical_expr;
 pub use datafusion_optimizer as optimizer;
 pub use datafusion_physical_expr as physical_expr;
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index d03951990..56d78fb9e 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -16,23 +16,17 @@
 // under the License.
 
 //! Execution plan for reading line-delimited Avro files
-#[cfg(feature = "avro")]
-use crate::avro_to_arrow;
 use crate::error::Result;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use arrow::datatypes::SchemaRef;
-#[cfg(feature = "avro")]
-use arrow::error::ArrowError;
 
 use crate::execution::context::TaskContext;
 use std::any::Any;
 use std::sync::Arc;
 
-#[cfg(feature = "avro")]
-use super::file_stream::{BatchIter, FileStream};
 use super::FileScanConfig;
 
 /// Execution plan for scanning Avro data source
@@ -109,39 +103,17 @@ impl ExecutionPlan for AvroExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let proj = self.base_config.projected_file_column_names();
-
-        let batch_size = context.session_config().batch_size();
-        let file_schema = Arc::clone(&self.base_config.file_schema);
-
-        // The avro reader cannot limit the number of records, so `remaining` is ignored.
-        let fun = move |file, _remaining: &Option<usize>| {
-            let reader_res = avro_to_arrow::Reader::try_new(
-                file,
-                Arc::clone(&file_schema),
-                batch_size,
-                proj.clone(),
-            );
-            match reader_res {
-                Ok(r) => Box::new(r) as BatchIter,
-                Err(e) => Box::new(
-                    vec![Err(ArrowError::ExternalError(Box::new(e)))].into_iter(),
-                ),
-            }
-        };
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.base_config.object_store_url)?;
-
-        Ok(Box::pin(FileStream::new(
-            object_store,
-            self.base_config.file_groups[partition].clone(),
-            fun,
-            Arc::clone(&self.projected_schema),
-            self.base_config.limit,
-            self.base_config.table_partition_cols.clone(),
-        )))
+        use super::file_stream::FileStream;
+
+        let config = Arc::new(avro::AvroConfig {
+            schema: Arc::clone(&self.base_config.file_schema),
+            batch_size: context.session_config().batch_size(),
+            projection: self.base_config.projected_file_column_names(),
+        });
+        let opener = avro::AvroOpener { config };
+
+        let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+        Ok(Box::pin(stream))
     }
 
     fn fmt_as(
@@ -166,6 +138,64 @@ impl ExecutionPlan for AvroExec {
     }
 }
 
+#[cfg(feature = "avro")]
+mod avro {
+    use super::*;
+    use crate::datasource::listing::FileRange;
+    use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture};
+    use bytes::Buf;
+    use futures::StreamExt;
+    use object_store::{GetResult, ObjectMeta, ObjectStore};
+
+    pub struct AvroConfig {
+        pub schema: SchemaRef,
+        pub batch_size: usize,
+        pub projection: Option<Vec<String>>,
+    }
+
+    impl AvroConfig {
+        fn open<R: std::io::Read>(
+            &self,
+            reader: R,
+        ) -> Result<crate::avro_to_arrow::Reader<'static, R>> {
+            crate::avro_to_arrow::Reader::try_new(
+                reader,
+                self.schema.clone(),
+                self.batch_size,
+                self.projection.clone(),
+            )
+        }
+    }
+
+    pub struct AvroOpener {
+        pub config: Arc<AvroConfig>,
+    }
+
+    impl FormatReader for AvroOpener {
+        fn open(
+            &self,
+            store: Arc<dyn ObjectStore>,
+            file: ObjectMeta,
+            _range: Option<FileRange>,
+        ) -> ReaderFuture {
+            let config = self.config.clone();
+            Box::pin(async move {
+                match store.get(&file.location).await? {
+                    GetResult::File(file, _) => {
+                        let reader = config.open(file)?;
+                        Ok(futures::stream::iter(reader).boxed())
+                    }
+                    r @ GetResult::Stream(_) => {
+                        let bytes = r.bytes().await?;
+                        let reader = config.open(bytes.reader())?;
+                        Ok(futures::stream::iter(reader).boxed())
+                    }
+                }
+            })
+        }
+    }
+}
+
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
@@ -174,11 +204,10 @@ mod tests {
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
+    use crate::test::object_store::local_unpartitioned_file;
     use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion_data_access::object_store::local::{
-        local_unpartitioned_file, LocalFileSystem,
-    };
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
 
     use super::*;
 
@@ -186,7 +215,7 @@ mod tests {
     async fn avro_exec_without_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let store = Arc::new(LocalFileSystem {}) as _;
+        let store = Arc::new(LocalFileSystem::new()) as _;
         let meta = local_unpartitioned_file(filename);
 
         let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
@@ -246,7 +275,7 @@ mod tests {
     async fn avro_exec_missing_column() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let object_store = Arc::new(LocalFileSystem {}) as _;
+        let object_store = Arc::new(LocalFileSystem::new()) as _;
         let object_store_url = ObjectStoreUrl::local_filesystem();
         let meta = local_unpartitioned_file(filename);
         let actual_schema = AvroFormat {}
@@ -315,7 +344,7 @@ mod tests {
     async fn avro_exec_with_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let object_store = Arc::new(LocalFileSystem {}) as _;
+        let object_store = Arc::new(LocalFileSystem::new()) as _;
         let object_store_url = ObjectStoreUrl::local_filesystem();
         let meta = local_unpartitioned_file(filename);
         let file_schema = AvroFormat {}
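
The Avro opener above introduces a pattern that the CSV and JSON openers below repeat: a `GetResult::File`
from a local store is read directly, while a remote `GetResult::Stream` is buffered into memory first. A
small hedged sketch of that pattern in isolation (the store and location are supplied by the caller):

    use bytes::Buf;
    use object_store::{path::Path, GetResult, ObjectStore};
    use std::io::Read;

    // Turn either GetResult variant into a blocking reader suitable for the
    // synchronous Avro/CSV/JSON decoders.
    async fn sync_reader(
        store: &dyn ObjectStore,
        location: &Path,
    ) -> object_store::Result<Box<dyn Read + Send>> {
        match store.get(location).await? {
            // Local files hand back a std::fs::File that can be read in place.
            GetResult::File(file, _) => Ok(Box::new(file)),
            // Remote objects are fully buffered before decoding starts.
            r @ GetResult::Stream(_) => Ok(Box::new(r.bytes().await?.reader())),
        }
    }
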
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 0f8273ec1..186da2ad6 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -24,16 +24,21 @@ use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 
+use crate::datasource::listing::FileRange;
+use crate::physical_plan::file_format::file_stream::{
+    FileStream, FormatReader, ReaderFuture,
+};
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+use bytes::Buf;
 use futures::{StreamExt, TryStreamExt};
+use object_store::{GetResult, ObjectMeta, ObjectStore};
 use std::any::Any;
 use std::fs;
 use std::path::Path;
 use std::sync::Arc;
 use tokio::task::{self, JoinHandle};
 
-use super::file_stream::{BatchIter, FileStream};
 use super::FileScanConfig;
 
 /// Execution plan for scanning a CSV file
@@ -115,40 +120,17 @@ impl ExecutionPlan for CsvExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let batch_size = context.session_config().batch_size();
-        let file_schema = Arc::clone(&self.base_config.file_schema);
-        let file_projection = self.base_config.file_column_projection_indices();
-        let has_header = self.has_header;
-        let delimiter = self.delimiter;
-        let start_line = if has_header { 1 } else { 0 };
-
-        let fun = move |file, remaining: &Option<usize>| {
-            let bounds = remaining.map(|x| (0, x + start_line));
-            let datetime_format = None;
-            Box::new(csv::Reader::new(
-                file,
-                Arc::clone(&file_schema),
-                has_header,
-                Some(delimiter),
-                batch_size,
-                bounds,
-                file_projection.clone(),
-                datetime_format,
-            )) as BatchIter
-        };
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.base_config.object_store_url)?;
-
-        Ok(Box::pin(FileStream::new(
-            object_store,
-            self.base_config.file_groups[partition].clone(),
-            fun,
-            Arc::clone(&self.projected_schema),
-            self.base_config.limit,
-            self.base_config.table_partition_cols.clone(),
-        )))
+        let config = Arc::new(CsvConfig {
+            batch_size: context.session_config().batch_size(),
+            file_schema: Arc::clone(&self.base_config.file_schema),
+            file_projection: self.base_config.file_column_projection_indices(),
+            has_header: self.has_header,
+            delimiter: self.delimiter,
+        });
+
+        let opener = CsvOpener { config };
+        let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+        Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
 
     fn fmt_as(
@@ -175,6 +157,57 @@ impl ExecutionPlan for CsvExec {
     }
 }
 
+#[derive(Debug, Clone)]
+struct CsvConfig {
+    batch_size: usize,
+    file_schema: SchemaRef,
+    file_projection: Option<Vec<usize>>,
+    has_header: bool,
+    delimiter: u8,
+}
+
+impl CsvConfig {
+    fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> {
+        let datetime_format = None;
+        csv::Reader::new(
+            reader,
+            Arc::clone(&self.file_schema),
+            self.has_header,
+            Some(self.delimiter),
+            self.batch_size,
+            None,
+            self.file_projection.clone(),
+            datetime_format,
+        )
+    }
+}
+
+struct CsvOpener {
+    config: Arc<CsvConfig>,
+}
+
+impl FormatReader for CsvOpener {
+    fn open(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        file: ObjectMeta,
+        _range: Option<FileRange>,
+    ) -> ReaderFuture {
+        let config = self.config.clone();
+        Box::pin(async move {
+            match store.get(&file.location).await? {
+                GetResult::File(file, _) => {
+                    Ok(futures::stream::iter(config.open(file)).boxed())
+                }
+                r @ GetResult::Stream(_) => {
+                    let bytes = r.bytes().await?;
+                    Ok(futures::stream::iter(config.open(bytes.reader())).boxed())
+                }
+            }
+        })
+    }
+}
+
 pub async fn plan_to_csv(
     state: &SessionState,
     plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 97a95b568..ca57028ab 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -21,51 +21,43 @@
 //! Note: Most traits here need to be marked `Sync + Send` to be
 //! compliant with the `SendableRecordBatchStream` trait.
 
-use crate::datasource::listing::PartitionedFile;
-use crate::{physical_plan::RecordBatchStream, scalar::ScalarValue};
-use arrow::{
-    datatypes::SchemaRef,
-    error::{ArrowError, Result as ArrowResult},
-    record_batch::RecordBatch,
-};
-use datafusion_data_access::object_store::ObjectStore;
-use futures::Stream;
-use std::{
-    io::Read,
-    iter,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
-
-use super::PartitionColumnProjector;
-
-pub type FileIter = Box<dyn Iterator<Item = PartitionedFile> + Send + Sync>;
-pub type BatchIter = Box<dyn Iterator<Item = ArrowResult<RecordBatch>> + Send + Sync>;
-
-/// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object
-/// and an optional number of required records.
-pub trait FormatReaderOpener:
-    FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + Send + Unpin + 'static
-{
-}
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{ready, FutureExt, Stream, StreamExt};
+use object_store::{ObjectMeta, ObjectStore};
+
+use datafusion_common::ScalarValue;
+
+use crate::datasource::listing::{FileRange, PartitionedFile};
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
+use crate::physical_plan::RecordBatchStream;
+
+/// A fallible future that resolves to a stream of [`RecordBatch`]
+pub type ReaderFuture =
+    BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;
 
-impl<T> FormatReaderOpener for T where
-    T: FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter
-        + Send
-        + Unpin
-        + 'static
-{
+pub trait FormatReader: Unpin {
+    fn open(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        file: ObjectMeta,
+        range: Option<FileRange>,
+    ) -> ReaderFuture;
 }
 
 /// A stream that iterates record batch by record batch, file over file.
-pub struct FileStream<F: FormatReaderOpener> {
-    /// An iterator over record batches of the last file returned by file_iter
-    batch_iter: BatchIter,
-    /// Partitioning column values for the current batch_iter
-    partition_values: Vec<ScalarValue>,
+pub struct FileStream<F: FormatReader> {
     /// An iterator over input files.
-    file_iter: FileIter,
+    file_iter: VecDeque<PartitionedFile>,
     /// The stream schema (file schema including partition columns and after
     /// projection).
     projected_schema: SchemaRef,
@@ -80,104 +72,155 @@ pub struct FileStream<F: FormatReaderOpener> {
     pc_projector: PartitionColumnProjector,
     /// the store from which to source the files.
     object_store: Arc<dyn ObjectStore>,
+    /// The stream state
+    state: FileStreamState,
 }
 
-impl<F: FormatReaderOpener> FileStream<F> {
+enum FileStreamState {
+    /// The idle state, no file is currently being read
+    Idle,
+    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
+    /// for a given file
+    Open {
+        /// A [`ReaderFuture`] returned by [`FormatReader::open`]
+        future: ReaderFuture,
+        /// The partition values for this file
+        partition_values: Vec<ScalarValue>,
+    },
+    /// Scanning the [`BoxStream`] returned by the completion of a [`ReaderFuture`]
+    /// returned by [`FormatReader::open`]
+    Scan {
+        /// Partitioning column values for the current reader
+        partition_values: Vec<ScalarValue>,
+        /// The reader instance
+        reader: BoxStream<'static, ArrowResult<RecordBatch>>,
+    },
+    /// Encountered an error
+    Error,
+    /// Reached the row limit
+    Limit,
+}
+
+impl<F: FormatReader> FileStream<F> {
     pub fn new(
-        object_store: Arc<dyn ObjectStore>,
-        files: Vec<PartitionedFile>,
+        config: &FileScanConfig,
+        partition: usize,
+        context: Arc<TaskContext>,
         file_reader: F,
-        projected_schema: SchemaRef,
-        limit: Option<usize>,
-        table_partition_cols: Vec<String>,
-    ) -> Self {
+    ) -> Result<Self> {
+        let (projected_schema, _) = config.project();
         let pc_projector = PartitionColumnProjector::new(
-            Arc::clone(&projected_schema),
-            &table_partition_cols,
+            projected_schema.clone(),
+            &config.table_partition_cols,
         );
 
-        Self {
-            file_iter: Box::new(files.into_iter()),
-            batch_iter: Box::new(iter::empty()),
-            partition_values: vec![],
-            remain: limit,
+        let files = config.file_groups[partition].clone();
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&config.object_store_url)?;
+
+        Ok(Self {
+            file_iter: files.into(),
             projected_schema,
+            remain: config.limit,
             file_reader,
             pc_projector,
             object_store,
-        }
+            state: FileStreamState::Idle,
+        })
     }
 
-    /// Acts as a flat_map of record batches over files. Adds the partitioning
-    /// Columns to the returned record batches.
-    fn next_batch(&mut self) -> Option<ArrowResult<RecordBatch>> {
-        match self.batch_iter.next() {
-            Some(Ok(batch)) => {
-                Some(self.pc_projector.project(batch, &self.partition_values))
-            }
-            Some(Err(e)) => Some(Err(e)),
-            None => match self.file_iter.next() {
-                Some(f) => {
-                    self.partition_values = f.partition_values;
-                    self.object_store
-                        .file_reader(f.file_meta.sized_file)
-                        .and_then(|r| r.sync_reader())
-                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                        .and_then(|f| {
-                            self.batch_iter = (self.file_reader)(f, &self.remain);
-                            self.next_batch().transpose()
-                        })
-                        .transpose()
+    fn poll_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+        loop {
+            match &mut self.state {
+                FileStreamState::Idle => {
+                    let file = match self.file_iter.pop_front() {
+                        Some(file) => file,
+                        None => return Poll::Ready(None),
+                    };
+
+                    let future = self.file_reader.open(
+                        self.object_store.clone(),
+                        file.object_meta,
+                        file.range,
+                    );
+
+                    self.state = FileStreamState::Open {
+                        future,
+                        partition_values: file.partition_values,
+                    }
                 }
-                None => None,
-            },
+                FileStreamState::Open {
+                    future,
+                    partition_values,
+                } => match ready!(future.poll_unpin(cx)) {
+                    Ok(reader) => {
+                        self.state = FileStreamState::Scan {
+                            partition_values: std::mem::take(partition_values),
+                            reader,
+                        };
+                    }
+                    Err(e) => {
+                        self.state = FileStreamState::Error;
+                        return Poll::Ready(Some(Err(e.into())));
+                    }
+                },
+                FileStreamState::Scan {
+                    reader,
+                    partition_values,
+                } => match ready!(reader.poll_next_unpin(cx)) {
+                    Some(result) => {
+                        let result = result
+                            .and_then(|b| self.pc_projector.project(b, partition_values))
+                            .map(|batch| match &mut self.remain {
+                                Some(remain) => {
+                                    if *remain > batch.num_rows() {
+                                        *remain -= batch.num_rows();
+                                        batch
+                                    } else {
+                                        let batch = batch.slice(0, *remain);
+                                        self.state = FileStreamState::Limit;
+                                        *remain = 0;
+                                        batch
+                                    }
+                                }
+                                None => batch,
+                            });
+
+                        if result.is_err() {
+                            self.state = FileStreamState::Error
+                        }
+
+                        return Poll::Ready(Some(result));
+                    }
+                    None => self.state = FileStreamState::Idle,
+                },
+                FileStreamState::Error | FileStreamState::Limit => {
+                    return Poll::Ready(None)
+                }
+            }
         }
     }
 }
 
-impl<F: FormatReaderOpener> Stream for FileStream<F> {
+impl<F: FormatReader> Stream for FileStream<F> {
     type Item = ArrowResult<RecordBatch>;
 
     fn poll_next(
         mut self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
+        cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        // check if finished or no limit
-        match self.remain {
-            Some(r) if r == 0 => return Poll::Ready(None),
-            None => return Poll::Ready(self.get_mut().next_batch()),
-            Some(r) => r,
-        };
-
-        Poll::Ready(match self.as_mut().next_batch() {
-            Some(Ok(item)) => {
-                if let Some(remain) = self.remain.as_mut() {
-                    if *remain >= item.num_rows() {
-                        *remain -= item.num_rows();
-                        Some(Ok(item))
-                    } else {
-                        let len = *remain;
-                        *remain = 0;
-                        Some(Ok(RecordBatch::try_new(
-                            item.schema(),
-                            item.columns()
-                                .iter()
-                                .map(|column| column.slice(0, len))
-                                .collect(),
-                        )?))
-                    }
-                } else {
-                    Some(Ok(item))
-                }
-            }
-            other => other,
-        })
+        self.poll_inner(cx)
     }
 }
 
-impl<F: FormatReaderOpener> RecordBatchStream for FileStream<F> {
+impl<F: FormatReader> RecordBatchStream for FileStream<F> {
     fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.projected_schema)
+        self.projected_schema.clone()
     }
 }
 
@@ -186,33 +229,54 @@ mod tests {
     use futures::StreamExt;
 
     use super::*;
+    use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::prelude::SessionContext;
     use crate::{
         error::Result,
-        test::{make_partition, object_store::TestObjectStore},
+        test::{make_partition, object_store::register_test_store},
     };
 
+    struct TestOpener {
+        records: Vec<RecordBatch>,
+    }
+
+    impl FormatReader for TestOpener {
+        fn open(
+            &self,
+            _store: Arc<dyn ObjectStore>,
+            _file: ObjectMeta,
+            _range: Option<FileRange>,
+        ) -> ReaderFuture {
+            let iterator = self.records.clone().into_iter().map(Ok);
+            let stream = futures::stream::iter(iterator).boxed();
+            futures::future::ready(Ok(stream)).boxed()
+        }
+    }
+
     /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
     async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
         let records = vec![make_partition(3), make_partition(2)];
+        let file_schema = records[0].schema();
 
-        let source_schema = records[0].schema();
+        let reader = TestOpener { records };
 
-        let reader = move |_file, _remain: &Option<usize>| {
-            // this reader returns the same batch regardless of the file
-            Box::new(records.clone().into_iter().map(Ok)) as BatchIter
-        };
+        let ctx = SessionContext::new();
+        register_test_store(&ctx, &[("mock_file1", 10), ("mock_file2", 20)]);
 
-        let file_stream = FileStream::new(
-            TestObjectStore::new_arc(&[("mock_file1", 10), ("mock_file2", 20)]),
-            vec![
+        let config = FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema,
+            file_groups: vec![vec![
                 PartitionedFile::new("mock_file1".to_owned(), 10),
                 PartitionedFile::new("mock_file2".to_owned(), 20),
-            ],
-            reader,
-            source_schema,
+            ]],
+            statistics: Default::default(),
+            projection: None,
             limit,
-            vec![],
-        );
+            table_partition_cols: vec![],
+        };
+
+        let file_stream = FileStream::new(&config, 0, ctx.task_ctx(), reader).unwrap();
 
         file_stream
             .map(|b| b.expect("No error expected in stream"))
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index a829218f4..385ac427c 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -18,22 +18,27 @@
 //! Execution plan for reading line-delimited JSON files
 use arrow::json::reader::DecoderOptions;
 
+use crate::datasource::listing::FileRange;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::file_format::file_stream::{
+    FileStream, FormatReader, ReaderFuture,
+};
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use arrow::{datatypes::SchemaRef, json};
+use bytes::Buf;
 use futures::{StreamExt, TryStreamExt};
+use object_store::{GetResult, ObjectMeta, ObjectStore};
 use std::any::Any;
 use std::fs;
 use std::path::Path;
 use std::sync::Arc;
 use tokio::task::{self, JoinHandle};
 
-use super::file_stream::{BatchIter, FileStream};
 use super::FileScanConfig;
 
 /// Execution plan for scanning NdJson data source
@@ -99,35 +104,21 @@ impl ExecutionPlan for NdJsonExec {
         let batch_size = context.session_config().batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
 
-        // The json reader cannot limit the number of records, so `remaining` is ignored.
-        let fun = move |file, _remaining: &Option<usize>| {
-            // TODO: make DecoderOptions implement Clone so we can
-            // clone here rather than recreating the options each time
-            // https://github.com/apache/arrow-rs/issues/1580
-            let options = DecoderOptions::new().with_batch_size(batch_size);
-
-            let options = if let Some(proj) = proj.clone() {
-                options.with_projection(proj)
-            } else {
-                options
-            };
-
-            Box::new(json::Reader::new(file, Arc::clone(&file_schema), options))
-                as BatchIter
+        let options = DecoderOptions::new().with_batch_size(batch_size);
+        let options = if let Some(proj) = proj {
+            options.with_projection(proj)
+        } else {
+            options
+        };
+
+        let opener = JsonOpener {
+            file_schema,
+            options,
         };
 
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.base_config.object_store_url)?;
-
-        Ok(Box::pin(FileStream::new(
-            object_store,
-            self.base_config.file_groups[partition].clone(),
-            fun,
-            Arc::clone(&self.projected_schema),
-            self.base_config.limit,
-            self.base_config.table_partition_cols.clone(),
-        )))
+        let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+
+        Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
 
     fn fmt_as(
@@ -152,6 +143,38 @@ impl ExecutionPlan for NdJsonExec {
     }
 }
 
+struct JsonOpener {
+    options: DecoderOptions,
+    file_schema: SchemaRef,
+}
+
+impl FormatReader for JsonOpener {
+    fn open(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        file: ObjectMeta,
+        _range: Option<FileRange>,
+    ) -> ReaderFuture {
+        let options = self.options.clone();
+        let schema = self.file_schema.clone();
+        Box::pin(async move {
+            match store.get(&file.location).await? {
+                GetResult::File(file, _) => {
+                    let reader = json::Reader::new(file, schema.clone(), options);
+                    Ok(futures::stream::iter(reader).boxed())
+                }
+                r @ GetResult::Stream(_) => {
+                    let bytes = r.bytes().await?;
+                    let reader =
+                        json::Reader::new(bytes.reader(), schema.clone(), options);
+
+                    Ok(futures::stream::iter(reader).boxed())
+                }
+            }
+        })
+    }
+}
+
 pub async fn plan_to_json(
     state: &SessionState,
     plan: Arc<dyn ExecutionPlan>,
@@ -201,7 +224,7 @@ mod tests {
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
-    use datafusion_data_access::object_store::local::local_unpartitioned_file;
+    use crate::test::object_store::local_unpartitioned_file;
     use tempfile::TempDir;
 
     use super::*;
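
Since `DecoderOptions` is now built once outside the per-file closure, a tiny sketch of how those options
drive the arrow JSON reader may help; the schema and the NDJSON input are made up for illustration.

    use std::io::Cursor;
    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::json::{self, reader::DecoderOptions};

    fn main() -> arrow::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let data = Cursor::new("{\"a\":1,\"b\":\"x\"}\n{\"a\":2,\"b\":\"y\"}\n");

        // Batch size and projection travel with the options value, so the same
        // configuration can be reused for every file the opener sees.
        let options = DecoderOptions::new()
            .with_batch_size(1024)
            .with_projection(vec!["a".to_string()]);

        for batch in json::Reader::new(data, schema, options) {
            let batch = batch?;
            assert_eq!(batch.num_columns(), 1); // only column "a" survives
        }
        Ok(())
    }
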
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 86015c472..e59e8248f 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -163,7 +163,7 @@ impl<'a> Display for FileGroupsDisplay<'a> {
             .iter()
             .map(|pp| {
                 pp.iter()
-                    .map(|pf| pf.file_meta.path())
+                    .map(|pf| pf.object_meta.location.as_ref())
                     .collect::<Vec<_>>()
                     .join(", ")
             })
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e719499c3..cae92ddbb 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -18,42 +18,40 @@
 //! Execution plan for reading Parquet files
 
 use fmt::Debug;
-use std::collections::VecDeque;
 use std::fmt;
 use std::fs;
-use std::path::Path;
-use std::pin::Pin;
+use std::ops::Range;
 use std::sync::Arc;
-use std::task::{Context, Poll};
 use std::{any::Any, convert::TryInto};
 
 use arrow::{
     array::ArrayRef,
     datatypes::{Schema, SchemaRef},
-    error::{ArrowError, Result as ArrowResult},
-    record_batch::RecordBatch,
+    error::ArrowError,
 };
-use futures::{Stream, StreamExt, TryStreamExt};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use log::debug;
-use parquet::arrow::{
-    arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
-    ParquetFileArrowReader, ProjectionMask,
-};
-use parquet::file::reader::FileReader;
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::errors::ParquetError;
 use parquet::file::{
-    metadata::RowGroupMetaData, properties::WriterProperties,
-    reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
+    metadata::{ParquetMetaData, RowGroupMetaData},
+    properties::WriterProperties,
     statistics::Statistics as ParquetStatistics,
 };
 
 use datafusion_common::Column;
-use datafusion_data_access::object_store::ObjectStore;
 use datafusion_expr::Expr;
 
-use crate::physical_plan::metrics::BaselineMetrics;
-use crate::physical_plan::stream::RecordBatchReceiverStream;
+use crate::datasource::file_format::parquet::fetch_parquet_metadata;
+use crate::datasource::listing::FileRange;
+use crate::physical_plan::file_format::file_stream::{
+    FileStream, FormatReader, ReaderFuture,
+};
 use crate::{
-    datasource::{file_format::parquet::ChunkObjectReader, listing::PartitionedFile},
     error::{DataFusionError, Result},
     execution::context::{SessionState, TaskContext},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
@@ -61,14 +59,12 @@ use crate::{
         expressions::PhysicalSortExpr,
         file_format::{FileScanConfig, SchemaAdapter},
         metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-        SendableRecordBatchStream, Statistics,
+        DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        Statistics,
     },
     scalar::ScalarValue,
 };
 
-use super::PartitionColumnProjector;
-
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -94,7 +90,6 @@ struct ParquetFileMetrics {
 
 impl ParquetExec {
     /// Create a new Parquet reader execution plan provided file list and schema.
-    /// Even if `limit` is set, ParquetExec rounds up the number of records to the next `batch_size`.
     pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -210,52 +205,20 @@ impl ExecutionPlan for ParquetExec {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
-        let partition_col_proj = PartitionColumnProjector::new(
-            Arc::clone(&self.projected_schema),
-            &self.base_config.table_partition_cols,
-        );
 
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.base_config.object_store_url)?;
-
-        let stream = ParquetExecStream {
-            error: false,
+        let opener = ParquetOpener {
             partition_index,
-            metrics: self.metrics.clone(),
-            object_store,
-            pruning_predicate: self.pruning_predicate.clone(),
+            projection: Arc::from(projection),
             batch_size: context.session_config().batch_size(),
-            schema: self.projected_schema.clone(),
-            projection,
-            remaining_rows: self.base_config.limit,
-            reader: None,
-            files: self.base_config.file_groups[partition_index].clone().into(),
-            projector: partition_col_proj,
-            adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
-            baseline_metrics: BaselineMetrics::new(&self.metrics, partition_index),
+            pruning_predicate: self.pruning_predicate.clone(),
+            table_schema: self.base_config.file_schema.clone(),
+            metrics: self.metrics.clone(),
         };
 
-        // Use spawn_blocking only if running from a tokio context (#2201)
-        match tokio::runtime::Handle::try_current() {
-            Ok(handle) => {
-                let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
-                let schema = stream.schema();
-                let join_handle = handle.spawn_blocking(move || {
-                    for result in stream {
-                        if response_tx.blocking_send(result).is_err() {
-                            break;
-                        }
-                    }
-                });
-                Ok(RecordBatchReceiverStream::create(
-                    &schema,
-                    response_rx,
-                    join_handle,
-                ))
-            }
-            Err(_) => Ok(Box::pin(stream)),
-        }
+        let stream =
+            FileStream::new(&self.base_config, partition_index, context, opener)?;
+
+        Ok(Box::pin(stream))
     }
 
     fn fmt_as(
@@ -296,164 +259,110 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
-///
-/// NB: This will perform blocking IO synchronously without yielding which may
-/// be problematic in certain contexts (e.g. a tokio runtime that also performs
-/// network IO)
-struct ParquetExecStream {
-    error: bool,
+/// Implements [`FormatReader`] for a parquet file
+struct ParquetOpener {
     partition_index: usize,
-    metrics: ExecutionPlanMetricsSet,
-    object_store: Arc<dyn ObjectStore>,
-    pruning_predicate: Option<PruningPredicate>,
+    projection: Arc<[usize]>,
     batch_size: usize,
-    schema: SchemaRef,
-    projection: Vec<usize>,
-    remaining_rows: Option<usize>,
-    reader: Option<(ParquetRecordBatchReader, PartitionedFile)>,
-    files: VecDeque<PartitionedFile>,
-    projector: PartitionColumnProjector,
-    adapter: SchemaAdapter,
-    baseline_metrics: BaselineMetrics,
+    pruning_predicate: Option<PruningPredicate>,
+    table_schema: SchemaRef,
+    metrics: ExecutionPlanMetricsSet,
 }
 
-impl ParquetExecStream {
-    fn create_reader(
-        &mut self,
-        file: &PartitionedFile,
-    ) -> Result<ParquetRecordBatchReader> {
-        let file_metrics = ParquetFileMetrics::new(
+impl FormatReader for ParquetOpener {
+    fn open(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        meta: ObjectMeta,
+        range: Option<FileRange>,
+    ) -> ReaderFuture {
+        let metrics = ParquetFileMetrics::new(
             self.partition_index,
-            file.file_meta.path(),
+            meta.location.as_ref(),
             &self.metrics,
         );
-        let bytes_scanned = file_metrics.bytes_scanned.clone();
-        let object_reader = self
-            .object_store
-            .file_reader(file.file_meta.sized_file.clone())?;
-
-        let mut opt = ReadOptionsBuilder::new();
-        if let Some(pruning_predicate) = &self.pruning_predicate {
-            opt = opt.with_predicate(build_row_group_predicate(
-                pruning_predicate,
-                file_metrics,
-            ));
-        }
-        if let Some(range) = &file.range {
-            assert!(
-                range.start >= 0 && range.end > 0 && range.end > range.start,
-                "invalid range specified: {:?}",
-                range
-            );
-            opt = opt.with_range(range.start, range.end);
-        }
-
-        let file_reader = SerializedFileReader::new_with_options(
-            ChunkObjectReader {
-                object_reader,
-                bytes_scanned: Some(bytes_scanned),
-            },
-            opt.build(),
-        )?;
-
-        let file_metadata = file_reader.metadata().file_metadata();
-        let parquet_schema = file_metadata.schema_descr_ptr();
-
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let arrow_schema = arrow_reader.get_schema()?;
-
-        let adapted_projections = self
-            .adapter
-            .map_projections(&arrow_schema, &self.projection)?;
 
-        let mask = ProjectionMask::roots(&parquet_schema, adapted_projections);
-        let reader = arrow_reader.get_record_reader_by_columns(mask, self.batch_size)?;
-
-        Ok(reader)
-    }
-}
-
-impl Iterator for ParquetExecStream {
-    type Item = ArrowResult<RecordBatch>;
+        let reader = ParquetFileReader {
+            store,
+            meta,
+            metrics: metrics.clone(),
+        };
 
-    fn next(&mut self) -> Option<Self::Item> {
-        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
-        // records time on drop
-        let _timer = cloned_time.timer();
+        let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
+        let batch_size = self.batch_size;
+        let projection = self.projection.clone();
+        let pruning_predicate = self.pruning_predicate.clone();
 
-        if self.error || matches!(self.remaining_rows, Some(0)) {
-            return None;
-        }
+        Box::pin(async move {
+            let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+            let adapted_projections =
+                schema_adapter.map_projections(builder.schema(), &projection)?;
 
-        // TODO: Split this out into separate operators (#2079)
-        loop {
-            let (reader, file) = match self.reader.as_mut() {
-                Some(current) => current,
-                None => match self.files.pop_front() {
-                    None => return None,
-                    Some(file) => match self.create_reader(&file) {
-                        Ok(reader) => self.reader.insert((reader, file)),
-                        Err(e) => {
-                            self.error = true;
-                            return Some(Err(ArrowError::ExternalError(Box::new(e))));
-                        }
-                    },
-                },
-            };
+            let mask = ProjectionMask::roots(
+                builder.parquet_schema(),
+                adapted_projections.iter().cloned(),
+            );
 
-            let result = reader.next().map(|result| {
-                result
-                    .and_then(|batch| {
-                        self.adapter
-                            .adapt_batch(batch, &self.projection)
-                            .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                    })
-                    .and_then(|batch| {
-                        self.projector.project(batch, &file.partition_values)
+            let groups = builder.metadata().row_groups();
+            let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics);
+
+            let stream = builder
+                .with_projection(mask)
+                .with_batch_size(batch_size)
+                .with_row_groups(row_groups)
+                .build()?;
+
+            let adapted = stream
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                .map(move |maybe_batch| {
+                    maybe_batch.and_then(|b| {
+                        schema_adapter
+                            .adapt_batch(b, &projection)
+                            .map_err(Into::into)
                     })
-            });
-
-            let result = match result {
-                Some(result) => result,
-                None => {
-                    self.reader = None;
-                    continue;
-                }
-            };
-
-            match (&result, self.remaining_rows.as_mut()) {
-                (Ok(batch), Some(remaining_rows)) => {
-                    *remaining_rows = remaining_rows.saturating_sub(batch.num_rows());
-                }
-                _ => self.error = result.is_err(),
-            }
-
-            //record output rows in parquetExec
-            if let Ok(batch) = &result {
-                self.baseline_metrics.record_output(batch.num_rows());
-            }
+                });
 
-            return Some(result);
-        }
+            Ok(adapted.boxed())
+        })
     }
 }
 
-impl Stream for ParquetExecStream {
-    type Item = ArrowResult<RecordBatch>;
+/// Implements [`AsyncFileReader`] for a parquet file in object storage
+struct ParquetFileReader {
+    store: Arc<dyn ObjectStore>,
+    meta: ObjectMeta,
+    metrics: ParquetFileMetrics,
+}
 
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        let poll = Poll::Ready(Iterator::next(&mut *self));
-        self.baseline_metrics.record_poll(poll)
+impl AsyncFileReader for ParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.metrics.bytes_scanned.add(range.end - range.start);
+
+        self.store
+            .get_range(&self.meta.location, range)
+            .map_err(|e| {
+                ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e))
+            })
+            .boxed()
     }
-}
 
-impl RecordBatchStream for ParquetExecStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let metadata = fetch_parquet_metadata(self.store.as_ref(), &self.meta)
+                .await
+                .map_err(|e| {
+                    ParquetError::General(format!(
+                        "AsyncChunkReader::get_metadata error: {}",
+                        e
+                    ))
+                })?;
+            Ok(Arc::new(metadata))
+        })
     }
 }
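
The reader above only ever issues ranged reads, which is what makes fully asynchronous Parquet scanning over
object storage possible. A hedged sketch of that primitive on its own; the file location is hypothetical
and, with `LocalFileSystem::new()`, resolves against the filesystem root.

    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

    #[tokio::main]
    async fn main() -> object_store::Result<()> {
        let store = LocalFileSystem::new();
        let location = Path::from("tmp/data/part-0.parquet");

        // Fetch only the 8-byte Parquet footer (length + "PAR1" magic) instead
        // of downloading the whole object.
        let meta = store.head(&location).await?;
        let footer = store
            .get_range(&location, meta.size - 8..meta.size)
            .await?;
        println!("footer magic: {:?}", &footer[4..]);
        Ok(())
    }
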
 
@@ -556,36 +465,47 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 }
 
-fn build_row_group_predicate(
-    pruning_predicate: &PruningPredicate,
-    metrics: ParquetFileMetrics,
-) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> {
-    let pruning_predicate = pruning_predicate.clone();
-    Box::new(
-        move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool {
-            let parquet_schema = pruning_predicate.schema().as_ref();
+fn prune_row_groups(
+    groups: &[RowGroupMetaData],
+    range: Option<FileRange>,
+    predicate: Option<PruningPredicate>,
+    metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+    // TODO: Columnar pruning
+    let mut filtered = Vec::with_capacity(groups.len());
+    for (idx, metadata) in groups.iter().enumerate() {
+        if let Some(range) = &range {
+            let offset = metadata.column(0).file_offset();
+            if offset < range.start || offset >= range.end {
+                continue;
+            }
+        }
+
+        if let Some(predicate) = &predicate {
             let pruning_stats = RowGroupPruningStatistics {
-                row_group_metadata,
-                parquet_schema,
+                row_group_metadata: metadata,
+                parquet_schema: predicate.schema().as_ref(),
             };
-            let predicate_values = pruning_predicate.prune(&pruning_stats);
-            match predicate_values {
+            match predicate.prune(&pruning_stats) {
                 Ok(values) => {
                     // NB: false means don't scan row group
-                    let num_pruned = values.iter().filter(|&v| !*v).count();
-                    metrics.row_groups_pruned.add(num_pruned);
-                    values[0]
+                    if !values[0] {
+                        metrics.row_groups_pruned.add(1);
+                        continue;
+                    }
                 }
                 // stats filter array could not be built
                 // return a closure which will not filter out any row groups
                 Err(e) => {
                     debug!("Error evaluating row group predicate values {}", e);
                     metrics.predicate_evaluation_errors.add(1);
-                    true
                 }
             }
-        },
-    )
+        }
+
+        filtered.push(idx)
+    }
+    filtered
 }
 
 /// Executes a query and writes the results to a partitioned Parquet file.
@@ -597,7 +517,7 @@ pub async fn plan_to_parquet(
 ) -> Result<()> {
     let path = path.as_ref();
     // create directory to contain the Parquet files (one per partition)
-    let fs_path = Path::new(path);
+    let fs_path = std::path::Path::new(path);
     match fs::create_dir(fs_path) {
         Ok(()) => {
             let mut tasks = vec![];
@@ -635,9 +555,6 @@ pub async fn plan_to_parquet(
 mod tests {
     use crate::{
         assert_batches_sorted_eq, assert_contains,
-        datafusion_data_access::{
-            object_store::local::LocalFileSystem, FileMeta, SizedFile,
-        },
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
@@ -645,18 +562,23 @@ mod tests {
     use super::*;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
-    use crate::datasource::listing::FileRange;
+    use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
+    use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::Float32Array;
+    use arrow::record_batch::RecordBatch;
     use arrow::{
         array::{Int64Array, Int8Array, StringArray},
         datatypes::{DataType, Field},
     };
-    use datafusion_data_access::object_store::local::local_unpartitioned_file;
+    use chrono::{TimeZone, Utc};
     use datafusion_expr::{col, lit};
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
+    use object_store::path::Path;
+    use object_store::ObjectMeta;
     use parquet::{
         basic::Type as PhysicalType,
         file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
@@ -1057,9 +979,9 @@ mod tests {
 
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
-        fn file_range(meta: &FileMeta, start: i64, end: i64) -> PartitionedFile {
+        fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
             PartitionedFile {
-                file_meta: meta.clone(),
+                object_meta: meta.clone(),
                 partition_values: vec![],
                 range: Some(FileRange { start, end }),
             }
@@ -1102,7 +1024,7 @@ mod tests {
 
         let meta = local_unpartitioned_file(filename);
 
-        let store = Arc::new(LocalFileSystem {}) as _;
+        let store = Arc::new(LocalFileSystem::new()) as _;
         let file_schema = ParquetFormat::default()
             .infer_schema(&store, &[meta.clone()])
             .await?;
@@ -1156,7 +1078,7 @@ mod tests {
             .unwrap();
 
         let partitioned_file = PartitionedFile {
-            file_meta: meta,
+            object_meta: meta,
             partition_values: vec![
                 ScalarValue::Utf8(Some("2021".to_owned())),
                 ScalarValue::Utf8(Some("10".to_owned())),
@@ -1212,13 +1134,15 @@ mod tests {
     async fn parquet_exec_with_error() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
+        let location = Path::from_filesystem_path(".")
+            .unwrap()
+            .child("invalid.parquet");
+
         let partitioned_file = PartitionedFile {
-            file_meta: FileMeta {
-                sized_file: SizedFile {
-                    size: 1337,
-                    path: "invalid".into(),
-                },
-                last_modified: None,
+            object_meta: ObjectMeta {
+                location,
+                last_modified: Utc.timestamp_nanos(0),
+                size: 1337,
             },
             partition_values: vec![],
             range: None,
@@ -1240,10 +1164,7 @@ mod tests {
         let mut results = parquet_exec.execute(0, task_ctx)?;
         let batch = results.next().await.unwrap();
         // invalid file should produce an error to that effect
-        assert_contains!(
-            batch.unwrap_err().to_string(),
-            "External error: Parquet error: Arrow: IO error"
-        );
+        assert_contains!(batch.unwrap_err().to_string(), "invalid.parquet not found");
         assert!(results.next().await.is_none());
 
         Ok(())
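The expected message changes because the failure now comes from the object store rather than from the Parquet decoder: reading a location that does not exist fails with object_store's "not found" error before any Arrow/Parquet decoding starts. A small sketch of where that text comes from, assuming invalid.parquet does not exist in the working directory and tokio is available as a dev dependency:

    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

    #[tokio::main]
    async fn main() {
        let store = LocalFileSystem::new();
        let location = Path::from_filesystem_path(".")
            .unwrap()
            .child("invalid.parquet");
        let err = store.head(&location).await.unwrap_err();
        // e.g. "Object at location .../invalid.parquet not found: ..."
        assert!(err.to_string().contains("not found"));
    }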
@@ -1255,12 +1176,13 @@ mod tests {
     }
 
     #[test]
-    fn row_group_pruning_predicate_simple_expr() -> Result<()> {
+    fn row_group_pruning_predicate_simple_expr() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
         let expr = col("c1").gt(lit(15));
         let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?;
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
 
         let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
         let rgm1 = get_row_group_meta_data(
@@ -1271,26 +1193,22 @@ mod tests {
             &schema_descr,
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
-        let row_group_metadata = vec![rgm1, rgm2];
-        let mut row_group_predicate =
-            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
-        let row_group_filter = row_group_metadata
-            .iter()
-            .enumerate()
-            .map(|(i, g)| row_group_predicate(g, i))
-            .collect::<Vec<_>>();
-        assert_eq!(row_group_filter, vec![false, true]);
 
-        Ok(())
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
     }
 
     #[test]
-    fn row_group_pruning_predicate_missing_stats() -> Result<()> {
+    fn row_group_pruning_predicate_missing_stats() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
         let expr = col("c1").gt(lit(15));
         let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?;
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
 
         let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
         let rgm1 = get_row_group_meta_data(
@@ -1301,23 +1219,17 @@ mod tests {
             &schema_descr,
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
-        let row_group_metadata = vec![rgm1, rgm2];
-        let mut row_group_predicate =
-            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
-        let row_group_filter = row_group_metadata
-            .iter()
-            .enumerate()
-            .map(|(i, g)| row_group_predicate(g, i))
-            .collect::<Vec<_>>();
+        let metrics = parquet_file_metrics();
         // missing statistics for first row group mean that the result from the predicate expression
         // is null / undefined so the first row group can't be filtered out
-        assert_eq!(row_group_filter, vec![true, true]);
-
-        Ok(())
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![0, 1]
+        );
     }
 
     #[test]
-    fn row_group_pruning_predicate_partial_expr() -> Result<()> {
+    fn row_group_pruning_predicate_partial_expr() {
         use datafusion_expr::{col, lit};
         // test row group predicate with partially supported expression
         // int > 1 and int % 2 => c1_max > 1 and true
@@ -1326,7 +1238,7 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Int32, false),
         ]));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         let schema_descr = get_test_schema_descr(vec![
             ("c1", PhysicalType::INT32),
@@ -1346,32 +1258,27 @@ mod tests {
                 ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
             ],
         );
-        let row_group_metadata = vec![rgm1, rgm2];
-        let mut row_group_predicate =
-            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
-        let row_group_filter = row_group_metadata
-            .iter()
-            .enumerate()
-            .map(|(i, g)| row_group_predicate(g, i))
-            .collect::<Vec<_>>();
+
+        let metrics = parquet_file_metrics();
+        let groups = &[rgm1, rgm2];
         // the first row group is still filtered out because the predicate expression can be partially evaluated
         // when conditions are joined using AND
-        assert_eq!(row_group_filter, vec![false, true]);
+        assert_eq!(
+            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
 
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
-        let mut row_group_predicate =
-            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
-        let row_group_filter = row_group_metadata
-            .iter()
-            .enumerate()
-            .map(|(i, g)| row_group_predicate(g, i))
-            .collect::<Vec<_>>();
-        assert_eq!(row_group_filter, vec![true, true]);
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
 
-        Ok(())
+        // if conditions in predicate are joined with OR and an unsupported expression is used
+        // this bypasses the entire predicate expression and no row groups are filtered out
+        assert_eq!(
+            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            vec![0, 1]
+        );
     }
 
     fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
@@ -1397,7 +1304,7 @@ mod tests {
     }
 
     #[test]
-    fn row_group_pruning_predicate_null_expr() -> Result<()> {
+    fn row_group_pruning_predicate_null_expr() {
         use datafusion_expr::{col, lit};
         // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
@@ -1405,24 +1312,19 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Boolean, false),
         ]));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
-        let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let groups = gen_row_group_meta_data_for_pruning_predicate();
 
-        let mut row_group_predicate =
-            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
-        let row_group_filter = row_group_metadata
-            .iter()
-            .enumerate()
-            .map(|(i, g)| row_group_predicate(g, i))
-            .collect::<Vec<_>>();
+        let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
-        assert_eq!(row_group_filter, vec![false, true]);
-
-        Ok(())
+        assert_eq!(
+            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
     }
 
     #[test]
-    fn row_group_pruning_predicate_eq_null_expr() -> Result<()> {
+    fn row_group_pruning_predicate_eq_null_expr() {
         use datafusion_expr::{col, lit};
         // test row group predicate with an unknown (Null) expr
         //
@@ -1434,22 +1336,16 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Boolean, false),
         ]));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
-        let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
-
-        let mut row_group_predicate =
-            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
-        let row_group_filter = row_group_metadata
-            .iter()
-            .enumerate()
-            .map(|(i, g)| row_group_predicate(g, i))
-            .collect::<Vec<_>>();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let groups = gen_row_group_meta_data_for_pruning_predicate();
 
+        let metrics = parquet_file_metrics();
         // bool = NULL always evaluates to NULL (and thus will not
         // pass predicates. Ideally these should both be false
-        assert_eq!(row_group_filter, vec![false, true]);
-
-        Ok(())
+        assert_eq!(
+            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
     }
 
     fn get_row_group_meta_data(
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index c66b8dc4a..691845785 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -288,7 +288,7 @@ pub fn with_new_children_if_necessary(
 /// ```
 /// use datafusion::prelude::*;
 /// use datafusion::physical_plan::displayable;
-/// use std::path::is_separator;
+/// use object_store::path::Path;
 ///
 /// #[tokio::main]
 /// async fn main() {
@@ -312,8 +312,8 @@ pub fn with_new_children_if_necessary(
 ///   let plan_string = format!("{}", displayable_plan.indent());
 ///
 ///   let working_directory = std::env::current_dir().unwrap();
-///   let normalized = working_directory.to_string_lossy().replace(is_separator, "/");
-///   let plan_string = plan_string.replace(&normalized, "WORKING_DIR");
+///   let normalized = Path::from_filesystem_path(working_directory).unwrap();
+///   let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR");
 ///
 ///   assert_eq!("ProjectionExec: expr=[a@0 as a]\
 ///              \n  CoalesceBatchesExec: target_batch_size=4096\
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 304c1b376..598d81879 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -24,12 +24,12 @@ use crate::error::Result;
 use crate::from_slice::FromSlice;
 use crate::logical_plan::LogicalPlan;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
+use crate::test::object_store::local_unpartitioned_file;
 use crate::test_util::aggr_test_schema;
 use array::{Array, ArrayRef};
 use arrow::array::{self, DecimalBuilder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_data_access::object_store::local::local_unpartitioned_file;
 use futures::{Future, FutureExt};
 use std::fs::File;
 use std::io::prelude::*;
@@ -109,9 +109,7 @@ pub fn partitioned_csv_config(
 
         files
             .into_iter()
-            .map(
-                |f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned()).into()],
-            )
+            .map(|f| vec![local_unpartitioned_file(f).into()])
             .collect::<Vec<_>>()
     } else {
         vec![vec![local_unpartitioned_file(path).into()]]
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index fdd053346..e61653e88 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -14,113 +14,40 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//! Object store implem used for testing
-
-use std::{
-    io,
-    io::{Cursor, Read},
-    sync::Arc,
-};
-
-use crate::datafusion_data_access::{
-    object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
-    FileMeta, Result, SizedFile,
-};
+//! Object store implementation used for testing
 use crate::prelude::SessionContext;
-use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt};
+use futures::FutureExt;
+use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
+use std::sync::Arc;
 
 /// Returns a test object store with the provided `ctx`
-pub(crate) fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
+pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
     ctx.runtime_env()
-        .register_object_store("test", TestObjectStore::new_arc(files));
-}
-
-#[derive(Debug)]
-/// An object store implem that is useful for testing.
-/// `ObjectReader`s are filled with zero bytes.
-pub struct TestObjectStore {
-    /// The `(path,size)` of the files that "exist" in the store
-    files: Vec<(String, u64)>,
-}
-
-impl TestObjectStore {
-    pub fn new_arc(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
-        Arc::new(Self {
-            files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(),
-        })
-    }
+        .register_object_store("test", "", make_test_store(files));
 }
 
-#[async_trait]
-impl ObjectStore for TestObjectStore {
-    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
-        let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
-        Ok(Box::pin(
-            stream::iter(
-                self.files
-                    .clone()
-                    .into_iter()
-                    .filter(move |f| f.0.starts_with(&prefix)),
-            )
-            .map(|f| {
-                Ok(FileMeta {
-                    sized_file: SizedFile {
-                        path: f.0.clone(),
-                        size: f.1,
-                    },
-                    last_modified: None,
-                })
-            }),
-        ))
-    }
+/// Create a test object store with the provided files
+pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
+    let memory = InMemory::new();
 
-    async fn list_dir(
-        &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
-    ) -> Result<ListEntryStream> {
-        unimplemented!()
+    for (name, size) in files {
+        memory
+            .put(&Path::from(*name), vec![0; *size as usize].into())
+            .now_or_never()
+            .unwrap()
+            .unwrap();
     }
 
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
-        match self.files.iter().find(|item| file.path == item.0) {
-            Some((_, size)) if *size == file.size => {
-                Ok(Arc::new(EmptyObjectReader(*size)))
-            }
-            Some(_) => Err(io::Error::new(
-                io::ErrorKind::NotFound,
-                "found in test list but wrong size",
-            )),
-            None => Err(io::Error::new(
-                io::ErrorKind::NotFound,
-                "not in provided test list",
-            )),
-        }
-    }
+    Arc::new(memory)
 }
 
-struct EmptyObjectReader(u64);
-
-#[async_trait]
-impl ObjectReader for EmptyObjectReader {
-    async fn chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn AsyncRead>> {
-        unimplemented!()
-    }
-
-    fn sync_chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>> {
-        Ok(Box::new(Cursor::new(vec![0; self.0 as usize])))
-    }
-
-    fn length(&self) -> u64 {
-        self.0
+/// Helper method to fetch the file size and date at the given path and create an `ObjectMeta`
+pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta {
+    let location = Path::from_filesystem_path(path.as_ref()).unwrap();
+    let metadata = std::fs::metadata(path).expect("Local file metadata");
+    ObjectMeta {
+        location,
+        last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+        size: metadata.len() as usize,
     }
 }
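A self-contained sketch of the same pattern the rewritten helpers use, for reading outside the diff; the object names and sizes are made up, and now_or_never is safe here for the same reason it is in make_test_store above (put on InMemory resolves without blocking):

    use futures::FutureExt;
    use object_store::{memory::InMemory, path::Path, ObjectStore};
    use std::sync::Arc;

    fn main() {
        let memory = InMemory::new();
        // one zero-filled object per (name, size) pair, as in make_test_store
        for (name, size) in [("data/a.parquet", 100usize), ("data/b.parquet", 64)] {
            memory
                .put(&Path::from(name), vec![0u8; size].into())
                .now_or_never()
                .expect("InMemory::put resolves without blocking")
                .unwrap();
        }
        // ready to be handed to RuntimeEnv::register_object_store
        let _store: Arc<dyn ObjectStore> = Arc::new(memory);
    }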
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index e3da9d986..a88445d78 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -17,19 +17,17 @@
 
 //! Test queries on partitioned datasets
 
-use std::{fs, io, sync::Arc};
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::ops::Range;
+use std::sync::Arc;
 
 use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{TimeZone, Utc};
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::{
     assert_batches_sorted_eq,
-    datafusion_data_access::{
-        object_store::{
-            local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
-            ObjectStore,
-        },
-        FileMeta, SizedFile,
-    },
     datasource::{
         file_format::{csv::CsvFormat, parquet::ParquetFormat},
         listing::{ListingOptions, ListingTable, ListingTableConfig},
@@ -40,7 +38,9 @@ use datafusion::{
     test_util::{self, arrow_test_data, parquet_test_data},
 };
 use datafusion_common::ScalarValue;
+use futures::stream::BoxStream;
 use futures::{stream, StreamExt};
+use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore};
 
 #[tokio::test]
 async fn parquet_distinct_partition_col() -> Result<()> {
@@ -183,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "mirror:///mytable",
+        "mirror:///mytable/",
     );
 
     let result = ctx
@@ -219,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "mirror:///mytable",
+        "mirror:///mytable/",
     );
 
     let result = ctx
@@ -256,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "mirror:///mytable",
+        "mirror:///mytable/",
     );
 
     let result = ctx
@@ -417,6 +417,7 @@ fn register_partitioned_aggregate_csv(
     let file_schema = test_util::aggr_test_schema();
     ctx.runtime_env().register_object_store(
         "mirror",
+        "",
         MirroringObjectStore::new_arc(csv_file_path, store_paths),
     );
 
@@ -444,6 +445,7 @@ async fn register_partitioned_alltypes_parquet(
     let parquet_file_path = format!("{}/{}", testdata, source_file);
     ctx.runtime_env().register_object_store(
         "mirror",
+        "",
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
     );
 
@@ -481,9 +483,15 @@ pub struct MirroringObjectStore {
     file_size: u64,
 }
 
+impl std::fmt::Display for MirroringObjectStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
 impl MirroringObjectStore {
     pub fn new_arc(mirrored_file: String, paths: &[&str]) -> Arc<dyn ObjectStore> {
-        let metadata = fs::metadata(&mirrored_file).expect("Local file metadata");
+        let metadata = std::fs::metadata(&mirrored_file).expect("Local file metadata");
         Arc::new(Self {
             files: paths.iter().map(|&f| f.to_owned()).collect(),
             mirrored_file,
@@ -494,12 +502,54 @@ impl MirroringObjectStore {
 
 #[async_trait]
 impl ObjectStore for MirroringObjectStore {
-    async fn list_file(
+    async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
+        self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+        let path = std::path::PathBuf::from(&self.mirrored_file);
+        let file = File::open(&path).unwrap();
+        Ok(GetResult::File(file, path))
+    }
+
+    async fn get_range(
+        &self,
+        location: &Path,
+        range: Range<usize>,
+    ) -> object_store::Result<Bytes> {
+        self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+        let path = std::path::PathBuf::from(&self.mirrored_file);
+        let mut file = File::open(&path).unwrap();
+        file.seek(SeekFrom::Start(range.start as u64)).unwrap();
+
+        let to_read = range.end - range.start;
+        let mut data = Vec::with_capacity(to_read);
+        let read = file.take(to_read as u64).read_to_end(&mut data).unwrap();
+        assert_eq!(read, to_read);
+
+        Ok(data.into())
+    }
+
+    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
+        self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+        Ok(ObjectMeta {
+            location: location.clone(),
+            last_modified: Utc.timestamp_nanos(0),
+            size: self.file_size as usize,
+        })
+    }
+
+    async fn delete(&self, _location: &Path) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    async fn list(
         &self,
-        prefix: &str,
-    ) -> datafusion_data_access::Result<FileMetaStream> {
-        let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
-        let size = self.file_size;
+        prefix: Option<&Path>,
+    ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
+        let prefix = prefix.map(|p| p.as_ref()).unwrap_or("").to_string();
+        let size = self.file_size as usize;
         Ok(Box::pin(
             stream::iter(
                 self.files
@@ -508,39 +558,31 @@ impl ObjectStore for MirroringObjectStore {
                     .filter(move |f| f.starts_with(&prefix)),
             )
             .map(move |f| {
-                Ok(FileMeta {
-                    sized_file: SizedFile { path: f, size },
-                    last_modified: None,
+                Ok(ObjectMeta {
+                    location: Path::parse(f)?,
+                    last_modified: Utc.timestamp_nanos(0),
+                    size,
                 })
             }),
         ))
     }
 
-    async fn list_dir(
+    async fn list_with_delimiter(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
-    ) -> datafusion_data_access::Result<ListEntryStream> {
+        _prefix: Option<&Path>,
+    ) -> object_store::Result<ListResult> {
         unimplemented!()
     }
 
-    fn file_reader(
+    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    async fn copy_if_not_exists(
         &self,
-        file: SizedFile,
-    ) -> datafusion_data_access::Result<Arc<dyn ObjectReader>> {
-        assert_eq!(
-            self.file_size, file.size,
-            "Requested files should have the same size as the mirrored file"
-        );
-        match self.files.iter().find(|&item| &file.path == item) {
-            Some(_) => Ok(LocalFileSystem {}.file_reader(SizedFile {
-                path: self.mirrored_file.clone(),
-                size: self.file_size,
-            })?),
-            None => Err(io::Error::new(
-                io::ErrorKind::NotFound,
-                "not in provided test list",
-            )),
-        }
+        _from: &Path,
+        _to: &Path,
+    ) -> object_store::Result<()> {
+        unimplemented!()
     }
 }
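register_object_store now takes a scheme and a host in addition to the store; the empty host string above matches the empty authority in URLs such as mirror:///mytable/. A minimal sketch of the registration call, using an in-memory store as a stand-in for MirroringObjectStore:

    use datafusion::prelude::SessionContext;
    use object_store::memory::InMemory;
    use std::sync::Arc;

    fn main() {
        let ctx = SessionContext::new();
        // (scheme, host, store): an empty host matches mirror:///... URLs
        ctx.runtime_env()
            .register_object_store("mirror", "", Arc::new(InMemory::new()));
    }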
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 1de6af06a..2c840321f 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -22,12 +22,10 @@ use datafusion::error::Result;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::SessionContext;
-use datafusion_data_access::object_store::local::{
-    local_unpartitioned_file, LocalFileSystem,
-};
 use datafusion_row::layout::RowType::{Compact, WordAligned};
 use datafusion_row::reader::read_as_batch;
 use datafusion_row::writer::write_batch_unchecked;
+use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
 use std::sync::Arc;
 
 #[tokio::test]
@@ -79,12 +77,15 @@ async fn get_exec(
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let testdata = datafusion::test_util::parquet_test_data();
     let filename = format!("{}/{}", testdata, file_name);
-    let meta = local_unpartitioned_file(filename);
+
+    let path = Path::from_filesystem_path(filename).unwrap();
 
     let format = ParquetFormat::default();
-    let object_store = Arc::new(LocalFileSystem {}) as _;
+    let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
     let object_store_url = ObjectStoreUrl::local_filesystem();
 
+    let meta = object_store.head(&path).await.unwrap();
+
     let file_schema = format
         .infer_schema(&object_store, &[meta.clone()])
         .await
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 0e3e08873..a7f4cabe9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -46,11 +46,11 @@ use datafusion::{
 };
 use datafusion::{execution::context::SessionContext, physical_plan::displayable};
 use datafusion_expr::Volatility;
+use object_store::path::Path;
 use std::fs::File;
 use std::io::Write;
 use std::path::PathBuf;
 use tempfile::TempDir;
-use url::Url;
 
 /// A macro to assert that some particular line contains two substrings
 ///
@@ -897,16 +897,9 @@ impl ExplainNormalizer {
             // Push path as is
             replacements.push((path.to_string_lossy().to_string(), key.to_string()));
 
-            // Push canonical version of path
-            let canonical = path.canonicalize().unwrap();
-            replacements.push((canonical.to_string_lossy().to_string(), key.to_string()));
-
-            if cfg!(target_family = "windows") {
-                // Push URL representation of path, to handle windows
-                let url = Url::from_file_path(canonical).unwrap();
-                let path = url.path().strip_prefix('/').unwrap();
-                replacements.push((path.to_string(), key.to_string()));
-            }
+            // Push URL representation of path
+            let path = Path::from_filesystem_path(path).unwrap();
+            replacements.push((path.to_string(), key.to_string()));
         };
 
         push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA");