You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/21 13:27:21 UTC

(arrow-datafusion) branch main updated: Remove unbounded_input from FileSinkOptions (#8605)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 96c5b8afcd Remove unbounded_input from FileSinkOptions (#8605)
96c5b8afcd is described below

commit 96c5b8afcda12f95ce6852102c5387021f907ca6
Author: Devin D'Angelo <de...@gmail.com>
AuthorDate: Thu Dec 21 08:27:12 2023 -0500

    Remove unbounded_input from FileSinkOptions (#8605)
    
    * regen protoc
    
    * remove proto flag
---
 .../src/datasource/file_format/write/orchestration.rs  | 17 ++---------------
 datafusion/core/src/datasource/listing/table.rs        |  9 +--------
 datafusion/core/src/datasource/physical_plan/mod.rs    | 18 ------------------
 datafusion/core/src/physical_planner.rs                |  1 -
 datafusion/proto/proto/datafusion.proto                |  5 ++---
 datafusion/proto/src/generated/pbjson.rs               | 18 ------------------
 datafusion/proto/src/generated/prost.rs                |  4 +---
 datafusion/proto/src/physical_plan/from_proto.rs       |  1 -
 datafusion/proto/src/physical_plan/to_proto.rs         |  1 -
 .../proto/tests/cases/roundtrip_physical_plan.rs       |  1 -
 10 files changed, 6 insertions(+), 69 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 2ae6b70ed1..120e27ecf6 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -52,7 +52,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver<RecordBatch>,
     mut serializer: Box<dyn BatchSerializer>,
     mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    unbounded_input: bool,
 ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
     let (tx, mut rx) =
         mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
@@ -71,9 +70,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
                             "Unknown error writing to object store".into(),
                         )
                     })?;
-                    if unbounded_input {
-                        tokio::task::yield_now().await;
-                    }
                 }
                 Err(_) => {
                     return Err(DataFusionError::Internal(
@@ -140,7 +136,6 @@ type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
 pub(crate) async fn stateless_serialize_and_write_files(
     mut rx: Receiver<FileWriteBundle>,
     tx: tokio::sync::oneshot::Sender<u64>,
-    unbounded_input: bool,
 ) -> Result<()> {
     let mut row_count = 0;
     // tracks if any writers encountered an error triggering the need to abort
@@ -153,13 +148,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
     let mut join_set = JoinSet::new();
     while let Some((data_rx, serializer, writer)) = rx.recv().await {
         join_set.spawn(async move {
-            serialize_rb_stream_to_object_store(
-                data_rx,
-                serializer,
-                writer,
-                unbounded_input,
-            )
-            .await
+            serialize_rb_stream_to_object_store(data_rx, serializer, writer).await
         });
     }
     let mut finished_writers = Vec::new();
@@ -241,7 +230,6 @@ pub(crate) async fn stateless_multipart_put(
 
     let single_file_output = config.single_file_output;
     let base_output_path = &config.table_paths[0];
-    let unbounded_input = config.unbounded_input;
     let part_cols = if !config.table_partition_cols.is_empty() {
         Some(config.table_partition_cols.clone())
     } else {
@@ -266,8 +254,7 @@ pub(crate) async fn stateless_multipart_put(
     let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
     let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
     let write_coordinater_task = tokio::spawn(async move {
-        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
-            .await
+        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt).await
     });
     while let Some((location, rb_stream)) = file_stream_rx.recv().await {
         let serializer = get_serializer();
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 4c13d9d443..21d43dcd56 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -38,7 +38,7 @@ use crate::datasource::{
     },
     get_statistics_with_limit,
     listing::ListingTableUrl,
-    physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
+    physical_plan::{FileScanConfig, FileSinkConfig},
     TableProvider, TableType,
 };
 use crate::{
@@ -790,13 +790,6 @@ impl TableProvider for ListingTable {
             file_groups,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
-            // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT
-            // queries. Thus, we can check if the plan is streaming to ensure file sink input is
-            // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now`
-            // to consume data at the input. When `unbounded_input` flag is `false` (e.g non-streaming data),
-            // all of the data at the input is sink after execution finishes. See discussion for rationale:
-            // https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
-            unbounded_input: is_plan_streaming(&input)?,
             single_file_output: self.options.single_file,
             overwrite,
             file_type_writer_options,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 9d1c373aee..4a6ebeab09 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -69,7 +69,6 @@ use arrow::{
 use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
-use datafusion_physical_plan::ExecutionPlan;
 
 use log::debug;
 use object_store::path::Path;
@@ -93,8 +92,6 @@ pub struct FileSinkConfig {
     /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
     /// to which each output partition is written to its own output file.
     pub single_file_output: bool,
-    /// If input is unbounded, tokio tasks need to yield to not block execution forever
-    pub unbounded_input: bool,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
     /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
@@ -510,21 +507,6 @@ fn get_projected_output_ordering(
     all_orderings
 }
 
-// Get output (un)boundedness information for the given `plan`.
-pub(crate) fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
-    let result = if plan.children().is_empty() {
-        plan.unbounded_output(&[])
-    } else {
-        let children_unbounded_output = plan
-            .children()
-            .iter()
-            .map(is_plan_streaming)
-            .collect::<Result<Vec<_>>>();
-        plan.unbounded_output(&children_unbounded_output?)
-    };
-    result
-}
-
 #[cfg(test)]
 mod tests {
     use arrow_array::cast::AsArray;
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index e5816eb49e..31d50be10f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -593,7 +593,6 @@ impl DefaultPhysicalPlanner {
                         file_groups: vec![],
                         output_schema: Arc::new(schema),
                         table_partition_cols: vec![],
-                        unbounded_input: false,
                         single_file_output: *single_file_output,
                         overwrite: false,
                         file_type_writer_options
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 76fe449d2f..cc802ee957 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1201,9 +1201,8 @@ message FileSinkConfig {
   Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
   bool single_file_output = 7;
-  bool unbounded_input = 8;
-  bool overwrite = 9;
-  FileTypeWriterOptions file_type_writer_options = 10;
+  bool overwrite = 8;
+  FileTypeWriterOptions file_type_writer_options = 9;
 }
 
 message JsonSink {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 0671757ad4..fb3a3ad91d 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7500,9 +7500,6 @@ impl serde::Serialize for FileSinkConfig {
         if self.single_file_output {
             len += 1;
         }
-        if self.unbounded_input {
-            len += 1;
-        }
         if self.overwrite {
             len += 1;
         }
@@ -7528,9 +7525,6 @@ impl serde::Serialize for FileSinkConfig {
         if self.single_file_output {
             struct_ser.serialize_field("singleFileOutput", &self.single_file_output)?;
         }
-        if self.unbounded_input {
-            struct_ser.serialize_field("unboundedInput", &self.unbounded_input)?;
-        }
         if self.overwrite {
             struct_ser.serialize_field("overwrite", &self.overwrite)?;
         }
@@ -7559,8 +7553,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             "tablePartitionCols",
             "single_file_output",
             "singleFileOutput",
-            "unbounded_input",
-            "unboundedInput",
             "overwrite",
             "file_type_writer_options",
             "fileTypeWriterOptions",
@@ -7574,7 +7566,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             OutputSchema,
             TablePartitionCols,
             SingleFileOutput,
-            UnboundedInput,
             Overwrite,
             FileTypeWriterOptions,
         }
@@ -7604,7 +7595,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema),
                             "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols),
                             "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput),
-                            "unboundedInput" | "unbounded_input" => Ok(GeneratedField::UnboundedInput),
                             "overwrite" => Ok(GeneratedField::Overwrite),
                             "fileTypeWriterOptions" | "file_type_writer_options" => Ok(GeneratedField::FileTypeWriterOptions),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
@@ -7632,7 +7622,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                 let mut output_schema__ = None;
                 let mut table_partition_cols__ = None;
                 let mut single_file_output__ = None;
-                let mut unbounded_input__ = None;
                 let mut overwrite__ = None;
                 let mut file_type_writer_options__ = None;
                 while let Some(k) = map_.next_key()? {
@@ -7673,12 +7662,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             }
                             single_file_output__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::UnboundedInput => {
-                            if unbounded_input__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("unboundedInput"));
-                            }
-                            unbounded_input__ = Some(map_.next_value()?);
-                        }
                         GeneratedField::Overwrite => {
                             if overwrite__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("overwrite"));
@@ -7700,7 +7683,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                     output_schema: output_schema__,
                     table_partition_cols: table_partition_cols__.unwrap_or_default(),
                     single_file_output: single_file_output__.unwrap_or_default(),
-                    unbounded_input: unbounded_input__.unwrap_or_default(),
                     overwrite: overwrite__.unwrap_or_default(),
                     file_type_writer_options: file_type_writer_options__,
                 })
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 771bd715d3..9030e90a24 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1635,10 +1635,8 @@ pub struct FileSinkConfig {
     #[prost(bool, tag = "7")]
     pub single_file_output: bool,
     #[prost(bool, tag = "8")]
-    pub unbounded_input: bool,
-    #[prost(bool, tag = "9")]
     pub overwrite: bool,
-    #[prost(message, optional, tag = "10")]
+    #[prost(message, optional, tag = "9")]
     pub file_type_writer_options: ::core::option::Option<FileTypeWriterOptions>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 5c0ef615ca..65f9f139a8 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -739,7 +739,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
             single_file_output: conf.single_file_output,
-            unbounded_input: conf.unbounded_input,
             overwrite: conf.overwrite,
             file_type_writer_options: convert_required!(conf.file_type_writer_options)?,
         })
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index ea00b726b9..e9cdb34cf1 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -846,7 +846,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
             output_schema: Some(conf.output_schema.as_ref().try_into()?),
             table_partition_cols,
             single_file_output: conf.single_file_output,
-            unbounded_input: conf.unbounded_input,
             overwrite: conf.overwrite,
             file_type_writer_options: Some(file_type_writer_options.try_into()?),
         })
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 9a9827f2a0..2eb04ab6cb 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -733,7 +733,6 @@ fn roundtrip_json_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         single_file_output: true,
-        unbounded_input: false,
         overwrite: true,
         file_type_writer_options: FileTypeWriterOptions::JSON(JsonWriterOptions::new(
             CompressionTypeVariant::UNCOMPRESSED,