You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/12/19 14:17:10 UTC
[arrow-datafusion] branch master updated: Avoid send empty batches for Hash partitioning. (#1459)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 07b2985 Avoid send empty batches for Hash partitioning. (#1459)
07b2985 is described below
commit 07b29856c7a1287459e6b8545a41142c466d82bd
Author: Yang <37...@users.noreply.github.com>
AuthorDate: Sun Dec 19 22:17:07 2021 +0800
Avoid send empty batches for Hash partitioning. (#1459)
---
datafusion/src/physical_plan/repartition.rs | 31 +++++++++++++++++++++++++++++
1 file changed, 31 insertions(+)
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 3cc7d54..a3a5b06 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -348,6 +348,9 @@ impl RepartitionExec {
for (num_output_partition, partition_indices) in
indices.into_iter().enumerate()
{
+ if partition_indices.is_empty() {
+ continue;
+ }
let timer = r_metrics.repart_time.timer();
let indices = partition_indices.into();
// Produce batches based on indices
@@ -952,4 +955,32 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn hash_repartition_avoid_empty_batch() -> Result<()> {
+ let batch = RecordBatch::try_from_iter(vec![(
+ "a",
+ Arc::new(StringArray::from(vec!["foo"])) as ArrayRef,
+ )])
+ .unwrap();
+ let partitioning = Partitioning::Hash(
+ vec![Arc::new(crate::physical_plan::expressions::Column::new(
+ "a", 0,
+ ))],
+ 2,
+ );
+ let schema = batch.schema();
+ let input = MockExec::new(vec![Ok(batch)], schema);
+ let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+ let output_stream0 = exec.execute(0).await.unwrap();
+ let batch0 = crate::physical_plan::common::collect(output_stream0)
+ .await
+ .unwrap();
+ let output_stream1 = exec.execute(1).await.unwrap();
+ let batch1 = crate::physical_plan::common::collect(output_stream1)
+ .await
+ .unwrap();
+ assert!(batch0.is_empty() || batch1.is_empty());
+ Ok(())
+ }
}