You are viewing a plain-text version of this content. The canonical link to it was a hyperlink and is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/06/01 10:22:29 UTC
[arrow-datafusion] branch main updated: Make `FileStream` error handling configurable (#6491)
This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new aa7141a223 Make `FileStream` error handling configurable (#6491)
aa7141a223 is described below
commit aa7141a22373333410dad00126ae66b2a9c9f7d0
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Thu Jun 1 06:22:22 2023 -0400
Make `FileStream` error handling configurable (#6491)
* Make FileStream error behavior configurable (#187)
* Skip files with errors in `FileStream`
* Pass the error through when configured with `OnError::Fail`
(cherry picked from commit e755a057bce8b42052fb875cc3adf03c129301f3)
* Make `OnError` public
(cherry picked from commit ace7db291f7cf9415a1ab0d8cba7bc6580a5ab12)
* Update datafusion/core/src/physical_plan/file_format/file_stream.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* Create common test builder for FileStream tests
* Add unit tests for OnError::Fail
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../src/physical_plan/file_format/file_stream.rs | 516 +++++++++++++++++++--
.../core/src/physical_plan/file_format/mod.rs | 2 +-
2 files changed, 476 insertions(+), 42 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 2ca9a076cf..4dd3ad60be 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -41,7 +41,7 @@ use crate::physical_plan::file_format::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
use crate::physical_plan::metrics::{
- BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
+ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
use crate::physical_plan::RecordBatchStream;
@@ -49,6 +49,20 @@ use crate::physical_plan::RecordBatchStream;
pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
+/// Describes the behavior of the `FileStream` if file opening or scanning fails
+pub enum OnError {
+ /// Fail the entire stream and return the underlying error
+ Fail,
+ /// Continue scanning, ignoring the failed file
+ Skip,
+}
+
+impl Default for OnError {
+ fn default() -> Self {
+ Self::Fail
+ }
+}
+
/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
///
@@ -81,6 +95,8 @@ pub struct FileStream<F: FileOpener> {
file_stream_metrics: FileStreamMetrics,
/// runtime baseline metrics
baseline_metrics: BaselineMetrics,
+ /// Describes the behavior of the `FileStream` if file opening or scanning fails
+ on_error: OnError,
}
/// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -173,6 +189,16 @@ struct FileStreamMetrics {
///
/// Time spent waiting for the FileStream's input.
pub time_processing: StartableTime,
+ /// Count of errors opening file.
+ ///
+ /// If using `OnError::Skip` this will provide a count of the number of files
+ /// which were skipped and will not be included in the scan results.
+ pub file_open_errors: Count,
+ /// Count of errors scanning file
+ ///
+ /// If using `OnError::Skip` this will provide a count of the number of files
+ /// which were skipped and will not be included in the scan results.
+ pub file_scan_errors: Count,
}
impl FileStreamMetrics {
@@ -201,11 +227,19 @@ impl FileStreamMetrics {
start: None,
};
+ let file_open_errors =
+ MetricBuilder::new(metrics).counter("file_open_errors", partition);
+
+ let file_scan_errors =
+ MetricBuilder::new(metrics).counter("file_scan_errors", partition);
+
Self {
time_opening,
time_scanning_until_data,
time_scanning_total,
time_processing,
+ file_open_errors,
+ file_scan_errors,
}
}
}
@@ -239,9 +273,19 @@ impl<F: FileOpener> FileStream<F> {
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
baseline_metrics: BaselineMetrics::new(metrics, partition),
+ on_error: OnError::Fail,
})
}
+ /// Specify the behavior when an error occurs opening or scanning a file
+ ///
+ /// If `OnError::Skip` the stream will skip files which encounter an error and continue
+ /// If `OnError:Fail` (default) the stream will fail and stop processing when an error occurs
+ pub fn with_on_error(mut self, on_error: OnError) -> Self {
+ self.on_error = on_error;
+ self
+ }
+
/// Begin opening the next file in parallel while decoding the current file in FileStream.
///
/// Since file opening is mostly IO (and may involve a
@@ -320,8 +364,17 @@ impl<F: FileOpener> FileStream<F> {
}
}
Err(e) => {
- self.state = FileStreamState::Error;
- return Poll::Ready(Some(Err(e)));
+ self.file_stream_metrics.file_open_errors.add(1);
+ match self.on_error {
+ OnError::Skip => {
+ self.file_stream_metrics.time_opening.stop();
+ self.state = FileStreamState::Idle
+ }
+ OnError::Fail => {
+ self.state = FileStreamState::Error;
+ return Poll::Ready(Some(Err(e)));
+ }
+ }
}
},
FileStreamState::Scan {
@@ -338,15 +391,13 @@ impl<F: FileOpener> FileStream<F> {
}
}
match ready!(reader.poll_next_unpin(cx)) {
- Some(result) => {
+ Some(Ok(batch)) => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
- let result = result
- .and_then(|b| {
- self.pc_projector
- .project(b, partition_values)
- .map_err(|e| ArrowError::ExternalError(e.into()))
- })
+ let result = self
+ .pc_projector
+ .project(batch, partition_values)
+ .map_err(|e| ArrowError::ExternalError(e.into()))
.map(|batch| match &mut self.remain {
Some(remain) => {
if *remain > batch.num_rows() {
@@ -363,11 +414,49 @@ impl<F: FileOpener> FileStream<F> {
});
if result.is_err() {
+ // If the partition value projection fails, this is not governed by
+ // the `OnError` behavior
self.state = FileStreamState::Error
}
self.file_stream_metrics.time_scanning_total.start();
return Poll::Ready(Some(result.map_err(Into::into)));
}
+ Some(Err(err)) => {
+ self.file_stream_metrics.file_scan_errors.add(1);
+ self.file_stream_metrics.time_scanning_until_data.stop();
+ self.file_stream_metrics.time_scanning_total.stop();
+
+ match self.on_error {
+ // If `OnError::Skip` we skip the file as soon as we hit the first error
+ OnError::Skip => match mem::take(next) {
+ Some((future, partition_values)) => {
+ self.file_stream_metrics.time_opening.start();
+
+ match future {
+ NextOpen::Pending(future) => {
+ self.state = FileStreamState::Open {
+ future,
+ partition_values,
+ }
+ }
+ NextOpen::Ready(reader) => {
+ self.state = FileStreamState::Open {
+ future: Box::pin(std::future::ready(
+ reader,
+ )),
+ partition_values,
+ }
+ }
+ }
+ }
+ None => return Poll::Ready(None),
+ },
+ OnError::Fail => {
+ self.state = FileStreamState::Error;
+ return Poll::Ready(Some(Err(err.into())));
+ }
+ }
+ }
None => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
@@ -428,7 +517,11 @@ impl<F: FileOpener> RecordBatchStream for FileStream<F> {
#[cfg(test)]
mod tests {
+ use arrow_schema::Schema;
+ use datafusion_common::DataFusionError;
use futures::StreamExt;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
use super::*;
use crate::datasource::object_store::ObjectStoreUrl;
@@ -439,49 +532,390 @@ mod tests {
test::{make_partition, object_store::register_test_store},
};
+ /// Test `FileOpener` which will simulate errors during file opening or scanning
+ #[derive(Default)]
struct TestOpener {
+ /// Index in stream of files which should throw an error while opening
+ error_opening_idx: Vec<usize>,
+ /// Index in stream of files which should throw an error while scanning
+ error_scanning_idx: Vec<usize>,
+ /// Index of last file in stream
+ current_idx: AtomicUsize,
+ /// `RecordBatch` to return
records: Vec<RecordBatch>,
}
impl FileOpener for TestOpener {
fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
- let iterator = self.records.clone().into_iter().map(Ok);
- let stream = futures::stream::iter(iterator).boxed();
- Ok(futures::future::ready(Ok(stream)).boxed())
+ let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
+
+ if self.error_opening_idx.contains(&idx) {
+ Ok(futures::future::ready(Err(DataFusionError::Internal(
+ "error opening".to_owned(),
+ )))
+ .boxed())
+ } else if self.error_scanning_idx.contains(&idx) {
+ let error = futures::future::ready(Err(ArrowError::IoError(
+ "error scanning".to_owned(),
+ )));
+ let stream = futures::stream::once(error).boxed();
+ Ok(futures::future::ready(Ok(stream)).boxed())
+ } else {
+ let iterator = self.records.clone().into_iter().map(Ok);
+ let stream = futures::stream::iter(iterator).boxed();
+ Ok(futures::future::ready(Ok(stream)).boxed())
+ }
+ }
+ }
+
+ #[derive(Default)]
+ struct FileStreamTest {
+ /// Number of files in the stream
+ num_files: usize,
+ /// Global limit of records emitted by the stream
+ limit: Option<usize>,
+ /// Error-handling behavior of the stream
+ on_error: OnError,
+ /// Mock `FileOpener`
+ opener: TestOpener,
+ }
+
+ impl FileStreamTest {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Specify the number of files in the stream
+ pub fn with_num_files(mut self, num_files: usize) -> Self {
+ self.num_files = num_files;
+ self
+ }
+
+ /// Specify the limit
+ pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ /// Specify the index of files in the stream which should
+ /// throw an error when opening
+ pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
+ self.opener.error_opening_idx = idx;
+ self
+ }
+
+ /// Specify the index of files in the stream which should
+ /// throw an error when scanning
+ pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
+ self.opener.error_scanning_idx = idx;
+ self
+ }
+
+ /// Specify the behavior of the stream when an error occurs
+ pub fn with_on_error(mut self, on_error: OnError) -> Self {
+ self.on_error = on_error;
+ self
+ }
+
+ /// Specify the record batches that should be returned from each
+ /// file that is successfully scanned
+ pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
+ self.opener.records = records;
+ self
+ }
+
+ /// Collect the results of the `FileStream`
+ pub async fn result(self) -> Result<Vec<RecordBatch>> {
+ let file_schema = self
+ .opener
+ .records
+ .first()
+ .map(|batch| batch.schema())
+ .unwrap_or_else(|| Arc::new(Schema::empty()));
+
+ let ctx = SessionContext::new();
+ let mock_files: Vec<(String, u64)> = (0..self.num_files)
+ .map(|idx| (format!("mock_file{idx}"), 10_u64))
+ .collect();
+
+ let mock_files_ref: Vec<(&str, u64)> = mock_files
+ .iter()
+ .map(|(name, size)| (name.as_str(), *size))
+ .collect();
+
+ register_test_store(&ctx, &mock_files_ref);
+
+ let file_group = mock_files
+ .into_iter()
+ .map(|(name, size)| PartitionedFile::new(name, size))
+ .collect();
+
+ let on_error = self.on_error;
+
+ let config = FileScanConfig {
+ object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+ file_schema,
+ file_groups: vec![file_group],
+ statistics: Default::default(),
+ projection: None,
+ limit: self.limit,
+ table_partition_cols: vec![],
+ output_ordering: vec![],
+ infinite_source: false,
+ };
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set)
+ .unwrap()
+ .with_on_error(on_error);
+
+ file_stream
+ .collect::<Vec<_>>()
+ .await
+ .into_iter()
+ .collect::<Result<Vec<_>>>()
}
}
/// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
- let records = vec![make_partition(3), make_partition(2)];
- let file_schema = records[0].schema();
-
- let reader = TestOpener { records };
-
- let ctx = SessionContext::new();
- register_test_store(&ctx, &[("mock_file1", 10), ("mock_file2", 20)]);
-
- let config = FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema,
- file_groups: vec![vec![
- PartitionedFile::new("mock_file1".to_owned(), 10),
- PartitionedFile::new("mock_file2".to_owned(), 20),
- ]],
- statistics: Default::default(),
- projection: None,
- limit,
- table_partition_cols: vec![],
- output_ordering: vec![],
- infinite_source: false,
- };
- let metrics_set = ExecutionPlanMetricsSet::new();
- let file_stream = FileStream::new(&config, 0, reader, &metrics_set).unwrap();
-
- file_stream
- .map(|b| b.expect("No error expected in stream"))
- .collect::<Vec<_>>()
+ FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_limit(limit)
+ .result()
.await
+ .expect("error executing stream")
+ }
+
+ #[tokio::test]
+ async fn on_error_opening() -> Result<()> {
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Skip)
+ .with_open_errors(vec![0])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "+---+",
+ "| i |",
+ "+---+",
+ "| 0 |",
+ "| 1 |",
+ "| 2 |",
+ "| 0 |",
+ "| 1 |",
+ "+---+",
+ ], &batches);
+
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Skip)
+ .with_open_errors(vec![1])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "+---+",
+ "| i |",
+ "+---+",
+ "| 0 |",
+ "| 1 |",
+ "| 2 |",
+ "| 0 |",
+ "| 1 |",
+ "+---+",
+ ], &batches);
+
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Skip)
+ .with_open_errors(vec![0, 1])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "++",
+ "++",
+ ], &batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn on_error_scanning_fail() -> Result<()> {
+ let result = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Fail)
+ .with_scan_errors(vec![1])
+ .result()
+ .await;
+
+ assert!(result.is_err());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn on_error_opening_fail() -> Result<()> {
+ let result = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Fail)
+ .with_open_errors(vec![1])
+ .result()
+ .await;
+
+ assert!(result.is_err());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn on_error_scanning() -> Result<()> {
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Skip)
+ .with_scan_errors(vec![0])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "+---+",
+ "| i |",
+ "+---+",
+ "| 0 |",
+ "| 1 |",
+ "| 2 |",
+ "| 0 |",
+ "| 1 |",
+ "+---+",
+ ], &batches);
+
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Skip)
+ .with_scan_errors(vec![1])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "+---+",
+ "| i |",
+ "+---+",
+ "| 0 |",
+ "| 1 |",
+ "| 2 |",
+ "| 0 |",
+ "| 1 |",
+ "+---+",
+ ], &batches);
+
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(2)
+ .with_on_error(OnError::Skip)
+ .with_scan_errors(vec![0, 1])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "++",
+ "++",
+ ], &batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn on_error_mixed() -> Result<()> {
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(3)
+ .with_on_error(OnError::Skip)
+ .with_open_errors(vec![1])
+ .with_scan_errors(vec![0])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "+---+",
+ "| i |",
+ "+---+",
+ "| 0 |",
+ "| 1 |",
+ "| 2 |",
+ "| 0 |",
+ "| 1 |",
+ "+---+",
+ ], &batches);
+
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(3)
+ .with_on_error(OnError::Skip)
+ .with_open_errors(vec![0])
+ .with_scan_errors(vec![1])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "+---+",
+ "| i |",
+ "+---+",
+ "| 0 |",
+ "| 1 |",
+ "| 2 |",
+ "| 0 |",
+ "| 1 |",
+ "+---+",
+ ], &batches);
+
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(3)
+ .with_on_error(OnError::Skip)
+ .with_open_errors(vec![2])
+ .with_scan_errors(vec![0, 1])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "++",
+ "++",
+ ], &batches);
+
+ let batches = FileStreamTest::new()
+ .with_records(vec![make_partition(3), make_partition(2)])
+ .with_num_files(3)
+ .with_on_error(OnError::Skip)
+ .with_open_errors(vec![0, 2])
+ .with_scan_errors(vec![1])
+ .result()
+ .await?;
+
+ #[rustfmt::skip]
+ crate::assert_batches_eq!(&[
+ "++",
+ "++",
+ ], &batches);
+
+ Ok(())
}
#[tokio::test]
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 72dd262b05..f96806544f 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -39,7 +39,7 @@ use arrow::{
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
-pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
+pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
pub(crate) use json::plan_to_json;
pub use json::{JsonOpener, NdJsonExec};