You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/10 13:18:24 UTC

[arrow-datafusion] branch master updated: Return errors properly from RepartitionExec (#521)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f84564  Return errors properly from RepartitionExec (#521)
8f84564 is described below

commit 8f84564edab1679163d91691f63381f38907d515
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jun 10 09:18:15 2021 -0400

    Return errors properly from RepartitionExec (#521)
---
 datafusion/src/physical_plan/repartition.rs | 205 ++++++++++++++++++++++++++--
 datafusion/src/test/exec.rs                 | 183 ++++++++++++++++++++++++-
 2 files changed, 372 insertions(+), 16 deletions(-)

diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index e5747dd..37d98c7 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -147,12 +147,13 @@ impl ExecutionPlan for RepartitionExec {
                 let fetch_time = self.fetch_time_nanos.clone();
                 let repart_time = self.repart_time_nanos.clone();
                 let send_time = self.send_time_nanos.clone();
-                let mut txs: HashMap<_, _> = channels
+                let txs: HashMap<_, _> = channels
                     .iter()
                     .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
                     .collect();
                 let partitioning = self.partitioning.clone();
-                let _: JoinHandle<Result<()>> = tokio::spawn(async move {
+                let mut txs_captured = txs.clone();
+                let input_task: JoinHandle<Result<()>> = tokio::spawn(async move {
                     // execute the child operator
                     let now = Instant::now();
                     let mut stream = input.execute(i).await?;
@@ -170,13 +171,13 @@ impl ExecutionPlan for RepartitionExec {
                         if result.is_none() {
                             break;
                         }
-                        let result = result.unwrap();
+                        let result: ArrowResult<RecordBatch> = result.unwrap();
 
                         match &partitioning {
                             Partitioning::RoundRobinBatch(_) => {
                                 let now = Instant::now();
                                 let output_partition = counter % num_output_partitions;
-                                let tx = txs.get_mut(&output_partition).unwrap();
+                                let tx = txs_captured.get_mut(&output_partition).unwrap();
                                 tx.send(Some(result)).map_err(|e| {
                                     DataFusionError::Execution(e.to_string())
                                 })?;
@@ -230,7 +231,9 @@ impl ExecutionPlan for RepartitionExec {
                                     );
                                     repart_time.add(now.elapsed().as_nanos() as usize);
                                     let now = Instant::now();
-                                    let tx = txs.get_mut(&num_output_partition).unwrap();
+                                    let tx = txs_captured
+                                        .get_mut(&num_output_partition)
+                                        .unwrap();
                                     tx.send(Some(output_batch)).map_err(|e| {
                                         DataFusionError::Execution(e.to_string())
                                     })?;
@@ -249,13 +252,12 @@ impl ExecutionPlan for RepartitionExec {
                         counter += 1;
                     }
 
-                    // notify each output partition that this input partition has no more data
-                    for (_, tx) in txs {
-                        tx.send(None)
-                            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-                    }
                     Ok(())
                 });
+
+                // In a separate task, wait for each input to be done
+                // (and pass along any errors)
+                tokio::spawn(async move { Self::wait_for_task(input_task, txs).await });
             }
         }
 
@@ -308,6 +310,45 @@ impl RepartitionExec {
             send_time_nanos: SQLMetric::time_nanos(),
         })
     }
+
+    /// Waits for `input_task` which is consuming one of the inputs to
+    /// complete. Upon each successful completion, sends a `None` to
+    /// each of the output tx channels to signal one of the inputs is
+    /// complete. Upon error, propagates the errors to all output tx
+    /// channels.
+    async fn wait_for_task(
+        input_task: JoinHandle<Result<()>>,
+        txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
+    ) {
+        // wait for completion, and propagate error
+        // note we ignore errors on send (.ok) as that means the receiver has already shut down.
+        match input_task.await {
+            // Error in joining task
+            Err(e) => {
+                for (_, tx) in txs {
+                    let err = DataFusionError::Execution(format!("Join Error: {}", e));
+                    let err = Err(err.into_arrow_external_error());
+                    tx.send(Some(err)).ok();
+                }
+            }
+            // Error from running input task
+            Ok(Err(e)) => {
+                for (_, tx) in txs {
+                    // wrap it because need to send error to all output partitions
+                    let err = DataFusionError::Execution(e.to_string());
+                    let err = Err(err.into_arrow_external_error());
+                    tx.send(Some(err)).ok();
+                }
+            }
+            // Input task completed successfully
+            Ok(Ok(())) => {
+                // notify each output partition that this input partition has no more data
+                for (_, tx) in txs {
+                    tx.send(None).ok();
+                }
+            }
+        }
+    }
 }
 
 struct RepartitionStream {
@@ -356,10 +397,17 @@ impl RecordBatchStream for RepartitionStream {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::physical_plan::memory::MemoryExec;
-    use arrow::array::UInt32Array;
+    use crate::{
+        assert_batches_sorted_eq,
+        physical_plan::memory::MemoryExec,
+        test::exec::{ErrorExec, MockExec},
+    };
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
+    use arrow::{
+        array::{ArrayRef, StringArray, UInt32Array},
+        error::ArrowError,
+    };
 
     #[tokio::test]
     async fn one_to_many_round_robin() -> Result<()> {
@@ -517,4 +565,137 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn unsupported_partitioning() {
+        // have to send at least one batch through to provoke error
+        let batch = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        let schema = batch.schema();
+        let input = MockExec::new(vec![Ok(batch)], schema);
+        // This generates an error (partitioning type not supported)
+        // but only after the plan is executed. The error should be
+        // returned and no results produced
+        let partitioning = Partitioning::UnknownPartitioning(1);
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+        let output_stream = exec.execute(0).await.unwrap();
+
+        // Expect that an error is returned
+        let result_string = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert!(
+            result_string
+                .contains("Unsupported repartitioning scheme UnknownPartitioning(1)"),
+            "actual: {}",
+            result_string
+        );
+    }
+
+    #[tokio::test]
+    async fn error_for_input_exec() {
+        // This generates an error on a call to execute. The error
+        // should be returned and no results produced.
+
+        let input = ErrorExec::new();
+        let partitioning = Partitioning::RoundRobinBatch(1);
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+
+        // Note: this should pass (the stream can be created) but the
+        // error when the input is executed should get passed back
+        let output_stream = exec.execute(0).await.unwrap();
+
+        // Expect that an error is returned
+        let result_string = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert!(
+            result_string.contains("ErrorExec, unsurprisingly, errored in partition 0"),
+            "actual: {}",
+            result_string
+        );
+    }
+
+    #[tokio::test]
+    async fn repartition_with_error_in_stream() {
+        let batch = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        // input stream returns one good batch and then one error. The
+        // error should be returned.
+        let err = Err(ArrowError::ComputeError("bad data error".to_string()));
+
+        let schema = batch.schema();
+        let input = MockExec::new(vec![Ok(batch), err], schema);
+        let partitioning = Partitioning::RoundRobinBatch(1);
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+
+        // Note: this should pass (the stream can be created) but the
+        // error produced later by the input stream should get passed back
+        let output_stream = exec.execute(0).await.unwrap();
+
+        // Expect that an error is returned
+        let result_string = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert!(
+            result_string.contains("bad data error"),
+            "actual: {}",
+            result_string
+        );
+    }
+
+    #[tokio::test]
+    async fn repartition_with_delayed_stream() {
+        let batch1 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        let batch2 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["frob", "baz"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        // The mock exec doesn't return immediately (instead it
+        // requires the input to wait at least once)
+        let schema = batch1.schema();
+        let expected_batches = vec![batch1.clone(), batch2.clone()];
+        let input = MockExec::new(vec![Ok(batch1), Ok(batch2)], schema);
+        let partitioning = Partitioning::RoundRobinBatch(1);
+
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+
+        let expected = vec![
+            "+------------------+",
+            "| my_awesome_field |",
+            "+------------------+",
+            "| foo              |",
+            "| bar              |",
+            "| frob             |",
+            "| baz              |",
+            "+------------------+",
+        ];
+
+        assert_batches_sorted_eq!(&expected, &expected_batches);
+
+        let output_stream = exec.execute(0).await.unwrap();
+        let batches = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap();
+
+        assert_batches_sorted_eq!(&expected, &batches);
+    }
 }
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index 04cd295..bcd94dd 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -17,14 +17,25 @@
 
 //! Simple iterator over batches for use in testing
 
-use std::task::{Context, Poll};
+use async_trait::async_trait;
+use std::{
+    any::Any,
+    sync::Arc,
+    task::{Context, Poll},
+};
 
 use arrow::{
-    datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
 };
-use futures::Stream;
+use futures::{Stream, StreamExt};
+use tokio_stream::wrappers::ReceiverStream;
 
-use crate::physical_plan::RecordBatchStream;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
 
 /// Index into the data that has been returned so far
 #[derive(Debug, Default, Clone)]
@@ -100,3 +111,167 @@ impl RecordBatchStream for TestStream {
         self.data[0].schema()
     }
 }
+
+/// A Mock ExecutionPlan that can be used for writing tests of other ExecutionPlans
+/// (it replays a fixed list of results, batches or errors, from partition 0 only)
+#[derive(Debug)]
+pub struct MockExec {
+    /// the results to send back
+    data: Vec<ArrowResult<RecordBatch>>,
+    schema: SchemaRef,
+}
+
+impl MockExec {
+    /// Create a new exec with a single partition that returns the
+    /// record batches in this Exec. Note the batches are not produced
+    /// immediately (the caller has to actually yield and another task
+    /// must run) to ensure any poll loops are correct.
+    pub fn new(data: Vec<ArrowResult<RecordBatch>>, schema: SchemaRef) -> Self {
+        Self { data, schema }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for MockExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    fn with_new_children(
+        &self,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    /// Returns a stream which yields data
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        assert_eq!(partition, 0);
+
+        let schema = self.schema();
+
+        // Result doesn't implement Clone here, so clone each item ourselves
+        let data: Vec<_> = self
+            .data
+            .iter()
+            .map(|r| match r {
+                Ok(batch) => Ok(batch.clone()),
+                Err(e) => Err(clone_error(e)),
+            })
+            .collect();
+
+        let (tx, rx) = tokio::sync::mpsc::channel(2);
+
+        // The spawned code simply sends the data in order, but from a
+        // separate tokio task (to ensure the batches are not available
+        // without the consumer of the DelayedStream yielding).
+        tokio::task::spawn(async move {
+            for batch in data {
+                println!("Sending batch via delayed stream");
+                if let Err(e) = tx.send(batch).await {
+                    println!("ERROR batch via delayed stream: {}", e);
+                }
+            }
+        });
+
+        // returned stream simply reads off the rx stream
+        let stream = DelayedStream {
+            schema,
+            inner: ReceiverStream::new(rx),
+        };
+        Ok(Box::pin(stream))
+    }
+}
+
+fn clone_error(e: &ArrowError) -> ArrowError {
+    use ArrowError::*;
+    match e {
+        ComputeError(msg) => ComputeError(msg.to_string()),
+        _ => unimplemented!(),
+    }
+}
+
+#[derive(Debug)]
+pub struct DelayedStream {
+    schema: SchemaRef,
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
+}
+
+impl Stream for DelayedStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
+
+impl RecordBatchStream for DelayedStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+/// A mock execution plan that errors on a call to execute
+#[derive(Debug)]
+pub struct ErrorExec {
+    schema: SchemaRef,
+}
+impl ErrorExec {
+    pub fn new() -> Self {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "dummy",
+            DataType::Int64,
+            true,
+        )]));
+        Self { schema }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for ErrorExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    fn with_new_children(
+        &self,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    /// Always returns an error that names the requested partition (never a stream)
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Err(DataFusionError::Internal(format!(
+            "ErrorExec, unsurprisingly, errored in partition {}",
+            partition
+        )))
+    }
+}