You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/21 13:27:21 UTC
(arrow-datafusion) branch main updated: Remove unbounded_input from FileSinkOptions (#8605)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 96c5b8afcd Remove unbounded_input from FileSinkOptions (#8605)
96c5b8afcd is described below
commit 96c5b8afcda12f95ce6852102c5387021f907ca6
Author: Devin D'Angelo <de...@gmail.com>
AuthorDate: Thu Dec 21 08:27:12 2023 -0500
Remove unbounded_input from FileSinkOptions (#8605)
* regen protoc
* remove proto flag
---
.../src/datasource/file_format/write/orchestration.rs | 17 ++---------------
datafusion/core/src/datasource/listing/table.rs | 9 +--------
datafusion/core/src/datasource/physical_plan/mod.rs | 18 ------------------
datafusion/core/src/physical_planner.rs | 1 -
datafusion/proto/proto/datafusion.proto | 5 ++---
datafusion/proto/src/generated/pbjson.rs | 18 ------------------
datafusion/proto/src/generated/prost.rs | 4 +---
datafusion/proto/src/physical_plan/from_proto.rs | 1 -
datafusion/proto/src/physical_plan/to_proto.rs | 1 -
.../proto/tests/cases/roundtrip_physical_plan.rs | 1 -
10 files changed, 6 insertions(+), 69 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 2ae6b70ed1..120e27ecf6 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -52,7 +52,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
mut serializer: Box<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
- unbounded_input: bool,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
@@ -71,9 +70,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
"Unknown error writing to object store".into(),
)
})?;
- if unbounded_input {
- tokio::task::yield_now().await;
- }
}
Err(_) => {
return Err(DataFusionError::Internal(
@@ -140,7 +136,6 @@ type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
pub(crate) async fn stateless_serialize_and_write_files(
mut rx: Receiver<FileWriteBundle>,
tx: tokio::sync::oneshot::Sender<u64>,
- unbounded_input: bool,
) -> Result<()> {
let mut row_count = 0;
// tracks if any writers encountered an error triggering the need to abort
@@ -153,13 +148,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
let mut join_set = JoinSet::new();
while let Some((data_rx, serializer, writer)) = rx.recv().await {
join_set.spawn(async move {
- serialize_rb_stream_to_object_store(
- data_rx,
- serializer,
- writer,
- unbounded_input,
- )
- .await
+ serialize_rb_stream_to_object_store(data_rx, serializer, writer).await
});
}
let mut finished_writers = Vec::new();
@@ -241,7 +230,6 @@ pub(crate) async fn stateless_multipart_put(
let single_file_output = config.single_file_output;
let base_output_path = &config.table_paths[0];
- let unbounded_input = config.unbounded_input;
let part_cols = if !config.table_partition_cols.is_empty() {
Some(config.table_partition_cols.clone())
} else {
@@ -266,8 +254,7 @@ pub(crate) async fn stateless_multipart_put(
let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
let write_coordinater_task = tokio::spawn(async move {
- stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
- .await
+ stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt).await
});
while let Some((location, rb_stream)) = file_stream_rx.recv().await {
let serializer = get_serializer();
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 4c13d9d443..21d43dcd56 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -38,7 +38,7 @@ use crate::datasource::{
},
get_statistics_with_limit,
listing::ListingTableUrl,
- physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
+ physical_plan::{FileScanConfig, FileSinkConfig},
TableProvider, TableType,
};
use crate::{
@@ -790,13 +790,6 @@ impl TableProvider for ListingTable {
file_groups,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
- // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT
- // queries. Thus, we can check if the plan is streaming to ensure file sink input is
- // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now`
- // to consume data at the input. When `unbounded_input` flag is `false` (e.g non-streaming data),
- // all of the data at the input is sink after execution finishes. See discussion for rationale:
- // https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
- unbounded_input: is_plan_streaming(&input)?,
single_file_output: self.options.single_file,
overwrite,
file_type_writer_options,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 9d1c373aee..4a6ebeab09 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -69,7 +69,6 @@ use arrow::{
use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
-use datafusion_physical_plan::ExecutionPlan;
use log::debug;
use object_store::path::Path;
@@ -93,8 +92,6 @@ pub struct FileSinkConfig {
/// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
/// to which each output partition is written to its own output file.
pub single_file_output: bool,
- /// If input is unbounded, tokio tasks need to yield to not block execution forever
- pub unbounded_input: bool,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
/// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
@@ -510,21 +507,6 @@ fn get_projected_output_ordering(
all_orderings
}
-// Get output (un)boundedness information for the given `plan`.
-pub(crate) fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
- let result = if plan.children().is_empty() {
- plan.unbounded_output(&[])
- } else {
- let children_unbounded_output = plan
- .children()
- .iter()
- .map(is_plan_streaming)
- .collect::<Result<Vec<_>>>();
- plan.unbounded_output(&children_unbounded_output?)
- };
- result
-}
-
#[cfg(test)]
mod tests {
use arrow_array::cast::AsArray;
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index e5816eb49e..31d50be10f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -593,7 +593,6 @@ impl DefaultPhysicalPlanner {
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols: vec![],
- unbounded_input: false,
single_file_output: *single_file_output,
overwrite: false,
file_type_writer_options
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 76fe449d2f..cc802ee957 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1201,9 +1201,8 @@ message FileSinkConfig {
Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
bool single_file_output = 7;
- bool unbounded_input = 8;
- bool overwrite = 9;
- FileTypeWriterOptions file_type_writer_options = 10;
+ bool overwrite = 8;
+ FileTypeWriterOptions file_type_writer_options = 9;
}
message JsonSink {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 0671757ad4..fb3a3ad91d 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7500,9 +7500,6 @@ impl serde::Serialize for FileSinkConfig {
if self.single_file_output {
len += 1;
}
- if self.unbounded_input {
- len += 1;
- }
if self.overwrite {
len += 1;
}
@@ -7528,9 +7525,6 @@ impl serde::Serialize for FileSinkConfig {
if self.single_file_output {
struct_ser.serialize_field("singleFileOutput", &self.single_file_output)?;
}
- if self.unbounded_input {
- struct_ser.serialize_field("unboundedInput", &self.unbounded_input)?;
- }
if self.overwrite {
struct_ser.serialize_field("overwrite", &self.overwrite)?;
}
@@ -7559,8 +7553,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
"tablePartitionCols",
"single_file_output",
"singleFileOutput",
- "unbounded_input",
- "unboundedInput",
"overwrite",
"file_type_writer_options",
"fileTypeWriterOptions",
@@ -7574,7 +7566,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
OutputSchema,
TablePartitionCols,
SingleFileOutput,
- UnboundedInput,
Overwrite,
FileTypeWriterOptions,
}
@@ -7604,7 +7595,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
"outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema),
"tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols),
"singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput),
- "unboundedInput" | "unbounded_input" => Ok(GeneratedField::UnboundedInput),
"overwrite" => Ok(GeneratedField::Overwrite),
"fileTypeWriterOptions" | "file_type_writer_options" => Ok(GeneratedField::FileTypeWriterOptions),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
@@ -7632,7 +7622,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
let mut output_schema__ = None;
let mut table_partition_cols__ = None;
let mut single_file_output__ = None;
- let mut unbounded_input__ = None;
let mut overwrite__ = None;
let mut file_type_writer_options__ = None;
while let Some(k) = map_.next_key()? {
@@ -7673,12 +7662,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
}
single_file_output__ = Some(map_.next_value()?);
}
- GeneratedField::UnboundedInput => {
- if unbounded_input__.is_some() {
- return Err(serde::de::Error::duplicate_field("unboundedInput"));
- }
- unbounded_input__ = Some(map_.next_value()?);
- }
GeneratedField::Overwrite => {
if overwrite__.is_some() {
return Err(serde::de::Error::duplicate_field("overwrite"));
@@ -7700,7 +7683,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
output_schema: output_schema__,
table_partition_cols: table_partition_cols__.unwrap_or_default(),
single_file_output: single_file_output__.unwrap_or_default(),
- unbounded_input: unbounded_input__.unwrap_or_default(),
overwrite: overwrite__.unwrap_or_default(),
file_type_writer_options: file_type_writer_options__,
})
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 771bd715d3..9030e90a24 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1635,10 +1635,8 @@ pub struct FileSinkConfig {
#[prost(bool, tag = "7")]
pub single_file_output: bool,
#[prost(bool, tag = "8")]
- pub unbounded_input: bool,
- #[prost(bool, tag = "9")]
pub overwrite: bool,
- #[prost(message, optional, tag = "10")]
+ #[prost(message, optional, tag = "9")]
pub file_type_writer_options: ::core::option::Option<FileTypeWriterOptions>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 5c0ef615ca..65f9f139a8 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -739,7 +739,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
output_schema: Arc::new(convert_required!(conf.output_schema)?),
table_partition_cols,
single_file_output: conf.single_file_output,
- unbounded_input: conf.unbounded_input,
overwrite: conf.overwrite,
file_type_writer_options: convert_required!(conf.file_type_writer_options)?,
})
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index ea00b726b9..e9cdb34cf1 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -846,7 +846,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
output_schema: Some(conf.output_schema.as_ref().try_into()?),
table_partition_cols,
single_file_output: conf.single_file_output,
- unbounded_input: conf.unbounded_input,
overwrite: conf.overwrite,
file_type_writer_options: Some(file_type_writer_options.try_into()?),
})
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 9a9827f2a0..2eb04ab6cb 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -733,7 +733,6 @@ fn roundtrip_json_sink() -> Result<()> {
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
single_file_output: true,
- unbounded_input: false,
overwrite: true,
file_type_writer_options: FileTypeWriterOptions::JSON(JsonWriterOptions::new(
CompressionTypeVariant::UNCOMPRESSED,