You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/06 21:20:56 UTC
[arrow-datafusion] branch main updated: Move `physical_plan::file_format` to `datasource::plan` (#6516)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 786f222e23 Move `physical_plan::file_format` to `datasource::plan` (#6516)
786f222e23 is described below
commit 786f222e237694a9f7a6e74f274d0990e3474f14
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Jun 6 17:20:51 2023 -0400
Move `physical_plan::file_format` to `datasource::plan` (#6516)
* Move `physical_plan::file_format` to `datasource::plan` (the resulting module path is `datasource::physical_plan`)
* fix doclinks
---
datafusion-examples/examples/csv_opener.rs | 9 ++++----
datafusion-examples/examples/json_opener.rs | 9 ++++----
datafusion/core/src/dataframe.rs | 2 +-
.../core/src/datasource/file_format/arrow.rs | 2 +-
datafusion/core/src/datasource/file_format/avro.rs | 2 +-
datafusion/core/src/datasource/file_format/csv.rs | 6 +++---
datafusion/core/src/datasource/file_format/json.rs | 2 +-
datafusion/core/src/datasource/file_format/mod.rs | 2 +-
.../core/src/datasource/file_format/parquet.rs | 6 +++---
datafusion/core/src/datasource/listing/mod.rs | 4 ++--
datafusion/core/src/datasource/listing/table.rs | 4 ++--
datafusion/core/src/datasource/mod.rs | 1 +
.../physical_plan}/arrow_file.rs | 4 ++--
.../physical_plan}/avro.rs | 6 +++---
.../physical_plan}/chunked_store.rs | 0
.../physical_plan}/csv.rs | 10 ++++-----
.../physical_plan}/file_stream.rs | 6 +++---
.../physical_plan}/json.rs | 10 ++++-----
.../physical_plan}/mod.rs | 0
.../physical_plan}/parquet.rs | 24 +++++++++++-----------
.../physical_plan}/parquet/metrics.rs | 0
.../physical_plan}/parquet/page_filter.rs | 4 ++--
.../physical_plan}/parquet/row_filter.rs | 0
.../physical_plan}/parquet/row_groups.rs | 2 +-
datafusion/core/src/execution/context.rs | 2 +-
.../combine_partial_final_agg.rs | 2 +-
.../src/physical_optimizer/dist_enforcement.rs | 2 +-
.../core/src/physical_optimizer/repartition.rs | 6 +++---
.../src/physical_optimizer/sort_enforcement.rs | 2 +-
datafusion/core/src/physical_plan/mod.rs | 1 -
datafusion/core/src/physical_plan/windows/mod.rs | 2 +-
datafusion/core/src/test/mod.rs | 2 +-
datafusion/core/src/test_util/parquet.rs | 2 +-
datafusion/core/tests/parquet/custom_reader.rs | 2 +-
datafusion/core/tests/parquet/mod.rs | 7 ++-----
datafusion/core/tests/parquet/page_pruning.rs | 2 +-
datafusion/core/tests/parquet/schema_coercion.rs | 2 +-
datafusion/core/tests/row.rs | 2 +-
datafusion/proto/src/physical_plan/from_proto.rs | 2 +-
datafusion/proto/src/physical_plan/mod.rs | 8 +++++---
datafusion/proto/src/physical_plan/to_proto.rs | 2 +-
datafusion/substrait/src/physical_plan/consumer.rs | 2 +-
datafusion/substrait/src/physical_plan/producer.rs | 2 +-
.../substrait/tests/roundtrip_physical_plan.rs | 2 +-
44 files changed, 83 insertions(+), 86 deletions(-)
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 351f95cd2c..f2982522a7 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -20,14 +20,13 @@ use std::{sync::Arc, vec};
use datafusion::{
assert_batches_eq,
datasource::{
- file_format::file_type::FileCompressionType, listing::PartitionedFile,
+ file_format::file_type::FileCompressionType,
+ listing::PartitionedFile,
object_store::ObjectStoreUrl,
+ physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
},
error::Result,
- physical_plan::{
- file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
- metrics::ExecutionPlanMetricsSet,
- },
+ physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};
use futures::StreamExt;
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index 6c7d5ae3d7..39013455da 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -21,14 +21,13 @@ use arrow_schema::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
- file_format::file_type::FileCompressionType, listing::PartitionedFile,
+ file_format::file_type::FileCompressionType,
+ listing::PartitionedFile,
object_store::ObjectStoreUrl,
+ physical_plan::{FileScanConfig, FileStream, JsonOpener},
},
error::Result,
- physical_plan::{
- file_format::{FileScanConfig, FileStream, JsonOpener},
- metrics::ExecutionPlanMetricsSet,
- },
+ physical_plan::metrics::ExecutionPlanMetricsSet,
};
use futures::StreamExt;
use object_store::ObjectStore;
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 760ce7c0f5..1b285107b3 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -38,6 +38,7 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
+use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
@@ -48,7 +49,6 @@ use crate::logical_expr::{
col, utils::find_window_exprs, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning, TableType,
};
-use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 2a27468f45..2b3ef7ee4e 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -20,9 +20,9 @@
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
use crate::datasource::file_format::FileFormat;
+use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{ArrowExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use arrow::ipc::reader::FileReader;
use arrow_schema::{Schema, SchemaRef};
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 374ef18970..ab9f1f5dd0 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -28,9 +28,9 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};
use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
+use crate::datasource::physical_plan::{AvroExec, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 69d1c0089f..01bf76ccf4 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -45,11 +45,11 @@ use crate::datasource::file_format::{
AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
-use crate::error::Result;
-use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{
+use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
};
+use crate::error::Result;
+use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::Statistics;
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 21cd22f070..6247e85ba8 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -36,9 +36,9 @@ use super::FileFormat;
use super::FileScanConfig;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
+use crate::datasource::physical_plan::NdJsonExec;
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 71bd8f1c07..a6848b0d12 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,9 +36,9 @@ use std::task::{Context, Poll};
use std::{fmt, mem};
use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
use crate::physical_plan::{ExecutionPlan, Statistics};
use arrow_array::RecordBatch;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 6957f367c9..875c58ae44 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -42,11 +42,11 @@ use crate::arrow::array::{
use crate::arrow::datatypes::DataType;
use crate::config::ConfigOptions;
+use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
/// The default file extension of parquet files
@@ -379,7 +379,7 @@ fn summarize_min_max(
/// This component is a subject to **change** in near future and is exposed for low level integrations
/// through [`ParquetFileReaderFactory`].
///
-/// [`ParquetFileReaderFactory`]: crate::physical_plan::file_format::ParquetFileReaderFactory
+/// [`ParquetFileReaderFactory`]: crate::datasource::physical_plan::ParquetFileReaderFactory
pub async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
@@ -623,7 +623,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::parquet::test_util::store_parquet;
- use crate::physical_plan::file_format::get_scan_files;
+ use crate::datasource::physical_plan::get_scan_files;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index a434a081e8..427cfc8501 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -62,8 +62,8 @@ pub struct PartitionedFile {
/// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
///
///
- /// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
- /// [`wrap_partition_value_in_dict`]: crate::physical_plan::file_format::wrap_partition_value_in_dict
+ /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
+ /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict
/// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index fd316a74b2..b11fa8b063 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -33,6 +33,7 @@ use object_store::path::Path;
use object_store::ObjectMeta;
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
+use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::datasource::{
file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
@@ -44,7 +45,6 @@ use crate::datasource::{
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
-use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
@@ -357,7 +357,7 @@ impl ListingOptions {
/// ```
///
/// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
- /// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
+ /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
pub fn with_table_partition_cols(
mut self,
table_partition_cols: Vec<(String, DataType)>,
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index dba4112b1e..683afb7902 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -28,6 +28,7 @@ pub mod file_format;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
+pub mod physical_plan;
pub mod streaming;
pub mod view;
diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/arrow_file.rs
rename to datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 72a6d0a0b4..43074ccb77 100644
--- a/datafusion/core/src/physical_plan/file_format/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -16,10 +16,10 @@
// under the License.
//! Execution plan for reading Arrow files
-use crate::error::Result;
-use crate::physical_plan::file_format::{
+use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
};
+use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
similarity index 98%
rename from datafusion/core/src/physical_plan/file_format/avro.rs
rename to datafusion/core/src/datasource/physical_plan/avro.rs
index 9adf492d7d..704a97ba7e 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -165,8 +165,8 @@ impl ExecutionPlan for AvroExec {
#[cfg(feature = "avro")]
mod private {
use super::*;
- use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
- use crate::physical_plan::file_format::FileMeta;
+ use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener};
+ use crate::datasource::physical_plan::FileMeta;
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
@@ -222,7 +222,7 @@ mod tests {
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
- use crate::physical_plan::file_format::chunked_store::ChunkedStore;
+ use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test::object_store::local_unpartitioned_file;
diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/datasource/physical_plan/chunked_store.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/chunked_store.rs
rename to datafusion/core/src/datasource/physical_plan/chunked_store.rs
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/csv.rs
rename to datafusion/core/src/datasource/physical_plan/csv.rs
index cbdd626f0e..d2c76ecaf5 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -18,13 +18,13 @@
//! Execution plan for reading CSV files
use crate::datasource::file_format::file_type::FileCompressionType;
+use crate::datasource::physical_plan::file_stream::{
+ FileOpenFuture, FileOpener, FileStream,
+};
+use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::file_format::file_stream::{
- FileOpenFuture, FileOpener, FileStream,
-};
-use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
@@ -369,7 +369,7 @@ pub async fn plan_to_csv(
mod tests {
use super::*;
use crate::datasource::file_format::file_type::FileType;
- use crate::physical_plan::file_format::chunked_store::ChunkedStore;
+ use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/file_stream.rs
rename to datafusion/core/src/datasource/physical_plan/file_stream.rs
index 0d1dc1bee2..2c4437de0a 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -28,10 +28,10 @@ use std::task::{Context, Poll};
use std::time::Instant;
use crate::datasource::listing::PartitionedFile;
-use crate::error::Result;
-use crate::physical_plan::file_format::{
+use crate::datasource::physical_plan::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
+use crate::error::Result;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
@@ -524,7 +524,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::BatchSerializer;
use crate::datasource::object_store::ObjectStoreUrl;
- use crate::physical_plan::file_format::FileMeta;
+ use crate::datasource::physical_plan::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::prelude::SessionContext;
use crate::{
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/json.rs
rename to datafusion/core/src/datasource/physical_plan/json.rs
index 10f249d4e7..8340c282a0 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -17,13 +17,13 @@
//! Execution plan for reading line-delimited JSON files
use crate::datasource::file_format::file_type::FileCompressionType;
+use crate::datasource::physical_plan::file_stream::{
+ FileOpenFuture, FileOpener, FileStream,
+};
+use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::file_format::file_stream::{
- FileOpenFuture, FileOpener, FileStream,
-};
-use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
@@ -308,8 +308,8 @@ mod tests {
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::execution::context::SessionState;
- use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
use crate::test::partitioned_file_groups;
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/mod.rs
rename to datafusion/core/src/datasource/physical_plan/mod.rs
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/parquet.rs
rename to datafusion/core/src/datasource/physical_plan/parquet.rs
index a0c7402cc4..383a2066fc 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -17,18 +17,12 @@
//! Execution plan for reading Parquet files
-use fmt::Debug;
-use std::any::Any;
-use std::cmp::min;
-use std::fmt;
-use std::fs;
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::physical_plan::file_format::file_stream::{
+use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
-use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
+use crate::datasource::physical_plan::{
+ parquet::page_filter::PagePruningPredicate, FileMeta, FileScanConfig, SchemaAdapter,
+};
use crate::{
config::ConfigOptions,
datasource::listing::FileRange,
@@ -37,13 +31,19 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
common::AbortOnDropSingle,
- expressions::PhysicalSortExpr,
- file_format::{FileMeta, FileScanConfig, SchemaAdapter},
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
},
};
+use datafusion_physical_expr::PhysicalSortExpr;
+use fmt::Debug;
+use std::any::Any;
+use std::cmp::min;
+use std::fmt;
+use std::fs;
+use std::ops::Range;
+use std::sync::Arc;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::ArrowError;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index 00e55c41ad..c046de73d7 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -39,10 +39,10 @@ use parquet::{
};
use std::sync::Arc;
-use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use crate::physical_plan::file_format::parquet::{
+use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
+use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use super::metrics::ParquetFileMetrics;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
similarity index 100%
rename from datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
rename to datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 0cbb5d9465..07ef28304c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -27,7 +27,7 @@ use parquet::file::{
metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
};
-use crate::physical_plan::file_format::parquet::{
+use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::{
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 078093189c..7f74e28511 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -80,9 +80,9 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;
use crate::config::ConfigOptions;
+use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
-use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index fdef7d54b8..3ec9e9bbd0 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -207,11 +207,11 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::lit;
- use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, Partitioning, Statistics};
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 6c425b1740..4e456450bc 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -977,12 +977,12 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::col;
- use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::joins::{
utils::JoinOn, HashJoinExec, PartitionMode, SortMergeJoinExec,
};
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 7db54eee51..fb867ff36c 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -21,11 +21,11 @@ use std::sync::Arc;
use super::optimizer::PhysicalOptimizerRule;
use crate::config::ConfigOptions;
+use crate::datasource::physical_plan::ParquetExec;
use crate::error::Result;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
- file_format::ParquetExec, repartition::RepartitionExec,
- with_new_children_if_necessary, ExecutionPlan,
+ repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
};
/// Optimizer that introduces repartition to introduce more
@@ -325,13 +325,13 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::{col, PhysicalSortExpr};
- use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 22f7d509c8..8398b16c4d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -967,10 +967,10 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::aggregates::PhysicalGroupBy;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
- use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::JoinOn;
use crate::physical_plan::joins::SortMergeJoinExec;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index deff619b4f..ce5dc041b9 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -680,7 +680,6 @@ pub mod common;
pub mod display;
pub mod empty;
pub mod explain;
-pub mod file_format;
pub mod filter;
pub mod insert;
pub mod joins;
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index f773f3b549..a17b8ba87e 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -289,9 +289,9 @@ pub(crate) fn window_ordering_equivalence(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::datasource::physical_plan::CsvExec;
use crate::physical_plan::aggregates::AggregateFunction;
use crate::physical_plan::expressions::col;
- use crate::physical_plan::file_format::CsvExec;
use crate::physical_plan::{collect, ExecutionPlan};
use crate::prelude::SessionContext;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 6e4d038b4a..82b55063dc 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -21,11 +21,11 @@ use crate::arrow::array::UInt32Array;
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::logical_expr::LogicalPlan;
-use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::ExecutionPlan;
use crate::test::object_store::local_unpartitioned_file;
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index ed65c4122c..d3a1f9c1ef 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -26,11 +26,11 @@ use crate::common::ToDFSchema;
use crate::config::ConfigOptions;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
+use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::error::Result;
use crate::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
use crate::physical_expr::create_physical_expr;
use crate::physical_expr::execution_props::ExecutionProps;
-use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 302baca516..7d73b4a618 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -23,7 +23,7 @@ use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::physical_plan::file_format::{
+use datafusion::datasource::physical_plan::{
FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 3fbf413032..031aab9f45 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -29,11 +29,8 @@ use arrow::{
};
use chrono::{Datelike, Duration};
use datafusion::{
- datasource::{provider_as_source, TableProvider},
- physical_plan::{
- accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
- ExecutionPlanVisitor,
- },
+ datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
+ physical_plan::{accept, metrics::MetricsSet, ExecutionPlan, ExecutionPlanVisitor},
prelude::{ParquetReadOptions, SessionConfig, SessionContext},
};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 1c912883c7..3711184ccc 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -21,8 +21,8 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::execution::context::SessionState;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index e7b1584e21..4d028c6f1b 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -21,8 +21,8 @@ use arrow_array::types::Int32Type;
use arrow_array::{ArrayRef, DictionaryArray, Float32Array, Int64Array, StringArray};
use arrow_schema::DataType;
use datafusion::assert_batches_sorted_eq;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_common::Statistics;
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index f5cb0e4804..c68b422a4f 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -18,9 +18,9 @@
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
-use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_row::reader::read_as_batch;
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 42672e5049..7a52e5f0d0 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -24,6 +24,7 @@ use chrono::Utc;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::execution::context::ExecutionProps;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::window_function::WindowFunction;
@@ -32,7 +33,6 @@ use datafusion::physical_plan::expressions::{
date_time_interval_expr, GetIndexedFieldExpr,
};
use datafusion::physical_plan::expressions::{in_list, LikeExpr};
-use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{
expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, Literal,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 7385a4ac21..3c14981355 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::file_type::FileCompressionType;
+use datafusion::datasource::physical_plan::{AvroExec, CsvExec, ParquetExec};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::WindowFrame;
@@ -32,7 +33,6 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
-use datafusion::physical_plan::file_format::{AvroExec, CsvExec, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion::physical_plan::joins::CrossJoinExec;
@@ -1276,14 +1276,16 @@ mod roundtrip_tests {
compute::kernels::sort::SortOptions,
datatypes::{DataType, Field, Schema},
},
- datasource::listing::PartitionedFile,
+ datasource::{
+ listing::PartitionedFile,
+ physical_plan::{FileScanConfig, ParquetExec},
+ },
logical_expr::{JoinType, Operator},
physical_plan::{
aggregates::{AggregateExec, AggregateMode},
empty::EmptyExec,
expressions::{binary, col, lit, NotExpr},
expressions::{Avg, Column, DistinctCount, PhysicalSortExpr},
- file_format::{FileScanConfig, ParquetExec},
filter::FilterExec,
joins::{HashJoinExec, PartitionMode},
limit::{GlobalLimitExec, LocalLimitExec},
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 766906d04a..0910ddaad0 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -32,7 +32,7 @@ use datafusion::physical_plan::{
};
use datafusion::datasource::listing::{FileRange, PartitionedFile};
-use datafusion::physical_plan::file_format::FileScanConfig;
+use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal};
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 1de3937c59..5d2f22b857 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -20,8 +20,8 @@ use chrono::DateTime;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::error::{DataFusionError, Result};
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use object_store::ObjectMeta;
diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs
index c8d739ecda..ad87d7afb0 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::error::{DataFusionError, Result};
-use datafusion::physical_plan::file_format::ParquetExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use std::collections::HashMap;
use substrait::proto::expression::MaskExpression;
diff --git a/datafusion/substrait/tests/roundtrip_physical_plan.rs b/datafusion/substrait/tests/roundtrip_physical_plan.rs
index ab77f19ea0..de549412b6 100644
--- a/datafusion/substrait/tests/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/roundtrip_physical_plan.rs
@@ -20,8 +20,8 @@ mod tests {
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
+ use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::error::Result;
- use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_substrait::physical_plan::{consumer, producer};