You are viewing a plain-text version of this content. The link to its canonical (HTML) version was not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/12/19 14:17:10 UTC

[arrow-datafusion] branch master updated: Avoid send empty batches for Hash partitioning. (#1459)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 07b2985  Avoid send empty batches for Hash partitioning. (#1459)
07b2985 is described below

commit 07b29856c7a1287459e6b8545a41142c466d82bd
Author: Yang <37...@users.noreply.github.com>
AuthorDate: Sun Dec 19 22:17:07 2021 +0800

    Avoid send empty batches for Hash partitioning. (#1459)
---
 datafusion/src/physical_plan/repartition.rs | 31 +++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 3cc7d54..a3a5b06 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -348,6 +348,9 @@ impl RepartitionExec {
                     for (num_output_partition, partition_indices) in
                         indices.into_iter().enumerate()
                     {
+                        if partition_indices.is_empty() {
+                            continue;
+                        }
                         let timer = r_metrics.repart_time.timer();
                         let indices = partition_indices.into();
                         // Produce batches based on indices
@@ -952,4 +955,32 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn hash_repartition_avoid_empty_batch() -> Result<()> {
+        let batch = RecordBatch::try_from_iter(vec![(
+            "a",
+            Arc::new(StringArray::from(vec!["foo"])) as ArrayRef,
+        )])
+        .unwrap();
+        let partitioning = Partitioning::Hash(
+            vec![Arc::new(crate::physical_plan::expressions::Column::new(
+                "a", 0,
+            ))],
+            2,
+        );
+        let schema = batch.schema();
+        let input = MockExec::new(vec![Ok(batch)], schema);
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+        let output_stream0 = exec.execute(0).await.unwrap();
+        let batch0 = crate::physical_plan::common::collect(output_stream0)
+            .await
+            .unwrap();
+        let output_stream1 = exec.execute(1).await.unwrap();
+        let batch1 = crate::physical_plan::common::collect(output_stream1)
+            .await
+            .unwrap();
+        assert!(batch0.is_empty() || batch1.is_empty());
+        Ok(())
+    }
 }