You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/18 16:30:52 UTC

[arrow-datafusion] branch master updated: RepartitionExec should not error if output has hung up (#576)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a55364  RepartitionExec should not error if output has hung up (#576)
4a55364 is described below

commit 4a55364448c81182731f4fa4c3e65aef5b31fa0f
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Jun 18 12:30:47 2021 -0400

    RepartitionExec should not error if output has hung up (#576)
    
    * RepartitionExec should not error if output has hung up
    
    * Remove debug logging
---
 datafusion/src/physical_plan/repartition.rs | 128 ++++++++++++++++++++++++++--
 datafusion/src/test/exec.rs                 |  90 +++++++++++++++++++
 2 files changed, 210 insertions(+), 8 deletions(-)

diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 7ef1948..a7b17c4 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -255,12 +255,15 @@ impl RepartitionExec {
         let mut counter = 0;
         let hashes_buf = &mut vec![];
 
-        loop {
+        // While there are still outputs to send to, keep
+        // pulling inputs
+        while !txs.is_empty() {
             // fetch the next batch
             let now = Instant::now();
             let result = stream.next().await;
             metrics.fetch_nanos.add_elapsed(now);
 
+            // Input is done
             if result.is_none() {
                 break;
             }
@@ -270,9 +273,13 @@ impl RepartitionExec {
                 Partitioning::RoundRobinBatch(_) => {
                     let now = Instant::now();
                     let output_partition = counter % num_output_partitions;
-                    let tx = txs.get_mut(&output_partition).unwrap();
-                    tx.send(Some(result))
-                        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                    // if there is still a receiver, send to it
+                    if let Some(tx) = txs.get_mut(&output_partition) {
+                        if tx.send(Some(result)).is_err() {
+                            // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
+                            txs.remove(&output_partition);
+                        }
+                    }
                     metrics.send_nanos.add_elapsed(now);
                 }
                 Partitioning::Hash(exprs, _) => {
@@ -315,9 +322,13 @@ impl RepartitionExec {
                             RecordBatch::try_new(input_batch.schema(), columns);
                         metrics.repart_nanos.add_elapsed(now);
                         let now = Instant::now();
-                        let tx = txs.get_mut(&num_output_partition).unwrap();
-                        tx.send(Some(output_batch))
-                            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                        // if there is still a receiver, send to it
+                        if let Some(tx) = txs.get_mut(&num_output_partition) {
+                            if tx.send(Some(output_batch)).is_err() {
+                                // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
+                                txs.remove(&num_output_partition);
+                            }
+                        }
                         metrics.send_nanos.add_elapsed(now);
                     }
                 }
@@ -425,7 +436,7 @@ mod tests {
     use crate::{
         assert_batches_sorted_eq,
         physical_plan::memory::MemoryExec,
-        test::exec::{ErrorExec, MockExec},
+        test::exec::{BarrierExec, ErrorExec, MockExec},
     };
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
@@ -723,4 +734,105 @@ mod tests {
 
         assert_batches_sorted_eq!(&expected, &batches);
     }
+
+    #[tokio::test]
+    async fn repartition_with_dropping_output_stream() {
+        #[derive(Debug)]
+        struct Case<'a> {
+            partitioning: Partitioning,
+            expected: Vec<&'a str>,
+        }
+
+        let cases = vec![
+            Case {
+                partitioning: Partitioning::RoundRobinBatch(2),
+                expected: vec![
+                    "+------------------+",
+                    "| my_awesome_field |",
+                    "+------------------+",
+                    "| baz              |",
+                    "| frob             |",
+                    "| gaz              |",
+                    "| grob             |",
+                    "+------------------+",
+                ],
+            },
+            Case {
+                partitioning: Partitioning::Hash(
+                    vec![Arc::new(crate::physical_plan::expressions::Column::new(
+                        "my_awesome_field",
+                    ))],
+                    2,
+                ),
+                expected: vec![
+                    "+------------------+",
+                    "| my_awesome_field |",
+                    "+------------------+",
+                    "| frob             |",
+                    "+------------------+",
+                ],
+            },
+        ];
+
+        for case in cases {
+            println!("Running case {:?}", case.partitioning);
+
+            // The barrier exec waits to be pinged
+            // requires the input to wait at least once)
+            let input = Arc::new(make_barrier_exec());
+
+            // partition into two output streams
+            let exec =
+                RepartitionExec::try_new(input.clone(), case.partitioning).unwrap();
+
+            let output_stream0 = exec.execute(0).await.unwrap();
+            let output_stream1 = exec.execute(1).await.unwrap();
+
+            // now, purposely drop output stream 0
+            // *before* any outputs are produced
+            std::mem::drop(output_stream0);
+
+            // Now, start sending input
+            input.wait().await;
+
+            // output stream 1 should *not* error and have one of the input batches
+            let batches = crate::physical_plan::common::collect(output_stream1)
+                .await
+                .unwrap();
+
+            assert_batches_sorted_eq!(&case.expected, &batches);
+        }
+    }
+
+    /// Create a BarrierExec that returns two partitions of two batches each
+    fn make_barrier_exec() -> BarrierExec {
+        let batch1 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        let batch2 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["frob", "baz"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        let batch3 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["goo", "gar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        let batch4 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["grob", "gaz"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        // The barrier exec waits to be pinged
+        // requires the input to wait at least once)
+        let schema = batch1.schema();
+        BarrierExec::new(vec![vec![batch1, batch2], vec![batch3, batch4]], schema)
+    }
 }
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index bcd94dd..3971db3 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -23,6 +23,7 @@ use std::{
     sync::Arc,
     task::{Context, Poll},
 };
+use tokio::sync::Barrier;
 
 use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
@@ -226,6 +227,95 @@ impl RecordBatchStream for DelayedStream {
     }
 }
 
+/// A Mock ExecutionPlan that does not start producing input until a
+/// barrier is called
+///
+#[derive(Debug)]
+pub struct BarrierExec {
+    /// partitions to send back
+    data: Vec<Vec<RecordBatch>>,
+    schema: SchemaRef,
+
+    /// all streams wait on this barrier to produce
+    barrier: Arc<Barrier>,
+}
+
+impl BarrierExec {
+    /// Create a new exec with some number of partitions.
+    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
+        // wait for all streams and the input
+        let barrier = Arc::new(Barrier::new(data.len() + 1));
+        Self {
+            data,
+            schema,
+            barrier,
+        }
+    }
+
+    /// wait until all the input streams and this function is ready
+    pub async fn wait(&self) {
+        println!("BarrierExec::wait waiting on barrier");
+        self.barrier.wait().await;
+        println!("BarrierExec::wait done waiting");
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for BarrierExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.data.len())
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    fn with_new_children(
+        &self,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    /// Returns a stream which yields data
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        assert!(partition < self.data.len());
+
+        let schema = self.schema();
+
+        let (tx, rx) = tokio::sync::mpsc::channel(2);
+
+        // task simply sends data in order after barrier is reached
+        let data = self.data[partition].clone();
+        let b = self.barrier.clone();
+        tokio::task::spawn(async move {
+            println!("Partition {} waiting on barrier", partition);
+            b.wait().await;
+            for batch in data {
+                println!("Partition {} sending batch", partition);
+                if let Err(e) = tx.send(Ok(batch)).await {
+                    println!("ERROR batch via barrier stream stream: {}", e);
+                }
+            }
+        });
+
+        // returned stream simply reads off the rx stream
+        let stream = DelayedStream {
+            schema,
+            inner: ReceiverStream::new(rx),
+        };
+        Ok(Box::pin(stream))
+    }
+}
+
 /// A mock execution plan that errors on a call to execute
 #[derive(Debug)]
 pub struct ErrorExec {