Posted to github@arrow.apache.org by "devinjdangelo (via GitHub)" <gi...@apache.org> on 2023/10/11 23:23:48 UTC

[PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

devinjdangelo opened a new pull request, #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801

   Draft, as this builds on #7791.
   
   ## Which issue does this PR close?
   
   Closes #7744 
   
   ## Rationale for this change
   
   We already support reads from hive-style partitioned tables, so it makes sense to support `INSERT INTO` for these tables as well.
   
   ## What changes are included in this PR?
   
   Implements a new runtime demux strategy that creates a stream for each unique tuple of partition values.
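
As a rough illustration of the demux idea, here is a minimal, std-only sketch that groups row indices by their tuple of partition values. It is not the PR's code: the name `group_rows_by_partition_key` and the column-major `Vec<&str>` layout are assumptions made for this example, whereas the actual implementation operates on Arrow `RecordBatch`es and forwards each group over its own channel.

```rust
use std::collections::HashMap;

/// Groups row indices by their tuple of partition values.
/// `columns` holds one vector of string values per partition column
/// (a hypothetical layout chosen for this sketch).
fn group_rows_by_partition_key(
    columns: &[Vec<&str>],
    num_rows: usize,
) -> HashMap<Vec<String>, Vec<usize>> {
    let mut groups: HashMap<Vec<String>, Vec<usize>> = HashMap::new();
    for row in 0..num_rows {
        // Build the key for this row, e.g. ["1", "x"] for (a=1, b=x)
        let key: Vec<String> = columns.iter().map(|col| col[row].to_owned()).collect();
        // Record the row index under its key, creating the group on first sight
        groups.entry(key).or_default().push(row);
    }
    groups
}
```

Each resulting group would correspond to one output stream, so all rows sharing the same partition values end up in the same file.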
   
   ## Are these changes tested?
   
   Yes, new sqllogictest cases were added.
   
   More fine-grained tests are still needed to ensure partitioning works as expected.
   
   ## Are there any user-facing changes?
   
   Inserts into partitioned tables now work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801




Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#issuecomment-1773160664

   Thanks again @devinjdangelo  -- truly epic work




Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1355878988


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +396,239 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    tx: Sender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+    let file_path = if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    };
+
+    let (mut tx_file, mut rx_file) =
+        tokio::sync::mpsc::channel(max_buffered_recordbatches / 2);
+    tx.send((file_path, rx_file)).await.map_err(|_| {
+        DataFusionError::Execution("Error sending new file stream!".into())
+    })?;
+    part_idx += 1;
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        // Once we have sent enough data to previous file stream, spawn a new one
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+
+            let file_path = base_output_path
+                .prefix()
+                .child(format!("{}_{}.{}", write_id, part_idx, file_extension));
+            (tx_file, rx_file) = tokio::sync::mpsc::channel(max_buffered_recordbatches);
+            tx.send((file_path, rx_file)).await.map_err(|_| {
+                DataFusionError::Execution("Error sending new file stream!".into())
+            })?;
+
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(

Review Comment:
   This is the key new code in this PR.
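
For readers unfamiliar with the path layout named in the doc comment above, the following is a small sketch of how a hive-style path such as `col1=val1/col2=val2/outputfile.parquet` could be assembled. The helper `hive_style_path` is hypothetical and is not the PR's `compute_hive_style_file_path`; naming the file `<write_id>.<ext>` is likewise an assumption for illustration.

```rust
/// Builds a hive-style relative path like `col1=val1/col2=val2/<write_id>.<ext>`.
/// Hypothetical helper for illustration only.
fn hive_style_path(
    partition_cols: &[&str],
    values: &[&str],
    write_id: &str,
    ext: &str,
) -> String {
    let mut path = String::new();
    // One `name=value/` directory segment per partition column, in order
    for (col, val) in partition_cols.iter().zip(values.iter()) {
        path.push_str(col);
        path.push('=');
        path.push_str(val);
        path.push('/');
    }
    // File name inside the innermost partition directory
    path.push_str(write_id);
    path.push('.');
    path.push_str(ext);
    path
}
```

Under these assumptions, partition columns `a` and `b` with values `2` and `1` yield a path under `a=2/b=1/`, which matches the directory that the sqllogictest verification table reads from.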





Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1357091956


##########
datafusion/sqllogictest/test_files/insert_to_external.slt:
##########
@@ -87,6 +87,126 @@ SELECT * from ordered_insert_test;
 7 8
 7 7
 
+# test partitioned insert
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test(a string, b string, c bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/'
+PARTITIONED BY (a, b) 
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+#note that partitioned cols are moved to the end so value tuples are (c, a, b)
+query ITT
+INSERT INTO partitioned_insert_test values (1, 1, 1), (1, 1, 2), (1, 2, 1), (1, 2, 2), (2, 2, 1), (2, 2, 2);
+----
+6
+
+query ITT
+select * from partitioned_insert_test order by a,b,c
+----
+1 1 1
+1 1 2
+1 2 1
+2 2 1
+1 2 2
+2 2 2
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test_verify(c bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/a=2/b=1/'
+OPTIONS(
+insert_mode 'append_new_files',
+);
+
+query I
+select * from partitioned_insert_test_verify;
+----
+1
+2
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test_json(a string, b string)
+STORED AS json
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_json/'
+PARTITIONED BY (a) 
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2), (3, 4), (5, 6);
+----
+6
+
+query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}

Review Comment:
   The data written to the scratch directory looks correct to me, and the other file types are working. So, I suspect this is a bug related to support for reading partitioned JSONL tables.





Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1366276360


##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_output parameter
+/// overrides all other settings to force only a single file to be written.
+/// The partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>`
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs
+            let take_indices = builder.finish();
+            let struct_array: StructArray = rb.clone().into();
+            let parted_batch = RecordBatch::try_from(
+                arrow::compute::take(&struct_array, &take_indices, None)?.as_struct(),
+            )
+            .map_err(|_| {
+                DataFusionError::Internal("Unexpected error partitioning batch!".into())
+            })?;
+
+            // Get or create channel for this batch
+            let part_tx = match value_map.get_mut(&part_key) {
+                Some(part_tx) => part_tx,
+                None => {
+                    // Create channel for previously unseen distinct partition key and notify consumer of new file
+                    let (part_tx, part_rx) = tokio::sync::mpsc::channel::<RecordBatch>(
+                        max_buffered_recordbatches,
+                    );
+                    let file_path = compute_hive_style_file_path(
+                        &part_key,
+                        &partition_by,
+                        &write_id,
+                        &file_extension,
+                        &base_output_path,
+                    );
+
+                    tx.send((file_path, part_rx)).map_err(|_| {
+                        DataFusionError::Execution(
+                            "Error sending new file stream!".into(),
+                        )
+                    })?;
+
+                    value_map.insert(part_key.clone(), part_tx);
+                    value_map
+                        .get_mut(&part_key)
+                        .ok_or(DataFusionError::Internal(
+                            "Key must exist since it was just inserted!".into(),
+                        ))?
+                }
+            };
+
+            // remove partitions columns
+            let final_batch_to_send =
+                remove_partition_by_columns(&parted_batch, &partition_by)?;
+
+            // Finally send the partial batch partitioned by distinct value!
+            part_tx.send(final_batch_to_send).await.map_err(|_| {
+                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+            })?;
+        }
+    }
+
+    Ok(())
+}
+
+fn compute_partition_keys_by_row<'a>(
+    rb: &'a RecordBatch,
+    partition_by: &'a [(String, DataType)],
+) -> Result<Vec<Vec<&'a str>>> {
+    let mut all_partition_values = vec![];
+
+    for (col, dtype) in partition_by.iter() {
+        let mut partition_values = vec![];
+        let col_array =
+            rb.column_by_name(col)
+                .ok_or(DataFusionError::Execution(format!(
+                    "PartitionBy Column {} does not exist in source data!",
+                    col
+                )))?;
+
+        match dtype {
+            DataType::Utf8 => {
+                let array = as_string_array(col_array)?;
+                for i in 0..rb.num_rows() {
+                    partition_values.push(array.value(i));
+                }
+            }
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                "it is not yet supported to write to hive partitions with datatype {}",
+                dtype
+            )))
+            }
+        }
+
+        all_partition_values.push(partition_values);
+    }
+
+    Ok(all_partition_values)
+}
+
+fn compute_take_arrays(
+    rb: &RecordBatch,
+    all_partition_values: Vec<Vec<&str>>,
+) -> HashMap<Vec<String>, UInt64Builder> {
+    let mut take_map = HashMap::new();
+    for i in 0..rb.num_rows() {
+        let mut part_key = vec![];
+        for vals in all_partition_values.iter() {
+            part_key.push(vals[i].to_owned());

Review Comment:
   The reason is that the partition keys are ultimately pushed into `value_map` later, and we need owned data there since that map outlives each `RecordBatch`, whereas the `&str` values from this function are borrowed from the current `RecordBatch`.
   
   We could call `to_owned` later, but then we would need to iterate over the key to do something like:
   ```rust
   value_map.insert(part_key.into_iter().map(|s| s.to_owned()).collect::<Vec<String>>(), part_tx);
   ```
   
   I decided it was cleaner to obtain owned data earlier to avoid the map/collect.
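
The ownership constraint described above can be shown with a std-only sketch. Everything here is an assumption for illustration: `count_rows_per_key` is a hypothetical function, and a plain `Vec<String>` stands in for a `RecordBatch`. The point is only that a map living across batches has to hold owned `String` keys, because anything borrowed from a batch dies with that batch.

```rust
use std::collections::HashMap;

/// Counts rows per partition key across batches. The map outlives every batch,
/// so its keys must be owned `String`s rather than `&str` borrowed from a batch.
fn count_rows_per_key(batches: Vec<Vec<String>>) -> HashMap<Vec<String>, usize> {
    let mut counts: HashMap<Vec<String>, usize> = HashMap::new();
    for batch in batches {
        // `batch` is dropped at the end of this iteration, much like a RecordBatch
        for value in batch.iter() {
            // A borrowed key is only valid while `batch` is alive...
            let borrowed: Vec<&str> = vec![value.as_str()];
            // ...so convert it to owned data before storing it in the long-lived map
            let owned: Vec<String> = borrowed.into_iter().map(|s| s.to_owned()).collect();
            *counts.entry(owned).or_insert(0) += 1;
        }
    }
    counts
}
```

Storing `borrowed` directly in `counts` would not compile, since the borrow checker rejects references that outlive `batch`.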





Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1357473424


##########
datafusion/sqllogictest/test_files/insert_to_external.slt:
##########
@@ -87,6 +87,126 @@ SELECT * from ordered_insert_test;
 7 8
 7 7
 
+# test partitioned insert
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test(a string, b string, c bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/'
+PARTITIONED BY (a, b) 
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+#note that partitioned cols are moved to the end so value tuples are (c, a, b)
+query ITT
+INSERT INTO partitioned_insert_test values (1, 1, 1), (1, 1, 2), (1, 2, 1), (1, 2, 2), (2, 2, 1), (2, 2, 2);
+----
+6
+
+query ITT
+select * from partitioned_insert_test order by a,b,c
+----
+1 1 1
+1 1 2
+1 2 1
+2 2 1
+1 2 2
+2 2 2
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test_verify(c bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/a=2/b=1/'
+OPTIONS(
+insert_mode 'append_new_files',
+);
+
+query I
+select * from partitioned_insert_test_verify;
+----
+1
+2
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test_json(a string, b string)
+STORED AS json
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_json/'
+PARTITIONED BY (a) 
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2), (3, 4), (5, 6);
+----
+6
+
+query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}

Review Comment:
   Issue filed #7816 





Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1366291260


##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_output parameter
+/// overrides all other settings to force only a single file to be written.
+/// The partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>`
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs
+            let take_indices = builder.finish();
+            let struct_array: StructArray = rb.clone().into();
+            let parted_batch = RecordBatch::try_from(
+                arrow::compute::take(&struct_array, &take_indices, None)?.as_struct(),
+            )
+            .map_err(|_| {
+                DataFusionError::Internal("Unexpected error partitioning batch!".into())
+            })?;
+
+            // Get or create channel for this batch
+            let part_tx = match value_map.get_mut(&part_key) {
+                Some(part_tx) => part_tx,
+                None => {
+                    // Create channel for previously unseen distinct partition key and notify consumer of new file
+                    let (part_tx, part_rx) = tokio::sync::mpsc::channel::<RecordBatch>(
+                        max_buffered_recordbatches,
+                    );
+                    let file_path = compute_hive_style_file_path(
+                        &part_key,
+                        &partition_by,
+                        &write_id,
+                        &file_extension,
+                        &base_output_path,
+                    );
+
+                    tx.send((file_path, part_rx)).map_err(|_| {
+                        DataFusionError::Execution(
+                            "Error sending new file stream!".into(),
+                        )
+                    })?;
+
+                    value_map.insert(part_key.clone(), part_tx);
+                    value_map
+                        .get_mut(&part_key)
+                        .ok_or(DataFusionError::Internal(
+                            "Key must exist since it was just inserted!".into(),
+                        ))?
+                }
+            };
+
+            // remove partitions columns
+            let final_batch_to_send =
+                remove_partition_by_columns(&parted_batch, &partition_by)?;
+
+            // Finally send the partial batch partitioned by distinct value!
+            part_tx.send(final_batch_to_send).await.map_err(|_| {
+                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+            })?;
+        }
+    }
+
+    Ok(())
+}
+
+fn compute_partition_keys_by_row<'a>(
+    rb: &'a RecordBatch,
+    partition_by: &'a [(String, DataType)],
+) -> Result<Vec<Vec<&'a str>>> {
+    let mut all_partition_values = vec![];
+
+    for (col, dtype) in partition_by.iter() {
+        let mut partition_values = vec![];
+        let col_array =
+            rb.column_by_name(col)
+                .ok_or(DataFusionError::Execution(format!(
+                    "PartitionBy Column {} does not exist in source data!",
+                    col
+                )))?;
+
+        match dtype {
+            DataType::Utf8 => {
+                let array = as_string_array(col_array)?;
+                for i in 0..rb.num_rows() {
+                    partition_values.push(array.value(i));
+                }
+            }
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                "it is not yet supported to write to hive partitions with datatype {}",
+                dtype
+            )))
+            }
+        }
+
+        all_partition_values.push(partition_values);
+    }
+
+    Ok(all_partition_values)
+}
+
+fn compute_take_arrays(
+    rb: &RecordBatch,
+    all_partition_values: Vec<Vec<&str>>,
+) -> HashMap<Vec<String>, UInt64Builder> {
+    let mut take_map = HashMap::new();
+    for i in 0..rb.num_rows() {
+        let mut part_key = vec![];
+        for vals in all_partition_values.iter() {
+            part_key.push(vals[i].to_owned());
+        }
+        let builder = take_map.entry(part_key).or_insert(UInt64Builder::new());
+        builder.append_value(i as u64);
+    }
+    take_map
+}
+
+fn remove_partition_by_columns(
+    parted_batch: &RecordBatch,
+    partition_by: &Vec<(String, DataType)>,
+) -> Result<RecordBatch> {
+    let end_idx = parted_batch.num_columns() - partition_by.len();

Review Comment:
   I refactored this function and agree it is much cleaner now :+1: 





Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1366276360


##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_output parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs
+            let take_indices = builder.finish();
+            let struct_array: StructArray = rb.clone().into();
+            let parted_batch = RecordBatch::try_from(
+                arrow::compute::take(&struct_array, &take_indices, None)?.as_struct(),
+            )
+            .map_err(|_| {
+                DataFusionError::Internal("Unexpected error partitioning batch!".into())
+            })?;
+
+            // Get or create channel for this batch
+            let part_tx = match value_map.get_mut(&part_key) {
+                Some(part_tx) => part_tx,
+                None => {
+                    // Create channel for previously unseen distinct partition key and notify consumer of new file
+                    let (part_tx, part_rx) = tokio::sync::mpsc::channel::<RecordBatch>(
+                        max_buffered_recordbatches,
+                    );
+                    let file_path = compute_hive_style_file_path(
+                        &part_key,
+                        &partition_by,
+                        &write_id,
+                        &file_extension,
+                        &base_output_path,
+                    );
+
+                    tx.send((file_path, part_rx)).map_err(|_| {
+                        DataFusionError::Execution(
+                            "Error sending new file stream!".into(),
+                        )
+                    })?;
+
+                    value_map.insert(part_key.clone(), part_tx);
+                    value_map
+                        .get_mut(&part_key)
+                        .ok_or(DataFusionError::Internal(
+                            "Key must exist since it was just inserted!".into(),
+                        ))?
+                }
+            };
+
+            // remove partitions columns
+            let final_batch_to_send =
+                remove_partition_by_columns(&parted_batch, &partition_by)?;
+
+            // Finally send the partial batch partitioned by distinct value!
+            part_tx.send(final_batch_to_send).await.map_err(|_| {
+                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+            })?;
+        }
+    }
+
+    Ok(())
+}
+
+fn compute_partition_keys_by_row<'a>(
+    rb: &'a RecordBatch,
+    partition_by: &'a [(String, DataType)],
+) -> Result<Vec<Vec<&'a str>>> {
+    let mut all_partition_values = vec![];
+
+    for (col, dtype) in partition_by.iter() {
+        let mut partition_values = vec![];
+        let col_array =
+            rb.column_by_name(col)
+                .ok_or(DataFusionError::Execution(format!(
+                    "PartitionBy Column {} does not exist in source data!",
+                    col
+                )))?;
+
+        match dtype {
+            DataType::Utf8 => {
+                let array = as_string_array(col_array)?;
+                for i in 0..rb.num_rows() {
+                    partition_values.push(array.value(i));
+                }
+            }
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                "it is not yet supported to write to hive partitions with datatype {}",
+                dtype
+            )))
+            }
+        }
+
+        all_partition_values.push(partition_values);
+    }
+
+    Ok(all_partition_values)
+}
+
+fn compute_take_arrays(
+    rb: &RecordBatch,
+    all_partition_values: Vec<Vec<&str>>,
+) -> HashMap<Vec<String>, UInt64Builder> {
+    let mut take_map = HashMap::new();
+    for i in 0..rb.num_rows() {
+        let mut part_key = vec![];
+        for vals in all_partition_values.iter() {
+            part_key.push(vals[i].to_owned());

Review Comment:
   The reason is that the partition keys are later inserted into `value_map`, which outlives each `RecordBatch`, so the map needs owned data. The `&str` values here are borrowed from the current `RecordBatch`.
   
   We could call `to_owned` later, but then we would need an extra iteration to do something like:
   ```rust
   value_map.insert(part_key.iter().map(|s| s.to_string()).collect::<Vec<String>>(), part_tx);
   ```
   
   I decided it was cleaner to obtain owned data earlier to avoid the map/collect.
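   
   A minimal sketch of the ownership point above, using only the standard library. `Batch`, `keys_by_row`, `take_indices`, and `count_rows` are hypothetical stand-ins for illustration, not the types or functions in this PR; the only thing it aims to show is that a map which outlives each batch has to hold owned `String` keys.
   
   ```rust
   use std::collections::HashMap;

   /// Hypothetical stand-in for a `RecordBatch`: one partition value per row.
   struct Batch {
       partition_col: Vec<String>,
   }

   /// Borrowing step: one `&str` per row, tied to the lifetime of `batch`.
   fn keys_by_row(batch: &Batch) -> Vec<&str> {
       batch.partition_col.iter().map(|s| s.as_str()).collect()
   }

   /// Grouping step: row indices per distinct key. Keys are copied into owned
   /// `String`s here, so the result no longer borrows from the batch.
   fn take_indices(keys: &[&str]) -> HashMap<Vec<String>, Vec<u64>> {
       let mut take_map: HashMap<Vec<String>, Vec<u64>> = HashMap::new();
       for (i, key) in keys.iter().enumerate() {
           take_map.entry(vec![key.to_string()]).or_default().push(i as u64);
       }
       take_map
   }

   /// Counts rows per key across batches. `totals` outlives every batch,
   /// in the same way that `value_map` outlives each `RecordBatch`.
   fn count_rows(batches: Vec<Batch>) -> HashMap<Vec<String>, usize> {
       let mut totals: HashMap<Vec<String>, usize> = HashMap::new();
       for batch in batches {
           let grouped = take_indices(&keys_by_row(&batch));
           for (key, rows) in grouped {
               *totals.entry(key).or_insert(0) += rows.len();
           }
           // `batch` is dropped at the end of this iteration; the owned keys survive.
       }
       totals
   }
   ```
   
   If `totals` were keyed by `Vec<&str>` instead, the borrow checker would reject `count_rows`, because the keys would still point into a batch that has already been dropped.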





Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#issuecomment-1773144039

   @devinjdangelo I attempted to use this feature in `datafusion-cli` today, as it is useful for something I am doing. I got this error when writing to a partitioned table:
   
   ```
   This feature is not implemented: it is not yet supported to write to hive partitions with datatype Dictionary(UInt16, Utf8)
   ```
   
   Here is a repro using `datafusion-cli`:
   
   ```sql
   CREATE EXTERNAL TABLE lz4_raw_compressed_larger
   STORED AS PARQUET
   PARTITIONED BY (partition)
   LOCATION 'data/';
   
   INSERT INTO lz4_raw_compressed_larger VALUES ('non-partition-value', 'partition');
   ```
   
   Here's a [zip file](https://github.com/apache/arrow-datafusion/files/13057020/lz4_raw_compressed_larger.zip) with a single file in it, `data/partition=A/lz4_raw_compressed_larger.parquet`. 
   
   I noticed the unit tests specify the schema explicitly, but I am guessing if you have DataFusion infer the schema, the partition columns are encoded as dictionaries. I think this will limit the usefulness of this feature if partitioned writes don't work with tables whose schemas are inferred. 
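   
   For readers unfamiliar with dictionary encoding, here is a rough standard-library model of what a `Dictionary(UInt16, Utf8)` column holds and how each row could be resolved to the plain string that the `Utf8` branch expects. `DictColumn` and `decode` are hypothetical names for illustration only; this is not Arrow's `DictionaryArray` API, and it is not a claim about how the fix should be implemented in DataFusion.
   
   ```rust
   /// Hypothetical model of a dictionary-encoded string column:
   /// `keys[i]` indexes into `values`; `None` marks a null row.
   struct DictColumn {
       keys: Vec<Option<u16>>,
       values: Vec<String>,
   }

   /// Resolves every row to its string value, or reports an out-of-range key.
   fn decode(col: &DictColumn) -> Result<Vec<Option<&str>>, String> {
       col.keys
           .iter()
           .map(|key| match key {
               None => Ok(None),
               Some(k) => col
                   .values
                   .get(*k as usize)
                   .map(|v| Some(v.as_str()))
                   .ok_or_else(|| format!("dictionary key {} out of range", k)),
           })
           .collect()
   }
   ```
   
   Once every row resolves to a string like this, the existing per-row partition key logic could in principle be reused unchanged.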




Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1365531237


##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined

Review Comment:
   Thank you for this description @devinjdangelo. It is very clear to read and extremely helpful for review (and for long-term understanding of how this code works)
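   
   The "channels over a channel" design in that description can be sketched with standard-library threads and channels. This is only an illustration under stated assumptions: the PR uses tokio tasks and `tokio::sync::mpsc`, whereas `demux` and `collect` below are hypothetical names built on `std::sync::mpsc`, with `(String, u32)` pairs standing in for partitioned record batches.
   
   ```rust
   use std::collections::HashMap;
   use std::sync::mpsc::{channel, sync_channel, Receiver, SyncSender};
   use std::thread;

   /// Routes each (key, row) item to a per-key inner channel. The first time a
   /// key is seen, its receiver is announced on the unbounded outer channel.
   fn demux(items: Vec<(String, u32)>) -> Receiver<(String, Receiver<u32>)> {
       let (outer_tx, outer_rx) = channel();
       thread::spawn(move || {
           let mut inner: HashMap<String, SyncSender<u32>> = HashMap::new();
           for (key, row) in items {
               let tx = inner.entry(key.clone()).or_insert_with(|| {
                   // Bounded inner channel, standing in for the per-file buffer.
                   let (tx, rx) = sync_channel(2);
                   // A failed send only means the consumer has gone away.
                   let _ = outer_tx.send((key.clone(), rx));
                   tx
               });
               if tx.send(row).is_err() {
                   return;
               }
           }
           // Returning drops all senders, which closes every channel.
       });
       outer_rx
   }

   /// Consumer side: one worker per announced output, each draining its own channel.
   fn collect(outer_rx: Receiver<(String, Receiver<u32>)>) -> HashMap<String, Vec<u32>> {
       let mut workers = Vec::new();
       for (key, rx) in outer_rx {
           workers.push(thread::spawn(move || (key, rx.iter().collect::<Vec<u32>>())));
       }
       workers
           .into_iter()
           .map(|w| w.join().expect("worker panicked"))
           .collect()
   }
   ```
   
   Calling `collect(demux(items))` yields one entry per distinct key, with rows in their original relative order. Keeping the outer channel unbounded means the producer never blocks while announcing a new output, which mirrors the deadlock concern noted in the code comment in `start_demuxer_task`.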



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_output parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``

Review Comment:
   I think the formatting got messed up here somehow:
   
   ```suggestion
   /// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>
   ///
   /// ```text
   ```



##########
datafusion/sqllogictest/test_files/insert_to_external.slt:
##########
@@ -87,6 +87,127 @@ SELECT * from ordered_insert_test;
 7 8
 7 7
 
+# test partitioned insert
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test(a string, b string, c bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/'
+PARTITIONED BY (a, b) 
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+#note that partitioned cols are moved to the end so value tuples are (c, a, b)

Review Comment:
   Thank you for the comment. I recommend updating the test to use values like `(1, 10, 100), (1, 20, 100), ...` so it is clearer which values belong to which columns



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to acheive desired maximum rows per file

Review Comment:
   ```suggestion
   /// Dynamically partitions input stream to achieve desired maximum rows per file
   ```



##########
datafusion/sqllogictest/test_files/insert_to_external.slt:
##########
@@ -87,6 +87,127 @@ SELECT * from ordered_insert_test;
 7 8
 7 7
 
+# test partitioned insert
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test(a string, b string, c bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/'
+PARTITIONED BY (a, b) 
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+#note that partitioned cols are moved to the end so value tuples are (c, a, b)
+query ITT
+INSERT INTO partitioned_insert_test values (1, 1, 1), (1, 1, 2), (1, 2, 1), (1, 2, 2), (2, 2, 1), (2, 2, 2);
+----
+6
+
+query ITT
+select * from partitioned_insert_test order by a,b,c
+----
+1 1 1
+1 1 2
+1 2 1
+2 2 1
+1 2 2
+2 2 2
+
+statement ok
+CREATE EXTERNAL TABLE
+partitioned_insert_test_verify(c bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/a=2/b=1/'

Review Comment:
   that is very cool
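
For readers following along, the directory layout the verification table relies on can be sketched as below. This is an illustration only: `hive_partition_dir` is a hypothetical helper, not the PR's `compute_hive_style_file_path`, and it assumes partition values need no escaping.

```rust
/// Builds a hive-style partition directory such as `base/a=2/b=1/`.
/// Hypothetical helper: assumes column names and values contain no
/// characters that would need escaping in a path.
fn hive_partition_dir(base: &str, partition_cols: &[&str], key: &[&str]) -> String {
    let mut path = base.trim_end_matches('/').to_string();
    for (col, val) in partition_cols.iter().zip(key) {
        path.push('/');
        path.push_str(col);
        path.push('=');
        path.push_str(val);
    }
    path.push('/');
    path
}
```

Calling it with the table location from the test, columns `a` and `b`, and the key `("2", "1")` gives the `.../insert_to_partitioned/a=2/b=1/` directory that the second table points at.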



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to acheive desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs
+            let take_indices = builder.finish();
+            let struct_array: StructArray = rb.clone().into();
+            let parted_batch = RecordBatch::try_from(
+                arrow::compute::take(&struct_array, &take_indices, None)?.as_struct(),
+            )
+            .map_err(|_| {
+                DataFusionError::Internal("Unexpected error partitioning batch!".into())
+            })?;
+
+            // Get or create channel for this batch
+            let part_tx = match value_map.get_mut(&part_key) {
+                Some(part_tx) => part_tx,
+                None => {
+                    // Create channel for previously unseen distinct partition key and notify consumer of new file
+                    let (part_tx, part_rx) = tokio::sync::mpsc::channel::<RecordBatch>(
+                        max_buffered_recordbatches,
+                    );
+                    let file_path = compute_hive_style_file_path(
+                        &part_key,
+                        &partition_by,
+                        &write_id,
+                        &file_extension,
+                        &base_output_path,
+                    );
+
+                    tx.send((file_path, part_rx)).map_err(|_| {
+                        DataFusionError::Execution(
+                            "Error sending new file stream!".into(),
+                        )
+                    })?;
+
+                    value_map.insert(part_key.clone(), part_tx);
+                    value_map
+                        .get_mut(&part_key)
+                        .ok_or(DataFusionError::Internal(
+                            "Key must exist since it was just inserted!".into(),
+                        ))?
+                }
+            };
+
+            // remove partitions columns
+            let final_batch_to_send =
+                remove_partition_by_columns(&parted_batch, &partition_by)?;
+
+            // Finally send the partial batch partitioned by distinct value!
+            part_tx.send(final_batch_to_send).await.map_err(|_| {
+                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+            })?;
+        }
+    }
+
+    Ok(())
+}
+
+fn compute_partition_keys_by_row<'a>(
+    rb: &'a RecordBatch,
+    partition_by: &'a [(String, DataType)],
+) -> Result<Vec<Vec<&'a str>>> {
+    let mut all_partition_values = vec![];
+
+    for (col, dtype) in partition_by.iter() {
+        let mut partition_values = vec![];
+        let col_array =
+            rb.column_by_name(col)
+                .ok_or(DataFusionError::Execution(format!(
+                    "PartitionBy Column {} does not exist in source data!",
+                    col
+                )))?;
+
+        match dtype {
+            DataType::Utf8 => {
+                let array = as_string_array(col_array)?;
+                for i in 0..rb.num_rows() {
+                    partition_values.push(array.value(i));
+                }
+            }
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                "it is not yet supported to write to hive partitions with datatype {}",
+                dtype
+            )))
+            }
+        }
+
+        all_partition_values.push(partition_values);
+    }
+
+    Ok(all_partition_values)
+}
+
+fn compute_take_arrays(
+    rb: &RecordBatch,
+    all_partition_values: Vec<Vec<&str>>,
+) -> HashMap<Vec<String>, UInt64Builder> {
+    let mut take_map = HashMap::new();
+    for i in 0..rb.num_rows() {
+        let mut part_key = vec![];
+        for vals in all_partition_values.iter() {
+            part_key.push(vals[i].to_owned());

Review Comment:
   Is there any reason to use `to_owned` here? Given that `take_map` isn't returned, I think it would be fine to store `&str` values here too.
   
   ```suggestion
               part_key.push(vals[i]);
   ```
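
A minimal sketch of what grouping by borrowed keys could look like, assuming plain `Vec<u64>` index lists in place of `UInt64Builder`; `group_rows_by_key` is a hypothetical name. One caveat: in the PR, `value_map` is keyed by `Vec<String>` and outlives each batch, so the borrowed keys would presumably still need converting to owned strings once per distinct key.

```rust
use std::collections::HashMap;

/// Groups row indices by partition key without cloning the string values.
/// `columns[c][r]` holds the value of partition column `c` at row `r`.
/// The keys borrow from the input, so the map cannot outlive the batch.
fn group_rows_by_key<'a>(
    columns: &[Vec<&'a str>],
    num_rows: usize,
) -> HashMap<Vec<&'a str>, Vec<u64>> {
    let mut take_map: HashMap<Vec<&'a str>, Vec<u64>> = HashMap::new();
    for row in 0..num_rows {
        // One key per row: the tuple of partition values at that row.
        let key: Vec<&'a str> = columns.iter().map(|vals| vals[row]).collect();
        take_map.entry(key).or_default().push(row as u64);
    }
    take_map
}
```

This still allocates one `Vec` per row for the key, but it avoids one `String` allocation per row per partition column.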



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to acheive desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs
+            let take_indices = builder.finish();
+            let struct_array: StructArray = rb.clone().into();
+            let parted_batch = RecordBatch::try_from(
+                arrow::compute::take(&struct_array, &take_indices, None)?.as_struct(),
+            )
+            .map_err(|_| {
+                DataFusionError::Internal("Unexpected error partitioning batch!".into())
+            })?;
+
+            // Get or create channel for this batch
+            let part_tx = match value_map.get_mut(&part_key) {
+                Some(part_tx) => part_tx,
+                None => {
+                    // Create channel for previously unseen distinct partition key and notify consumer of new file
+                    let (part_tx, part_rx) = tokio::sync::mpsc::channel::<RecordBatch>(
+                        max_buffered_recordbatches,
+                    );
+                    let file_path = compute_hive_style_file_path(
+                        &part_key,
+                        &partition_by,
+                        &write_id,
+                        &file_extension,
+                        &base_output_path,
+                    );
+
+                    tx.send((file_path, part_rx)).map_err(|_| {
+                        DataFusionError::Execution(
+                            "Error sending new file stream!".into(),
+                        )
+                    })?;
+
+                    value_map.insert(part_key.clone(), part_tx);
+                    value_map
+                        .get_mut(&part_key)
+                        .ok_or(DataFusionError::Internal(
+                            "Key must exist since it was just inserted!".into(),
+                        ))?
+                }
+            };
+
+            // remove partitions columns
+            let final_batch_to_send =
+                remove_partition_by_columns(&parted_batch, &partition_by)?;
+
+            // Finally send the partial batch partitioned by distinct value!
+            part_tx.send(final_batch_to_send).await.map_err(|_| {
+                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+            })?;
+        }
+    }
+
+    Ok(())
+}
+
+fn compute_partition_keys_by_row<'a>(
+    rb: &'a RecordBatch,
+    partition_by: &'a [(String, DataType)],
+) -> Result<Vec<Vec<&'a str>>> {
+    let mut all_partition_values = vec![];
+
+    for (col, dtype) in partition_by.iter() {
+        let mut partition_values = vec![];
+        let col_array =
+            rb.column_by_name(col)
+                .ok_or(DataFusionError::Execution(format!(
+                    "PartitionBy Column {} does not exist in source data!",
+                    col
+                )))?;
+
+        match dtype {
+            DataType::Utf8 => {
+                let array = as_string_array(col_array)?;
+                for i in 0..rb.num_rows() {
+                    partition_values.push(array.value(i));
+                }
+            }
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                "it is not yet supported to write to hive partitions with datatype {}",

Review Comment:
   ```suggestion
                   "it is not yet supported to write to hive partitions with datatype {} while writing column {col}",
   ```
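
As a minimal sketch of what the suggested message would produce, the helper below builds the same string from plain `&str` inputs. The function name `unsupported_partition_type_message` is hypothetical and not part of the PR; in the PR the column name and type come from the `(String, DataType)` pairs in `partition_by`.

```rust
/// Builds the "not yet supported" message for a partition column.
/// `col` and `dtype` are plain strings here, standing in for the
/// column name and `DataType` used in the PR.
fn unsupported_partition_type_message(col: &str, dtype: &str) -> String {
    // A positional `{}` and an inline captured `{col}` can be mixed in one
    // format string (inline captures need Rust 1.58 or later).
    format!(
        "it is not yet supported to write to hive partitions with datatype {} while writing column {col}",
        dtype
    )
}
```

Naming the column in the error should make it easier to tell which partition column triggered the `NotImplemented` error when a table has several.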



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible for monitoring
+/// the demux task for errors and aborting accordingly. The `single_file_output` parameter
+/// overrides all other settings to force only a single file to be written.
+/// The `partition_by` parameter will additionally split the input based on the unique
+/// values of a specific column; see <https://github.com/apache/arrow-datafusion/issues/7744>
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs

Review Comment:
   ```suggestion
               // TODO: use RecordBatch::take in arrow-rs https://github.com/apache/arrow-rs/issues/4958
   ```
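
For readers following the take-based split in the hunk above, here is a simplified sketch of the idea using only the standard library: group row indices by partition key, then gather those rows. It assumes string partition values and models a column as a slice; `compute_take_indices` and `take_rows` are hypothetical names for illustration, not the arrow-rs API or the PR's functions.

```rust
use std::collections::HashMap;

/// For each distinct partition key, collect the row indices that carry it.
/// `partition_columns[c][r]` is the value of partition column `c` at row `r`.
fn compute_take_indices(
    partition_columns: &[Vec<&str>],
    num_rows: usize,
) -> HashMap<Vec<String>, Vec<usize>> {
    let mut take_map: HashMap<Vec<String>, Vec<usize>> = HashMap::new();
    for row in 0..num_rows {
        // The key is the tuple of partition values for this row.
        let key: Vec<String> = partition_columns
            .iter()
            .map(|col| col[row].to_owned())
            .collect();
        take_map.entry(key).or_default().push(row);
    }
    take_map
}

/// Stand-in for a `take` kernel: gather the rows at `indices` from one column.
fn take_rows<T: Clone>(column: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| column[i].clone()).collect()
}
```

Each entry of the resulting map corresponds to one output partition. In the PR, the indices are accumulated in a `UInt64Builder` and applied to the whole batch at once rather than column by column.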



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible for monitoring
+/// the demux task for errors and aborting accordingly. The `single_file_output` parameter
+/// overrides all other settings to force only a single file to be written.
+/// The `partition_by` parameter will additionally split the input based on the unique
+/// values of a specific column; see <https://github.com/apache/arrow-datafusion/issues/7744>
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs
+            let take_indices = builder.finish();
+            let struct_array: StructArray = rb.clone().into();
+            let parted_batch = RecordBatch::try_from(
+                arrow::compute::take(&struct_array, &take_indices, None)?.as_struct(),
+            )
+            .map_err(|_| {
+                DataFusionError::Internal("Unexpected error partitioning batch!".into())
+            })?;
+
+            // Get or create channel for this batch
+            let part_tx = match value_map.get_mut(&part_key) {
+                Some(part_tx) => part_tx,
+                None => {
+                    // Create channel for previously unseen distinct partition key and notify consumer of new file
+                    let (part_tx, part_rx) = tokio::sync::mpsc::channel::<RecordBatch>(
+                        max_buffered_recordbatches,
+                    );
+                    let file_path = compute_hive_style_file_path(
+                        &part_key,
+                        &partition_by,
+                        &write_id,
+                        &file_extension,
+                        &base_output_path,
+                    );
+
+                    tx.send((file_path, part_rx)).map_err(|_| {
+                        DataFusionError::Execution(
+                            "Error sending new file stream!".into(),
+                        )
+                    })?;
+
+                    value_map.insert(part_key.clone(), part_tx);
+                    value_map
+                        .get_mut(&part_key)
+                        .ok_or(DataFusionError::Internal(
+                            "Key must exist since it was just inserted!".into(),
+                        ))?
+                }
+            };
+
+            // remove partitions columns
+            let final_batch_to_send =
+                remove_partition_by_columns(&parted_batch, &partition_by)?;
+
+            // Finally send the partial batch partitioned by distinct value!
+            part_tx.send(final_batch_to_send).await.map_err(|_| {
+                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+            })?;
+        }
+    }
+
+    Ok(())
+}
+
+fn compute_partition_keys_by_row<'a>(
+    rb: &'a RecordBatch,
+    partition_by: &'a [(String, DataType)],
+) -> Result<Vec<Vec<&'a str>>> {
+    let mut all_partition_values = vec![];
+
+    for (col, dtype) in partition_by.iter() {
+        let mut partition_values = vec![];
+        let col_array =
+            rb.column_by_name(col)
+                .ok_or(DataFusionError::Execution(format!(
+                    "PartitionBy Column {} does not exist in source data!",
+                    col
+                )))?;
+
+        match dtype {
+            DataType::Utf8 => {
+                let array = as_string_array(col_array)?;
+                for i in 0..rb.num_rows() {
+                    partition_values.push(array.value(i));
+                }
+            }
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                "it is not yet supported to write to hive partitions with datatype {}",
+                dtype
+            )))
+            }
+        }
+
+        all_partition_values.push(partition_values);
+    }
+
+    Ok(all_partition_values)
+}
+
+fn compute_take_arrays(
+    rb: &RecordBatch,
+    all_partition_values: Vec<Vec<&str>>,
+) -> HashMap<Vec<String>, UInt64Builder> {
+    let mut take_map = HashMap::new();
+    for i in 0..rb.num_rows() {
+        let mut part_key = vec![];
+        for vals in all_partition_values.iter() {
+            part_key.push(vals[i].to_owned());
+        }
+        let builder = take_map.entry(part_key).or_insert(UInt64Builder::new());
+        builder.append_value(i as u64);
+    }
+    take_map
+}
+
+fn remove_partition_by_columns(
+    parted_batch: &RecordBatch,
+    partition_by: &Vec<(String, DataType)>,
+) -> Result<RecordBatch> {
+    let end_idx = parted_batch.num_columns() - partition_by.len();

Review Comment:
   You could probably express this more concisely using [`filter()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter) to discard any partition columns.
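
A rough sketch of the `filter()` approach, using a toy batch of named integer columns instead of a real `RecordBatch`. The `Column` alias and `remove_partition_columns` are hypothetical, for illustration only; a real implementation would also have to rebuild the schema.

```rust
/// A toy "batch": named columns of integers, in schema order.
type Column = (String, Vec<i64>);

/// Keep every column whose name is not listed in `partition_by`.
fn remove_partition_columns(columns: &[Column], partition_by: &[String]) -> Vec<Column> {
    columns
        .iter()
        .filter(|col| !partition_by.contains(&col.0))
        .cloned()
        .collect()
}
```

Filtering by name does not depend on where the partition columns sit in the schema, whereas the `end_idx` computation under review appears to assume they are the trailing columns.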



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible for monitoring
+/// the demux task for errors and aborting accordingly. The `single_file_output` parameter
+/// overrides all other settings to force only a single file to be written.
+/// The `partition_by` parameter will additionally split the input based on the unique
+/// values of a specific column; see <https://github.com/apache/arrow-datafusion/issues/7744>
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs
+            let take_indices = builder.finish();
+            let struct_array: StructArray = rb.clone().into();
+            let parted_batch = RecordBatch::try_from(
+                arrow::compute::take(&struct_array, &take_indices, None)?.as_struct(),
+            )
+            .map_err(|_| {
+                DataFusionError::Internal("Unexpected error partitioning batch!".into())
+            })?;
+
+            // Get or create channel for this batch
+            let part_tx = match value_map.get_mut(&part_key) {
+                Some(part_tx) => part_tx,
+                None => {
+                    // Create channel for previously unseen distinct partition key and notify consumer of new file
+                    let (part_tx, part_rx) = tokio::sync::mpsc::channel::<RecordBatch>(
+                        max_buffered_recordbatches,
+                    );
+                    let file_path = compute_hive_style_file_path(
+                        &part_key,
+                        &partition_by,
+                        &write_id,
+                        &file_extension,
+                        &base_output_path,
+                    );
+
+                    tx.send((file_path, part_rx)).map_err(|_| {
+                        DataFusionError::Execution(
+                            "Error sending new file stream!".into(),
+                        )
+                    })?;
+
+                    value_map.insert(part_key.clone(), part_tx);
+                    value_map
+                        .get_mut(&part_key)
+                        .ok_or(DataFusionError::Internal(
+                            "Key must exist since it was just inserted!".into(),
+                        ))?
+                }
+            };
+
+            // remove partitions columns
+            let final_batch_to_send =
+                remove_partition_by_columns(&parted_batch, &partition_by)?;
+
+            // Finally send the partial batch partitioned by distinct value!
+            part_tx.send(final_batch_to_send).await.map_err(|_| {
+                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+            })?;
+        }
+    }
+
+    Ok(())
+}
+
+fn compute_partition_keys_by_row<'a>(
+    rb: &'a RecordBatch,
+    partition_by: &'a [(String, DataType)],
+) -> Result<Vec<Vec<&'a str>>> {
+    let mut all_partition_values = vec![];
+
+    for (col, dtype) in partition_by.iter() {
+        let mut partition_values = vec![];
+        let col_array =
+            rb.column_by_name(col)
+                .ok_or(DataFusionError::Execution(format!(
+                    "PartitionBy Column {} does not exist in source data!",
+                    col
+                )))?;
+
+        match dtype {
+            DataType::Utf8 => {
+                let array = as_string_array(col_array)?;
+                for i in 0..rb.num_rows() {
+                    partition_values.push(array.value(i));
+                }
+            }
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                "it is not yet supported to write to hive partitions with datatype {}",
+                dtype
+            )))
+            }
+        }
+
+        all_partition_values.push(partition_values);
+    }
+
+    Ok(all_partition_values)
+}
+
+fn compute_take_arrays(
+    rb: &RecordBatch,
+    all_partition_values: Vec<Vec<&str>>,
+) -> HashMap<Vec<String>, UInt64Builder> {
+    let mut take_map = HashMap::new();
+    for i in 0..rb.num_rows() {
+        let mut part_key = vec![];
+        for vals in all_partition_values.iter() {
+            part_key.push(vals[i].to_owned());
+        }
+        let builder = take_map.entry(part_key).or_insert(UInt64Builder::new());
+        builder.append_value(i as u64);
+    }
+    take_map
+}
+
+fn remove_partition_by_columns(
+    parted_batch: &RecordBatch,
+    partition_by: &Vec<(String, DataType)>,
+) -> Result<RecordBatch> {
+    let end_idx = parted_batch.num_columns() - partition_by.len();
+    let non_part_cols = &parted_batch.columns()[..end_idx];
+    let mut non_part_fields = vec![];
+    'outer: for field in parted_batch.schema().all_fields() {
+        let name = field.name();
+        for (part_name, _) in partition_by.iter() {
+            if name == part_name {
+                continue 'outer;
+            }
+        }
+        non_part_fields.push(field.to_owned())
+    }
+    let schema = Schema::new(non_part_fields);
+    let final_batch_to_send =
+        RecordBatch::try_new(Arc::new(schema), non_part_cols.into())?;
+
+    Ok(final_batch_to_send)
+}
+
+fn compute_hive_style_file_path(
+    part_key: &Vec<String>,
+    partition_by: &[(String, DataType)],
+    write_id: &str,
+    file_extension: &str,
+    base_output_path: &ListingTableUrl,
+) -> Path {
+    let mut file_path = base_output_path.prefix().clone();
+    for j in 0..part_key.len() {
+        file_path = file_path.child(format!("{}={}", partition_by[j].0, part_key[j]));
+    }
+
+    file_path.child(format!("{}.{}", write_id, file_extension))
+}

Review Comment:
   What would you think about writing some unit tests of the partition extraction and schema pruning code? Perhaps those tests could be used to cover the issue I found while testing this end to end?
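
   As a starting point for such tests, here is a minimal std-only sketch of the schema pruning step. `Batch`, `prune_by_position`, `prune_by_name` and `sample_batch` are hypothetical stand-ins, not arrow or DataFusion APIs. `remove_partition_by_columns` in the diff filters the fields by name but slices the columns with `[..end_idx]`, so the sketch checks what that combination does when the partition column is not the last column. Whether this is the same issue seen end to end is not established here.

   ```rust
   /// Simplified stand-in for a record batch: parallel lists of column names
   /// and string column values (hypothetical; not arrow's `RecordBatch`).
   #[derive(Debug, Clone, PartialEq)]
   struct Batch {
       names: Vec<String>,
       columns: Vec<Vec<String>>,
   }

   /// Mirrors the approach in the diff: field names are filtered by name,
   /// but column data is taken positionally as the leading columns.
   fn prune_by_position(batch: &Batch, partition_by: &[&str]) -> Batch {
       let end_idx = batch.columns.len() - partition_by.len();
       let names = batch
           .names
           .iter()
           .filter(|n| !partition_by.contains(&n.as_str()))
           .cloned()
           .collect();
       Batch {
           names,
           columns: batch.columns[..end_idx].to_vec(),
       }
   }

   /// Alternative: filter names and data together so they stay aligned.
   fn prune_by_name(batch: &Batch, partition_by: &[&str]) -> Batch {
       let mut names = vec![];
       let mut columns = vec![];
       for (name, col) in batch.names.iter().zip(batch.columns.iter()) {
           if !partition_by.contains(&name.as_str()) {
               names.push(name.clone());
               columns.push(col.clone());
           }
       }
       Batch { names, columns }
   }

   /// Two-row batch where the partition column comes first, not last.
   fn sample_batch() -> Batch {
       Batch {
           names: vec!["partition".to_string(), "trace_id".to_string()],
           columns: vec![
               vec!["p0".to_string(), "p1".to_string()],
               vec!["t0".to_string(), "t1".to_string()],
           ],
       }
   }
   ```

   In this model the positional version keeps the `partition` data under the `trace_id` name, while the name-based version keeps the two aligned. A real test would build arrow `RecordBatch`es with the partition column in different positions.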



##########
datafusion/core/src/datasource/file_format/write/mod.rs:
##########
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! write support for the various file formats
+
+use std::io::Error;
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
+
+use crate::datasource::physical_plan::FileMeta;
+use crate::error::Result;
+
+use arrow_array::RecordBatch;
+
+use datafusion_common::{exec_err, DataFusionError};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use futures::future::BoxFuture;
+use futures::ready;
+use futures::FutureExt;
+use object_store::path::Path;
+use object_store::{MultipartId, ObjectMeta, ObjectStore};
+
+use tokio::io::AsyncWrite;
+
+pub(crate) mod demux;
+pub(crate) mod orchestration;
+
+/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
+/// It is specifically designed for the `object_store` crate's `put` method and sends
+/// whole bytes at once when the buffer is flushed.
+pub struct AsyncPutWriter {
+    /// Object metadata
+    object_meta: ObjectMeta,
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    /// A buffer that stores the bytes to be sent
+    current_buffer: Vec<u8>,
+    /// Used for async handling in flush method
+    inner_state: AsyncPutState,
+}
+
+impl AsyncPutWriter {
+    /// Constructor for the `AsyncPutWriter` object
+    pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_meta,
+            store,
+            current_buffer: vec![],
+            // The writer starts out in buffering mode
+            inner_state: AsyncPutState::Buffer,
+        }
+    }
+
+    /// Separate implementation function that unpins the [`AsyncPutWriter`] so
+    /// that partial borrows work correctly
+    fn poll_shutdown_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        loop {
+            match &mut self.inner_state {
+                AsyncPutState::Buffer => {
+                    // Convert the current buffer to bytes and take ownership of it
+                    let bytes = Bytes::from(mem::take(&mut self.current_buffer));
+                    // Set the inner state to Put variant with the bytes
+                    self.inner_state = AsyncPutState::Put { bytes }
+                }
+                AsyncPutState::Put { bytes } => {
+                    // Send the bytes to the object store's put method
+                    return Poll::Ready(
+                        ready!(self
+                            .store
+                            .put(&self.object_meta.location, bytes.clone())
+                            .poll_unpin(cx))
+                        .map_err(Error::from),
+                    );
+                }
+            }
+        }
+    }
+}
+
+/// An enum that represents the inner state of AsyncPut
+enum AsyncPutState {
+    /// Building Bytes struct in this state
+    Buffer,
+    /// Data in the buffer is being sent to the object store
+    Put { bytes: Bytes },
+}
+
+impl AsyncWrite for AsyncPutWriter {
+    // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        // Extend the current buffer with the incoming buffer
+        self.current_buffer.extend_from_slice(buf);
+        // Return a ready poll with the length of the incoming buffer
+        Poll::Ready(Ok(buf.len()))
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        // Return a ready poll with an empty result
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        // Call the poll_shutdown_inner method to handle the actual sending of data to the object store
+        self.poll_shutdown_inner(cx)
+    }
+}
+
+/// Stores data needed during abortion of MultiPart writers
+pub(crate) struct MultiPart {
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    multipart_id: MultipartId,
+    location: Path,
+}
+
+impl MultiPart {
+    /// Create a new `MultiPart`
+    pub fn new(
+        store: Arc<dyn ObjectStore>,
+        multipart_id: MultipartId,
+        location: Path,
+    ) -> Self {
+        Self {
+            store,
+            multipart_id,
+            location,
+        }
+    }
+}
+
+pub(crate) enum AbortMode {
+    Put,
+    Append,
+    MultiPart(MultiPart),
+}
+
+/// A wrapper struct with abort method and writer
+pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
+    writer: W,
+    mode: AbortMode,
+}
+
+impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
+    /// Create a new `AbortableWrite` instance with the given writer, and write mode.
+    pub(crate) fn new(writer: W, mode: AbortMode) -> Self {
+        Self { writer, mode }
+    }
+
+    /// handling of abort for different write modes
+    pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
+        match &self.mode {
+            AbortMode::Put => Ok(async { Ok(()) }.boxed()),
+            AbortMode::Append => exec_err!("Cannot abort in append mode"),
+            AbortMode::MultiPart(MultiPart {
+                store,
+                multipart_id,
+                location,
+            }) => {
+                let location = location.clone();
+                let multipart_id = multipart_id.clone();
+                let store = store.clone();
+                Ok(Box::pin(async move {
+                    store
+                        .abort_multipart(&location, &multipart_id)
+                        .await
+                        .map_err(DataFusionError::ObjectStore)
+                }))
+            }
+        }
+    }
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
+    }
+}
+
+/// An enum that defines different file writer modes.
+#[derive(Debug, Clone, Copy)]
+pub enum FileWriterMode {
+    /// Data is appended to an existing file.
+    Append,
+    /// Data is written to a new file.
+    Put,
+    /// Data is written to a new file in multiple parts.
+    PutMultipart,
+}
+/// A trait that defines the methods required for a RecordBatch serializer.
+#[async_trait]
+pub trait BatchSerializer: Unpin + Send {

Review Comment:
   once we get your outstanding PRs merged, it might be a good time to refactor the code into (even more) modules



##########
datafusion/core/src/datasource/file_format/write/demux.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
+//! dividing input stream into multiple output files at execution time
+
+use std::collections::HashMap;
+
+use std::sync::Arc;
+
+use crate::datasource::listing::ListingTableUrl;
+
+use crate::error::Result;
+use crate::physical_plan::SendableRecordBatchStream;
+
+use arrow_array::builder::UInt64Builder;
+use arrow_array::cast::AsArray;
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::DataFusionError;
+
+use datafusion_execution::TaskContext;
+
+use futures::StreamExt;
+use object_store::path::Path;
+
+use rand::distributions::DistString;
+
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_output parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>`
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+    input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    partition_by: Option<Vec<(String, DataType)>>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let context = context.clone();
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
+        Some(parts) => {
+            // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
+            // bound this channel without risking a deadlock.
+            tokio::spawn(async move {
+                hive_style_partitions_demuxer(
+                    tx,
+                    input,
+                    context,
+                    parts,
+                    base_output_path,
+                    file_extension,
+                )
+                .await
+            })
+        }
+        None => tokio::spawn(async move {
+            row_count_demuxer(
+                tx,
+                input,
+                context,
+                base_output_path,
+                file_extension,
+                single_file_output,
+            )
+            .await
+        }),
+    };
+
+    (task, rx)
+}
+
+/// Dynamically partitions input stream to achieve desired maximum rows per file
+async fn row_count_demuxer(
+    mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let mut tx_file = create_new_file_stream(
+        &base_output_path,
+        &write_id,
+        part_idx,
+        &file_extension,
+        single_file_output,
+        max_buffered_batches,
+        &mut tx,
+    )?;
+    part_idx += 1;
+
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+        })?;
+
+        if total_rows_current_file >= max_rows_per_file && !single_file_output {
+            total_rows_current_file = 0;
+            tx_file = create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?;
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Helper for row count demuxer
+fn generate_file_path(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+) -> Path {
+    if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    }
+}
+
+/// Helper for row count demuxer
+fn create_new_file_stream(
+    base_output_path: &ListingTableUrl,
+    write_id: &str,
+    part_idx: usize,
+    file_extension: &str,
+    single_file_output: bool,
+    max_buffered_batches: usize,
+    tx: &mut UnboundedSender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+    let file_path = generate_file_path(
+        base_output_path,
+        write_id,
+        part_idx,
+        file_extension,
+        single_file_output,
+    );
+    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+    tx.send((file_path, rx_file)).map_err(|_| {
+        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
+    })?;
+    Ok(tx_file)
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(
+    tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    partition_by: Vec<(String, DataType)>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+) -> Result<()> {
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+
+    let exec_options = &context.session_config().options().execution;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    // To support non string partition col types, cast the type to &str first
+    let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
+
+    while let Some(rb) = input.next().await.transpose()? {
+        // First compute partition key for each row of batch, e.g. (col1=val1, col2=val2, ...)
+        let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;
+
+        // Next compute how the batch should be split up to take each distinct key to its own batch
+        let take_map = compute_take_arrays(&rb, all_partition_values);
+
+        // Divide up the batch into distinct partition key batches and send each batch
+        for (part_key, mut builder) in take_map.into_iter() {
+            // Take method adapted from https://github.com/lancedb/lance/pull/1337/files
+            // TODO: upstream RecordBatch::take to arrow-rs

Review Comment:
   I filed https://github.com/apache/arrow-rs/issues/4958 to track upstreaming this to arrow-rs
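
   For readers following along, the group-then-take pattern used in the demuxer can be modelled without arrow. This is only a sketch: `group_row_indices` and `take_rows` are hypothetical names, columns are plain `Vec<String>`, and a `BTreeMap` replaces the `HashMap` from the diff so that iteration order is deterministic.

   ```rust
   use std::collections::BTreeMap;

   /// For each row, build its partition key from the per-column values and
   /// record the row index under that key (models `compute_take_arrays`).
   fn group_row_indices(
       num_rows: usize,
       all_partition_values: &[Vec<&str>],
   ) -> BTreeMap<Vec<String>, Vec<usize>> {
       let mut take_map = BTreeMap::new();
       for i in 0..num_rows {
           let part_key: Vec<String> = all_partition_values
               .iter()
               .map(|vals| vals[i].to_owned())
               .collect();
           take_map.entry(part_key).or_insert_with(Vec::new).push(i);
       }
       take_map
   }

   /// Gather the given rows from every column (models a batch-level take).
   fn take_rows(columns: &[Vec<String>], indices: &[usize]) -> Vec<Vec<String>> {
       columns
           .iter()
           .map(|col| indices.iter().map(|&i| col[i].clone()).collect())
           .collect()
   }
   ```

   Each entry of the map then yields one smaller batch per distinct partition key, which is what gets sent down that partition's channel.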





Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#issuecomment-1773145548

   Thanks for checking @suremarc -- I plan to merge this PR and then file follow-on tickets for the known issues, which we can address as follow-on work




Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#issuecomment-1773154212

   I filed these tickets to track the improvements:
   *  https://github.com/apache/arrow-datafusion/issues/7891
   *  https://github.com/apache/arrow-datafusion/issues/7892




Re: [PR] Implement Hive-Style Partitioned Write Support [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#issuecomment-1771867480

   Thank you @alamb for the review! No worries, the amount of high quality reviews you complete is impressive :bow:.
   
   I spent some time digging into the failure case you found, and it appears to be specific to `datafusion-cli` interpreting the schema of the parquet file incorrectly in the `SELECT` query. You can see in the describe query that the second column is actually a dictionary:
   
   ```
     ❯ describe 'input.parquet';
   +-------------+-------------------------+-------------+
   | column_name | data_type               | is_nullable |
   +-------------+-------------------------+-------------+
   | partition   | Utf8                    | NO          |
   | trace_id    | Dictionary(Int32, Utf8) | YES         |
   +-------------+-------------------------+-------------+
   2 rows in set. Query took 0.001 seconds.
   ```
   
   But when you do a select, the parquet schema is incorrectly reported with both columns as Utf8. I added some debug print statements and saw the following (note that trace_id is type Utf8):
   
   ```
     ❯ insert into test select partition, trace_id from 'input.parquet';
   Table Schema: Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
   Input Schema: Field { name: "trace_id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
   Object Store error: Generic LocalFileSystem error: Unable to open file /tmp/partition=00000000000000002d0d2b9bbb99d0e3/OdZUZxMhsRWtSdIi.parquet#1: Too many open files (os error 24)
   ```
   
   My theory is that the wrong schema causes the arrow function `RecordBatch::column_by_name` to return values that belong to a different column. Since the demux code looks up the partition column by name, it should never partition by the wrong column unless `RecordBatch::column_by_name` itself returns the wrong values.
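   
   To make the theory concrete, here is a toy model in plain Rust with no arrow dependency. `ToyBatch`, `sample_batch`, and `demux` are hypothetical names invented for this illustration; they are not the arrow or DataFusion implementations. The sketch assumes that column names get attached to the data positionally, so that a name-based lookup can hand back another column's values when the labels are in a different order from the data:
   
   ```rust
   use std::collections::BTreeMap;
   
   /// Toy stand-in for a record batch (hypothetical; not arrow's `RecordBatch`).
   struct ToyBatch {
       names: Vec<String>,
       columns: Vec<Vec<String>>,
   }
   
   impl ToyBatch {
       /// Name-based lookup: returns whatever data sits at the position of `name`.
       fn column_by_name(&self, name: &str) -> Option<&Vec<String>> {
           let idx = self.names.iter().position(|n| n == name)?;
           self.columns.get(idx)
       }
   }
   
   /// Data in SELECT order (partition values first, then trace_id values),
   /// labelled with whatever names the caller supplies.
   fn sample_batch(names: [&str; 2]) -> ToyBatch {
       ToyBatch {
           names: names.iter().map(|n| n.to_string()).collect(),
           columns: vec![
               vec!["a".into(), "a".into(), "b".into()],    // partition values
               vec!["t1".into(), "t2".into(), "t3".into()], // trace_id values
           ],
       }
   }
   
   /// Group row indices by the value found in the named partition column.
   fn demux(batch: &ToyBatch, partition_col: &str) -> BTreeMap<String, Vec<usize>> {
       let mut groups: BTreeMap<String, Vec<usize>> = BTreeMap::new();
       if let Some(col) = batch.column_by_name(partition_col) {
           for (row, value) in col.iter().enumerate() {
               groups.entry(value.clone()).or_default().push(row);
           }
       }
       groups
   }
   
   fn main() {
       // Labels match the data order: rows are grouped by real partition values.
       let good = sample_batch(["partition", "trace_id"]);
       // Labels swapped relative to the data: "partition" now points at trace ids.
       let bad = sample_batch(["trace_id", "partition"]);
   
       println!("{:?}", demux(&good, "partition").keys().collect::<Vec<_>>());
       println!("{:?}", demux(&bad, "partition").keys().collect::<Vec<_>>());
   }
   ```
   
   In this toy model the mislabelled batch yields one group per row, because every trace id is distinct. Whether this is what actually happens inside `datafusion-cli` is not confirmed here.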
   
   I tried to replicate the error in Rust rather than via the CLI, but I see different, correct behavior:
   
   ```rust
   use arrow_schema::DataType;
   use datafusion::{
       dataframe::DataFrameWriteOptions,
       prelude::*,
   };
   use datafusion_common::DataFusionError;
   use datafusion_expr::ExprSchemable;
   use object_store::local::LocalFileSystem;
   use std::sync::Arc;
   use url::Url;
   
   const FILENAME: &str =
       "/home/dev/arrow-datafusion/input.parquet";
   
   #[tokio::main]
   async fn main() -> Result<(), DataFusionError> {
       let _ctx = SessionContext::new();
       let local = Arc::new(LocalFileSystem::new());
       let local_url = Url::parse("file://local").unwrap();
       _ctx.runtime_env().register_object_store(&local_url, local);
   
       let _read_options = ParquetReadOptions::default();
   
       let _df = _ctx
           .read_parquet(FILENAME, _read_options.clone())
           .await?;
   
       _df.clone().show_limit(10).await?;
   
       println!("{}", _df.clone().schema());
   
       _ctx.sql("create external table 
       test(partition varchar, trace_id varchar)
       stored as parquet 
       partitioned by (partition) 
       location './temptest/'  
       options (create_local_path 'true');").await?.collect().await?;
   
       // Expecting an error here since schemas do not match
       _df.clone()
           .select(vec![col("trace_id"), col("partition")])?
           .write_table("test", DataFrameWriteOptions::new()).await
           .expect_err("Inserting query must have the same schema with the table.");
   
       // Cast the column to the correct type and it works as expected!
       _df.clone()
           .select(vec![col("trace_id").cast_to(&DataType::Utf8, _df.schema())?.alias("trace_id"), col("partition")])?
           .write_table("test", DataFrameWriteOptions::new()).await?;
   
       _ctx.sql("select count(1) from test").await?.show().await?;
   
       Ok(())
   }
   
   ```

