You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/06 21:20:56 UTC

[arrow-datafusion] branch main updated: Move `physical_plan::file_format` to `datasource::plan` (#6516)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 786f222e23 Move `physical_plan::file_format` to `datasource::plan` (#6516)
786f222e23 is described below

commit 786f222e237694a9f7a6e74f274d0990e3474f14
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Jun 6 17:20:51 2023 -0400

    Move `physical_plan::file_format` to `datasource::plan` (#6516)
    
    * Move `physical_plan::file_format` to `datasource::plan`
    
    * fix doclinks
---
 datafusion-examples/examples/csv_opener.rs         |  9 ++++----
 datafusion-examples/examples/json_opener.rs        |  9 ++++----
 datafusion/core/src/dataframe.rs                   |  2 +-
 .../core/src/datasource/file_format/arrow.rs       |  2 +-
 datafusion/core/src/datasource/file_format/avro.rs |  2 +-
 datafusion/core/src/datasource/file_format/csv.rs  |  6 +++---
 datafusion/core/src/datasource/file_format/json.rs |  2 +-
 datafusion/core/src/datasource/file_format/mod.rs  |  2 +-
 .../core/src/datasource/file_format/parquet.rs     |  6 +++---
 datafusion/core/src/datasource/listing/mod.rs      |  4 ++--
 datafusion/core/src/datasource/listing/table.rs    |  4 ++--
 datafusion/core/src/datasource/mod.rs              |  1 +
 .../physical_plan}/arrow_file.rs                   |  4 ++--
 .../physical_plan}/avro.rs                         |  6 +++---
 .../physical_plan}/chunked_store.rs                |  0
 .../physical_plan}/csv.rs                          | 10 ++++-----
 .../physical_plan}/file_stream.rs                  |  6 +++---
 .../physical_plan}/json.rs                         | 10 ++++-----
 .../physical_plan}/mod.rs                          |  0
 .../physical_plan}/parquet.rs                      | 24 +++++++++++-----------
 .../physical_plan}/parquet/metrics.rs              |  0
 .../physical_plan}/parquet/page_filter.rs          |  4 ++--
 .../physical_plan}/parquet/row_filter.rs           |  0
 .../physical_plan}/parquet/row_groups.rs           |  2 +-
 datafusion/core/src/execution/context.rs           |  2 +-
 .../combine_partial_final_agg.rs                   |  2 +-
 .../src/physical_optimizer/dist_enforcement.rs     |  2 +-
 .../core/src/physical_optimizer/repartition.rs     |  6 +++---
 .../src/physical_optimizer/sort_enforcement.rs     |  2 +-
 datafusion/core/src/physical_plan/mod.rs           |  1 -
 datafusion/core/src/physical_plan/windows/mod.rs   |  2 +-
 datafusion/core/src/test/mod.rs                    |  2 +-
 datafusion/core/src/test_util/parquet.rs           |  2 +-
 datafusion/core/tests/parquet/custom_reader.rs     |  2 +-
 datafusion/core/tests/parquet/mod.rs               |  7 ++-----
 datafusion/core/tests/parquet/page_pruning.rs      |  2 +-
 datafusion/core/tests/parquet/schema_coercion.rs   |  2 +-
 datafusion/core/tests/row.rs                       |  2 +-
 datafusion/proto/src/physical_plan/from_proto.rs   |  2 +-
 datafusion/proto/src/physical_plan/mod.rs          |  8 +++++---
 datafusion/proto/src/physical_plan/to_proto.rs     |  2 +-
 datafusion/substrait/src/physical_plan/consumer.rs |  2 +-
 datafusion/substrait/src/physical_plan/producer.rs |  2 +-
 .../substrait/tests/roundtrip_physical_plan.rs     |  2 +-
 44 files changed, 83 insertions(+), 86 deletions(-)

diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 351f95cd2c..f2982522a7 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -20,14 +20,13 @@ use std::{sync::Arc, vec};
 use datafusion::{
     assert_batches_eq,
     datasource::{
-        file_format::file_type::FileCompressionType, listing::PartitionedFile,
+        file_format::file_type::FileCompressionType,
+        listing::PartitionedFile,
         object_store::ObjectStoreUrl,
+        physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
     },
     error::Result,
-    physical_plan::{
-        file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
-        metrics::ExecutionPlanMetricsSet,
-    },
+    physical_plan::metrics::ExecutionPlanMetricsSet,
     test_util::aggr_test_schema,
 };
 use futures::StreamExt;
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index 6c7d5ae3d7..39013455da 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -21,14 +21,13 @@ use arrow_schema::{DataType, Field, Schema};
 use datafusion::{
     assert_batches_eq,
     datasource::{
-        file_format::file_type::FileCompressionType, listing::PartitionedFile,
+        file_format::file_type::FileCompressionType,
+        listing::PartitionedFile,
         object_store::ObjectStoreUrl,
+        physical_plan::{FileScanConfig, FileStream, JsonOpener},
     },
     error::Result,
-    physical_plan::{
-        file_format::{FileScanConfig, FileStream, JsonOpener},
-        metrics::ExecutionPlanMetricsSet,
-    },
+    physical_plan::metrics::ExecutionPlanMetricsSet,
 };
 use futures::StreamExt;
 use object_store::ObjectStore;
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 760ce7c0f5..1b285107b3 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -38,6 +38,7 @@ use crate::arrow::datatypes::Schema;
 use crate::arrow::datatypes::SchemaRef;
 use crate::arrow::record_batch::RecordBatch;
 use crate::arrow::util::pretty;
+use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::datasource::{provider_as_source, MemTable, TableProvider};
 use crate::error::Result;
 use crate::execution::{
@@ -48,7 +49,6 @@ use crate::logical_expr::{
     col, utils::find_window_exprs, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
     Partitioning, TableType,
 };
-use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{collect, collect_partitioned};
 use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 2a27468f45..2b3ef7ee4e 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -20,9 +20,9 @@
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
 
 use crate::datasource::file_format::FileFormat;
+use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{ArrowExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use arrow::ipc::reader::FileReader;
 use arrow_schema::{Schema, SchemaRef};
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 374ef18970..ab9f1f5dd0 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -28,9 +28,9 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use crate::avro_to_arrow::read_avro_schema_from_reader;
+use crate::datasource::physical_plan::{AvroExec, FileScanConfig};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 69d1c0089f..01bf76ccf4 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -45,11 +45,11 @@ use crate::datasource::file_format::{
     AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
     DEFAULT_SCHEMA_INFER_MAX_RECORD,
 };
-use crate::error::Result;
-use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{
+use crate::datasource::physical_plan::{
     CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
 };
+use crate::error::Result;
+use crate::execution::context::SessionState;
 use crate::physical_plan::insert::{DataSink, InsertExec};
 use crate::physical_plan::Statistics;
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 21cd22f070..6247e85ba8 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -36,9 +36,9 @@ use super::FileFormat;
 use super::FileScanConfig;
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
+use crate::datasource::physical_plan::NdJsonExec;
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 71bd8f1c07..a6848b0d12 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,9 +36,9 @@ use std::task::{Context, Poll};
 use std::{fmt, mem};
 
 use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
 use arrow_array::RecordBatch;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 6957f367c9..875c58ae44 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -42,11 +42,11 @@ use crate::arrow::array::{
 use crate::arrow::datatypes::DataType;
 use crate::config::ConfigOptions;
 
+use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter};
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
 use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
 
 /// The default file extension of parquet files
@@ -379,7 +379,7 @@ fn summarize_min_max(
 /// This component is a subject to **change** in near future and is exposed for low level integrations
 /// through [`ParquetFileReaderFactory`].
 ///
-/// [`ParquetFileReaderFactory`]: crate::physical_plan::file_format::ParquetFileReaderFactory
+/// [`ParquetFileReaderFactory`]: crate::datasource::physical_plan::ParquetFileReaderFactory
 pub async fn fetch_parquet_metadata(
     store: &dyn ObjectStore,
     meta: &ObjectMeta,
@@ -623,7 +623,7 @@ mod tests {
     use super::*;
 
     use crate::datasource::file_format::parquet::test_util::store_parquet;
-    use crate::physical_plan::file_format::get_scan_files;
+    use crate::datasource::physical_plan::get_scan_files;
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index a434a081e8..427cfc8501 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -62,8 +62,8 @@ pub struct PartitionedFile {
     /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
     ///
     ///
-    /// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
-    /// [`wrap_partition_value_in_dict`]: crate::physical_plan::file_format::wrap_partition_value_in_dict
+    /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
+    /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict
     /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index fd316a74b2..b11fa8b063 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -33,6 +33,7 @@ use object_store::path::Path;
 use object_store::ObjectMeta;
 
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
+use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
 use crate::datasource::{
     file_format::{
         arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
@@ -44,7 +45,6 @@ use crate::datasource::{
 };
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::physical_plan;
-use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
 use crate::{
     error::{DataFusionError, Result},
     execution::context::SessionState,
@@ -357,7 +357,7 @@ impl ListingOptions {
     /// ```
     ///
     /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
-    /// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
+    /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
     pub fn with_table_partition_cols(
         mut self,
         table_partition_cols: Vec<(String, DataType)>,
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index dba4112b1e..683afb7902 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -28,6 +28,7 @@ pub mod file_format;
 pub mod listing;
 pub mod listing_table_factory;
 pub mod memory;
+pub mod physical_plan;
 pub mod streaming;
 pub mod view;
 
diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/arrow_file.rs
rename to datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 72a6d0a0b4..43074ccb77 100644
--- a/datafusion/core/src/physical_plan/file_format/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -16,10 +16,10 @@
 // under the License.
 
 //! Execution plan for reading Arrow files
-use crate::error::Result;
-use crate::physical_plan::file_format::{
+use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
 };
+use crate::error::Result;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
similarity index 98%
rename from datafusion/core/src/physical_plan/file_format/avro.rs
rename to datafusion/core/src/datasource/physical_plan/avro.rs
index 9adf492d7d..704a97ba7e 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -165,8 +165,8 @@ impl ExecutionPlan for AvroExec {
 #[cfg(feature = "avro")]
 mod private {
     use super::*;
-    use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
-    use crate::physical_plan::file_format::FileMeta;
+    use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener};
+    use crate::datasource::physical_plan::FileMeta;
     use bytes::Buf;
     use futures::StreamExt;
     use object_store::{GetResult, ObjectStore};
@@ -222,7 +222,7 @@ mod tests {
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::physical_plan::file_format::chunked_store::ChunkedStore;
+    use crate::datasource::physical_plan::chunked_store::ChunkedStore;
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use crate::test::object_store::local_unpartitioned_file;
diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/datasource/physical_plan/chunked_store.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/chunked_store.rs
rename to datafusion/core/src/datasource/physical_plan/chunked_store.rs
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/csv.rs
rename to datafusion/core/src/datasource/physical_plan/csv.rs
index cbdd626f0e..d2c76ecaf5 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -18,13 +18,13 @@
 //! Execution plan for reading CSV files
 
 use crate::datasource::file_format::file_type::FileCompressionType;
+use crate::datasource::physical_plan::file_stream::{
+    FileOpenFuture, FileOpener, FileStream,
+};
+use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::file_format::file_stream::{
-    FileOpenFuture, FileOpener, FileStream,
-};
-use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
@@ -369,7 +369,7 @@ pub async fn plan_to_csv(
 mod tests {
     use super::*;
     use crate::datasource::file_format::file_type::FileType;
-    use crate::physical_plan::file_format::chunked_store::ChunkedStore;
+    use crate::datasource::physical_plan::chunked_store::ChunkedStore;
     use crate::prelude::*;
     use crate::test::{partitioned_csv_config, partitioned_file_groups};
     use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/file_stream.rs
rename to datafusion/core/src/datasource/physical_plan/file_stream.rs
index 0d1dc1bee2..2c4437de0a 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -28,10 +28,10 @@ use std::task::{Context, Poll};
 use std::time::Instant;
 
 use crate::datasource::listing::PartitionedFile;
-use crate::error::Result;
-use crate::physical_plan::file_format::{
+use crate::datasource::physical_plan::{
     FileMeta, FileScanConfig, PartitionColumnProjector,
 };
+use crate::error::Result;
 use crate::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
@@ -524,7 +524,7 @@ mod tests {
     use super::*;
     use crate::datasource::file_format::BatchSerializer;
     use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::physical_plan::file_format::FileMeta;
+    use crate::datasource::physical_plan::FileMeta;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use crate::prelude::SessionContext;
     use crate::{
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/json.rs
rename to datafusion/core/src/datasource/physical_plan/json.rs
index 10f249d4e7..8340c282a0 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -17,13 +17,13 @@
 
 //! Execution plan for reading line-delimited JSON files
 use crate::datasource::file_format::file_type::FileCompressionType;
+use crate::datasource::physical_plan::file_stream::{
+    FileOpenFuture, FileOpener, FileStream,
+};
+use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::file_format::file_stream::{
-    FileOpenFuture, FileOpener, FileStream,
-};
-use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
@@ -308,8 +308,8 @@ mod tests {
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::datasource::physical_plan::chunked_store::ChunkedStore;
     use crate::execution::context::SessionState;
-    use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
     use crate::test::partitioned_file_groups;
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/mod.rs
rename to datafusion/core/src/datasource/physical_plan/mod.rs
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/parquet.rs
rename to datafusion/core/src/datasource/physical_plan/parquet.rs
index a0c7402cc4..383a2066fc 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -17,18 +17,12 @@
 
 //! Execution plan for reading Parquet files
 
-use fmt::Debug;
-use std::any::Any;
-use std::cmp::min;
-use std::fmt;
-use std::fs;
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::physical_plan::file_format::file_stream::{
+use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
-use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
+use crate::datasource::physical_plan::{
+    parquet::page_filter::PagePruningPredicate, FileMeta, FileScanConfig, SchemaAdapter,
+};
 use crate::{
     config::ConfigOptions,
     datasource::listing::FileRange,
@@ -37,13 +31,19 @@ use crate::{
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
         common::AbortOnDropSingle,
-        expressions::PhysicalSortExpr,
-        file_format::{FileMeta, FileScanConfig, SchemaAdapter},
         metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
         ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
         Partitioning, SendableRecordBatchStream, Statistics,
     },
 };
+use datafusion_physical_expr::PhysicalSortExpr;
+use fmt::Debug;
+use std::any::Any;
+use std::cmp::min;
+use std::fmt;
+use std::fs;
+use std::ops::Range;
+use std::sync::Arc;
 
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::ArrowError;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index 00e55c41ad..c046de73d7 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -39,10 +39,10 @@ use parquet::{
 };
 use std::sync::Arc;
 
-use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use crate::physical_plan::file_format::parquet::{
+use crate::datasource::physical_plan::parquet::{
     from_bytes_to_i128, parquet_to_arrow_decimal_type,
 };
+use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 0cbb5d9465..07ef28304c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -27,7 +27,7 @@ use parquet::file::{
     metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
 };
 
-use crate::physical_plan::file_format::parquet::{
+use crate::datasource::physical_plan::parquet::{
     from_bytes_to_i128, parquet_to_arrow_decimal_type,
 };
 use crate::{
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 078093189c..7f74e28511 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -80,9 +80,9 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::repartition::Repartition;
 
 use crate::config::ConfigOptions;
+use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
 use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
-use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udaf::AggregateUDF;
 use crate::physical_plan::udf::ScalarUDF;
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index fdef7d54b8..3ec9e9bbd0 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -207,11 +207,11 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use crate::physical_plan::aggregates::{
         AggregateExec, AggregateMode, PhysicalGroupBy,
     };
     use crate::physical_plan::expressions::lit;
-    use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::repartition::RepartitionExec;
     use crate::physical_plan::{displayable, Partitioning, Statistics};
 
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 6c425b1740..4e456450bc 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -977,12 +977,12 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use crate::physical_optimizer::sort_enforcement::EnforceSorting;
     use crate::physical_plan::aggregates::{
         AggregateExec, AggregateMode, PhysicalGroupBy,
     };
     use crate::physical_plan::expressions::col;
-    use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::joins::{
         utils::JoinOn, HashJoinExec, PartitionMode, SortMergeJoinExec,
     };
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 7db54eee51..fb867ff36c 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -21,11 +21,11 @@ use std::sync::Arc;
 
 use super::optimizer::PhysicalOptimizerRule;
 use crate::config::ConfigOptions;
+use crate::datasource::physical_plan::ParquetExec;
 use crate::error::Result;
 use crate::physical_plan::Partitioning::*;
 use crate::physical_plan::{
-    file_format::ParquetExec, repartition::RepartitionExec,
-    with_new_children_if_necessary, ExecutionPlan,
+    repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
 };
 
 /// Optimizer that introduces repartition to introduce more
@@ -325,13 +325,13 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
     use crate::physical_optimizer::sort_enforcement::EnforceSorting;
     use crate::physical_plan::aggregates::{
         AggregateExec, AggregateMode, PhysicalGroupBy,
     };
     use crate::physical_plan::expressions::{col, PhysicalSortExpr};
-    use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use crate::physical_plan::projection::ProjectionExec;
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 22f7d509c8..8398b16c4d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -967,10 +967,10 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
     use crate::physical_plan::aggregates::PhysicalGroupBy;
     use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
-    use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::joins::utils::JoinOn;
     use crate::physical_plan::joins::SortMergeJoinExec;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index deff619b4f..ce5dc041b9 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -680,7 +680,6 @@ pub mod common;
 pub mod display;
 pub mod empty;
 pub mod explain;
-pub mod file_format;
 pub mod filter;
 pub mod insert;
 pub mod joins;
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index f773f3b549..a17b8ba87e 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -289,9 +289,9 @@ pub(crate) fn window_ordering_equivalence(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::physical_plan::CsvExec;
     use crate::physical_plan::aggregates::AggregateFunction;
     use crate::physical_plan::expressions::col;
-    use crate::physical_plan::file_format::CsvExec;
     use crate::physical_plan::{collect, ExecutionPlan};
     use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 6e4d038b4a..82b55063dc 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -21,11 +21,11 @@ use crate::arrow::array::UInt32Array;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::object_store::ObjectStoreUrl;
+use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
 use crate::datasource::{MemTable, TableProvider};
 use crate::error::Result;
 use crate::from_slice::FromSlice;
 use crate::logical_expr::LogicalPlan;
-use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::memory::MemoryExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::test::object_store::local_unpartitioned_file;
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index ed65c4122c..d3a1f9c1ef 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -26,11 +26,11 @@ use crate::common::ToDFSchema;
 use crate::config::ConfigOptions;
 use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
 use crate::datasource::object_store::ObjectStoreUrl;
+use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use crate::error::Result;
 use crate::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
 use crate::physical_expr::create_physical_expr;
 use crate::physical_expr::execution_props::ExecutionProps;
-use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use crate::physical_plan::filter::FilterExec;
 use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 302baca516..7d73b4a618 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -23,7 +23,7 @@ use datafusion::assert_batches_sorted_eq;
 use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::physical_plan::file_format::{
+use datafusion::datasource::physical_plan::{
     FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
 };
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 3fbf413032..031aab9f45 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -29,11 +29,8 @@ use arrow::{
 };
 use chrono::{Datelike, Duration};
 use datafusion::{
-    datasource::{provider_as_source, TableProvider},
-    physical_plan::{
-        accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
-        ExecutionPlanVisitor,
-    },
+    datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
+    physical_plan::{accept, metrics::MetricsSet, ExecutionPlan, ExecutionPlanVisitor},
     prelude::{ParquetReadOptions, SessionConfig, SessionContext},
 };
 use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 1c912883c7..3711184ccc 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -21,8 +21,8 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::execution::context::SessionState;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::metrics::MetricValue;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index e7b1584e21..4d028c6f1b 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -21,8 +21,8 @@ use arrow_array::types::Int32Type;
 use arrow_array::{ArrayRef, DictionaryArray, Float32Array, Int64Array, StringArray};
 use arrow_schema::DataType;
 use datafusion::assert_batches_sorted_eq;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use datafusion::prelude::SessionContext;
 use datafusion_common::Result;
 use datafusion_common::Statistics;
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index f5cb0e4804..c68b422a4f 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -18,9 +18,9 @@
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::FileScanConfig;
 use datafusion::error::Result;
 use datafusion::execution::context::SessionState;
-use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::SessionContext;
 use datafusion_row::reader::read_as_batch;
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 42672e5049..7a52e5f0d0 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -24,6 +24,7 @@ use chrono::Utc;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::FileScanConfig;
 use datafusion::execution::context::ExecutionProps;
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::window_function::WindowFunction;
@@ -32,7 +33,6 @@ use datafusion::physical_plan::expressions::{
     date_time_interval_expr, GetIndexedFieldExpr,
 };
 use datafusion::physical_plan::expressions::{in_list, LikeExpr};
-use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{
     expressions::{
         BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, Literal,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 7385a4ac21..3c14981355 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::datasource::file_format::file_type::FileCompressionType;
+use datafusion::datasource::physical_plan::{AvroExec, CsvExec, ParquetExec};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::WindowFrame;
@@ -32,7 +33,6 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::explain::ExplainExec;
 use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
-use datafusion::physical_plan::file_format::{AvroExec, CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
 use datafusion::physical_plan::joins::CrossJoinExec;
@@ -1276,14 +1276,16 @@ mod roundtrip_tests {
             compute::kernels::sort::SortOptions,
             datatypes::{DataType, Field, Schema},
         },
-        datasource::listing::PartitionedFile,
+        datasource::{
+            listing::PartitionedFile,
+            physical_plan::{FileScanConfig, ParquetExec},
+        },
         logical_expr::{JoinType, Operator},
         physical_plan::{
             aggregates::{AggregateExec, AggregateMode},
             empty::EmptyExec,
             expressions::{binary, col, lit, NotExpr},
             expressions::{Avg, Column, DistinctCount, PhysicalSortExpr},
-            file_format::{FileScanConfig, ParquetExec},
             filter::FilterExec,
             joins::{HashJoinExec, PartitionMode},
             limit::{GlobalLimitExec, LocalLimitExec},
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 766906d04a..0910ddaad0 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -32,7 +32,7 @@ use datafusion::physical_plan::{
 };
 
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
-use datafusion::physical_plan::file_format::FileScanConfig;
+use datafusion::datasource::physical_plan::FileScanConfig;
 
 use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal};
 
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 1de3937c59..5d2f22b857 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -20,8 +20,8 @@ use chrono::DateTime;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::error::{DataFusionError, Result};
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use object_store::ObjectMeta;
diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs
index c8d739ecda..ad87d7afb0 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::error::{DataFusionError, Result};
-use datafusion::physical_plan::file_format::ParquetExec;
 use datafusion::physical_plan::{displayable, ExecutionPlan};
 use std::collections::HashMap;
 use substrait::proto::expression::MaskExpression;
diff --git a/datafusion/substrait/tests/roundtrip_physical_plan.rs b/datafusion/substrait/tests/roundtrip_physical_plan.rs
index ab77f19ea0..de549412b6 100644
--- a/datafusion/substrait/tests/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/roundtrip_physical_plan.rs
@@ -20,8 +20,8 @@ mod tests {
     use datafusion::arrow::datatypes::Schema;
     use datafusion::datasource::listing::PartitionedFile;
     use datafusion::datasource::object_store::ObjectStoreUrl;
+    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use datafusion::error::Result;
-    use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use datafusion::physical_plan::{displayable, ExecutionPlan};
     use datafusion::prelude::SessionContext;
     use datafusion_substrait::physical_plan::{consumer, producer};