You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/10 13:18:24 UTC
[arrow-datafusion] branch master updated: Return errors properly from RepartitionExec (#521)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8f84564 Return errors properly from RepartitionExec (#521)
8f84564 is described below
commit 8f84564edab1679163d91691f63381f38907d515
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jun 10 09:18:15 2021 -0400
Return errors properly from RepartitionExec (#521)
---
datafusion/src/physical_plan/repartition.rs | 205 ++++++++++++++++++++++++++--
datafusion/src/test/exec.rs | 183 ++++++++++++++++++++++++-
2 files changed, 372 insertions(+), 16 deletions(-)
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index e5747dd..37d98c7 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -147,12 +147,13 @@ impl ExecutionPlan for RepartitionExec {
let fetch_time = self.fetch_time_nanos.clone();
let repart_time = self.repart_time_nanos.clone();
let send_time = self.send_time_nanos.clone();
- let mut txs: HashMap<_, _> = channels
+ let txs: HashMap<_, _> = channels
.iter()
.map(|(partition, (tx, _rx))| (*partition, tx.clone()))
.collect();
let partitioning = self.partitioning.clone();
- let _: JoinHandle<Result<()>> = tokio::spawn(async move {
+ let mut txs_captured = txs.clone();
+ let input_task: JoinHandle<Result<()>> = tokio::spawn(async move {
// execute the child operator
let now = Instant::now();
let mut stream = input.execute(i).await?;
@@ -170,13 +171,13 @@ impl ExecutionPlan for RepartitionExec {
if result.is_none() {
break;
}
- let result = result.unwrap();
+ let result: ArrowResult<RecordBatch> = result.unwrap();
match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let now = Instant::now();
let output_partition = counter % num_output_partitions;
- let tx = txs.get_mut(&output_partition).unwrap();
+ let tx = txs_captured.get_mut(&output_partition).unwrap();
tx.send(Some(result)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
@@ -230,7 +231,9 @@ impl ExecutionPlan for RepartitionExec {
);
repart_time.add(now.elapsed().as_nanos() as usize);
let now = Instant::now();
- let tx = txs.get_mut(&num_output_partition).unwrap();
+ let tx = txs_captured
+ .get_mut(&num_output_partition)
+ .unwrap();
tx.send(Some(output_batch)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
@@ -249,13 +252,12 @@ impl ExecutionPlan for RepartitionExec {
counter += 1;
}
- // notify each output partition that this input partition has no more data
- for (_, tx) in txs {
- tx.send(None)
- .map_err(|e| DataFusionError::Execution(e.to_string()))?;
- }
Ok(())
});
+
+ // In a separate task, wait for each input to be done
+ // (and pass along any errors)
+ tokio::spawn(async move { Self::wait_for_task(input_task, txs).await });
}
}
@@ -308,6 +310,45 @@ impl RepartitionExec {
send_time_nanos: SQLMetric::time_nanos(),
})
}
+
+    /// Waits for `input_task` (which consumes one input partition) to
+    /// complete. On success, sends a `None` to every output tx channel
+    /// to signal that this input is exhausted. If the task returns an
+    /// error, or cannot be joined (e.g. it panicked or was cancelled),
+    /// an error is sent to every output tx channel instead.
+    async fn wait_for_task(
+        input_task: JoinHandle<Result<()>>,
+        txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
+    ) {
+        // wait for completion, and propagate error
+        // note we ignore errors on send (.ok) as that means the receiver has already shut down.
+        match input_task.await {
+            // The task could not be joined (e.g. it panicked or was cancelled)
+            Err(e) => {
+                for (_, tx) in txs {
+                    let err = DataFusionError::Execution(format!("Join Error: {}", e));
+                    let err = Err(err.into_arrow_external_error());
+                    tx.send(Some(err)).ok();
+                }
+            }
+            // The input task ran but returned an error
+            Ok(Err(e)) => {
+                for (_, tx) in txs {
+                    // build a fresh error per partition, since each send consumes its value
+                    let err = DataFusionError::Execution(e.to_string());
+                    let err = Err(err.into_arrow_external_error());
+                    tx.send(Some(err)).ok();
+                }
+            }
+            // Input task completed successfully
+            Ok(Ok(())) => {
+                // notify each output partition that this input partition has no more data
+                for (_, tx) in txs {
+                    tx.send(None).ok();
+                }
+            }
+        }
+    }
}
struct RepartitionStream {
@@ -356,10 +397,17 @@ impl RecordBatchStream for RepartitionStream {
#[cfg(test)]
mod tests {
use super::*;
- use crate::physical_plan::memory::MemoryExec;
- use arrow::array::UInt32Array;
+ use crate::{
+ assert_batches_sorted_eq,
+ physical_plan::memory::MemoryExec,
+ test::exec::{ErrorExec, MockExec},
+ };
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
+ use arrow::{
+ array::{ArrayRef, StringArray, UInt32Array},
+ error::ArrowError,
+ };
#[tokio::test]
async fn one_to_many_round_robin() -> Result<()> {
@@ -517,4 +565,137 @@ mod tests {
Ok(())
}
+
+    #[tokio::test]
+    async fn unsupported_partitioning() {
+        // have to send at least one batch through to provoke the error
+        let batch = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        let schema = batch.schema();
+        let input = MockExec::new(vec![Ok(batch)], schema);
+        // This generates an error (partitioning type not supported)
+        // but only after the plan is executed. The error should be
+        // returned and no results produced.
+        let partitioning = Partitioning::UnknownPartitioning(1);
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+        let output_stream = exec.execute(0).await.unwrap();
+
+        // Expect that an error is returned
+        let result_string = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert!(
+            result_string
+                .contains("Unsupported repartitioning scheme UnknownPartitioning(1)"),
+            "actual: {}",
+            result_string
+        );
+    }
+
+    #[tokio::test]
+    async fn error_for_input_exec() {
+        // This generates an error on a call to execute. The error
+        // should be returned and no results produced.
+
+        let input = ErrorExec::new();
+        let partitioning = Partitioning::RoundRobinBatch(1);
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+
+        // Note: this should pass (the stream can be created) but the
+        // error when the input is executed should get passed back
+        let output_stream = exec.execute(0).await.unwrap();
+
+        // Expect that an error is returned
+        let result_string = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert!(
+            result_string.contains("ErrorExec, unsurprisingly, errored in partition 0"),
+            "actual: {}",
+            result_string
+        );
+    }
+
+    #[tokio::test]
+    async fn repartition_with_error_in_stream() {
+        let batch = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        // input stream returns one good batch and then one error. The
+        // error should be returned.
+        let err = Err(ArrowError::ComputeError("bad data error".to_string()));
+
+        let schema = batch.schema();
+        let input = MockExec::new(vec![Ok(batch), err], schema);
+        let partitioning = Partitioning::RoundRobinBatch(1);
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+
+        // Note: this should pass (the stream can be created) but the
+        // error produced by the input stream should get passed back
+        let output_stream = exec.execute(0).await.unwrap();
+
+        // Expect that an error is returned
+        let result_string = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert!(
+            result_string.contains("bad data error"),
+            "actual: {}",
+            result_string
+        );
+    }
+
+    #[tokio::test]
+    async fn repartition_with_delayed_stream() {
+        let batch1 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        let batch2 = RecordBatch::try_from_iter(vec![(
+            "my_awesome_field",
+            Arc::new(StringArray::from(vec!["frob", "baz"])) as ArrayRef,
+        )])
+        .unwrap();
+
+        // MockExec sends its batches from a separately spawned task, so
+        // the consumer must wait (yield) at least once before data arrives
+        let schema = batch1.schema();
+        let expected_batches = vec![batch1.clone(), batch2.clone()];
+        let input = MockExec::new(vec![Ok(batch1), Ok(batch2)], schema);
+        let partitioning = Partitioning::RoundRobinBatch(1);
+
+        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
+
+        let expected = vec![
+            "+------------------+",
+            "| my_awesome_field |",
+            "+------------------+",
+            "| foo |",
+            "| bar |",
+            "| frob |",
+            "| baz |",
+            "+------------------+",
+        ];
+        // sanity check: `expected` describes exactly the input batches
+        assert_batches_sorted_eq!(&expected, &expected_batches);
+
+        let output_stream = exec.execute(0).await.unwrap();
+        let batches = crate::physical_plan::common::collect(output_stream)
+            .await
+            .unwrap();
+
+        assert_batches_sorted_eq!(&expected, &batches);
+    }
}
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index 04cd295..bcd94dd 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -17,14 +17,25 @@
//! Simple iterator over batches for use in testing
-use std::task::{Context, Poll};
+use async_trait::async_trait;
+use std::{
+ any::Any,
+ sync::Arc,
+ task::{Context, Poll},
+};
use arrow::{
- datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
+ datatypes::{DataType, Field, Schema, SchemaRef},
+ error::{ArrowError, Result as ArrowResult},
+ record_batch::RecordBatch,
};
-use futures::Stream;
+use futures::{Stream, StreamExt};
+use tokio_stream::wrappers::ReceiverStream;
-use crate::physical_plan::RecordBatchStream;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+ ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
/// Index into the data that has been returned so far
#[derive(Debug, Default, Clone)]
@@ -100,3 +111,167 @@ impl RecordBatchStream for TestStream {
self.data[0].schema()
}
}
+
+/// A Mock ExecutionPlan that can be used for writing tests of other ExecutionPlans.
+/// It replays a fixed list of batches and/or errors from a single partition.
+#[derive(Debug)]
+pub struct MockExec {
+    /// the results to send back
+    data: Vec<ArrowResult<RecordBatch>>,
+    schema: SchemaRef,
+}
+
+impl MockExec {
+    /// Create a new exec with a single partition that yields each entry
+    /// of `data` (a batch or an error) in order. Entries are not produced
+    /// immediately (the caller has to actually yield and another task
+    /// must run) to ensure any poll loops are correct.
+    pub fn new(data: Vec<ArrowResult<RecordBatch>>, schema: SchemaRef) -> Self {
+        Self { data, schema }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for MockExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    fn with_new_children(
+        &self,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    /// Streams a copy of `data` in order; panics unless `partition == 0`.
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        assert_eq!(partition, 0);
+
+        let schema = self.schema();
+
+        // ArrowResult can't be cloned directly, so clone each entry by hand
+        let data: Vec<_> = self
+            .data
+            .iter()
+            .map(|r| match r {
+                Ok(batch) => Ok(batch.clone()),
+                Err(e) => Err(clone_error(e)),
+            })
+            .collect();
+
+        let (tx, rx) = tokio::sync::mpsc::channel(2);
+
+        // Send the data in order from a separately spawned task, so the
+        // batches are not available until the consumer of the
+        // DelayedStream yields (the channel buffers at most 2 items).
+        tokio::task::spawn(async move {
+            for batch in data {
+                println!("Sending batch via delayed stream");
+                if let Err(e) = tx.send(batch).await {
+                    println!("ERROR batch via delayed stream: {}", e);
+                }
+            }
+        });
+
+        // returned stream simply reads off the rx stream
+        let stream = DelayedStream {
+            schema,
+            inner: ReceiverStream::new(rx),
+        };
+        Ok(Box::pin(stream))
+    }
+}
+/// Clones an `ArrowError`; only `ComputeError` is supported (others panic).
+fn clone_error(e: &ArrowError) -> ArrowError {
+    use ArrowError::*;
+    match e {
+        ComputeError(msg) => ComputeError(msg.to_string()),
+        _ => unimplemented!(),
+    }
+}
+/// A stream that yields the items received on a tokio mpsc channel.
+#[derive(Debug)]
+pub struct DelayedStream {
+    schema: SchemaRef,
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
+}
+
+impl Stream for DelayedStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
+
+impl RecordBatchStream for DelayedStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+/// A mock execution plan whose `execute` always returns an error.
+#[derive(Debug)]
+pub struct ErrorExec {
+    schema: SchemaRef,
+}
+impl ErrorExec {
+    pub fn new() -> Self {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "dummy",
+            DataType::Int64,
+            true,
+        )]));
+        Self { schema }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for ErrorExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    fn with_new_children(
+        &self,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    /// Always returns a `DataFusionError::Internal` naming `partition`; never a stream.
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Err(DataFusionError::Internal(format!(
+            "ErrorExec, unsurprisingly, errored in partition {}",
+            partition
+        )))
+    }
+}