You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/06/01 10:22:29 UTC

[arrow-datafusion] branch main updated: Make `FileStream` error handling configurable (#6491)

This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new aa7141a223 Make `FileStream` error handling configurable (#6491)
aa7141a223 is described below

commit aa7141a22373333410dad00126ae66b2a9c9f7d0
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Thu Jun 1 06:22:22 2023 -0400

    Make `FileStream` error handling configurable (#6491)
    
    * Make FileStream error behavior configurable (#187)
    
    * Skip files with errors in `FileStream`
    
    * Pass error through on fail
    
    (cherry picked from commit e755a057bce8b42052fb875cc3adf03c129301f3)
    
    * Make OnError public
    
    (cherry picked from commit ace7db291f7cf9415a1ab0d8cba7bc6580a5ab12)
    
    * Update datafusion/core/src/physical_plan/file_format/file_stream.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Create common test builder for FileStream tests
    
    * Add unit tests for OnError::Fail
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .../src/physical_plan/file_format/file_stream.rs   | 516 +++++++++++++++++++--
 .../core/src/physical_plan/file_format/mod.rs      |   2 +-
 2 files changed, 476 insertions(+), 42 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 2ca9a076cf..4dd3ad60be 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -41,7 +41,7 @@ use crate::physical_plan::file_format::{
     FileMeta, FileScanConfig, PartitionColumnProjector,
 };
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
 use crate::physical_plan::RecordBatchStream;
 
@@ -49,6 +49,20 @@ use crate::physical_plan::RecordBatchStream;
 pub type FileOpenFuture =
     BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
 
+/// Describes the behavior of the `FileStream` if file opening or scanning fails
+pub enum OnError {
+    /// Fail the entire stream and return the underlying error
+    Fail,
+    /// Continue scanning, ignoring the failed file
+    Skip,
+}
+
+impl Default for OnError {
+    fn default() -> Self {
+        Self::Fail
+    }
+}
+
 /// Generic API for opening a file using an [`ObjectStore`] and resolving to a
 /// stream of [`RecordBatch`]
 ///
@@ -81,6 +95,8 @@ pub struct FileStream<F: FileOpener> {
     file_stream_metrics: FileStreamMetrics,
     /// runtime baseline metrics
     baseline_metrics: BaselineMetrics,
+    /// Describes the behavior of the `FileStream` if file opening or scanning fails
+    on_error: OnError,
 }
 
 /// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -173,6 +189,16 @@ struct FileStreamMetrics {
     ///
     /// Time spent waiting for the FileStream's input.
     pub time_processing: StartableTime,
+    /// Count of errors opening file.
+    ///
+    /// If using `OnError::Skip` this will provide a count of the number of files
+    /// which were skipped and will not be included in the scan results.
+    pub file_open_errors: Count,
+    /// Count of errors scanning file.
+    ///
+    /// If using `OnError::Skip` this will provide a count of the number of files
+    /// whose scan was abandoned; batches emitted before the error remain in the results.
+    pub file_scan_errors: Count,
 }
 
 impl FileStreamMetrics {
@@ -201,11 +227,19 @@ impl FileStreamMetrics {
             start: None,
         };
 
+        let file_open_errors =
+            MetricBuilder::new(metrics).counter("file_open_errors", partition);
+
+        let file_scan_errors =
+            MetricBuilder::new(metrics).counter("file_scan_errors", partition);
+
         Self {
             time_opening,
             time_scanning_until_data,
             time_scanning_total,
             time_processing,
+            file_open_errors,
+            file_scan_errors,
         }
     }
 }
@@ -239,9 +273,19 @@ impl<F: FileOpener> FileStream<F> {
             state: FileStreamState::Idle,
             file_stream_metrics: FileStreamMetrics::new(metrics, partition),
             baseline_metrics: BaselineMetrics::new(metrics, partition),
+            on_error: OnError::Fail,
         })
     }
 
+    /// Specify the behavior when an error occurs opening or scanning a file
+    ///
+    /// If `OnError::Skip`, the stream will skip files which encounter an error and continue.
+    /// If `OnError::Fail` (default), the stream will fail and stop processing when an error occurs.
+    pub fn with_on_error(mut self, on_error: OnError) -> Self {
+        self.on_error = on_error;
+        self
+    }
+
     /// Begin opening the next file in parallel while decoding the current file in FileStream.
     ///
     /// Since file opening is mostly IO (and may involve a
@@ -320,8 +364,17 @@ impl<F: FileOpener> FileStream<F> {
                         }
                     }
                     Err(e) => {
-                        self.state = FileStreamState::Error;
-                        return Poll::Ready(Some(Err(e)));
+                        self.file_stream_metrics.file_open_errors.add(1);
+                        match self.on_error {
+                            OnError::Skip => {
+                                self.file_stream_metrics.time_opening.stop();
+                                self.state = FileStreamState::Idle
+                            }
+                            OnError::Fail => {
+                                self.state = FileStreamState::Error;
+                                return Poll::Ready(Some(Err(e)));
+                            }
+                        }
                     }
                 },
                 FileStreamState::Scan {
@@ -338,15 +391,13 @@ impl<F: FileOpener> FileStream<F> {
                         }
                     }
                     match ready!(reader.poll_next_unpin(cx)) {
-                        Some(result) => {
+                        Some(Ok(batch)) => {
                             self.file_stream_metrics.time_scanning_until_data.stop();
                             self.file_stream_metrics.time_scanning_total.stop();
-                            let result = result
-                                .and_then(|b| {
-                                    self.pc_projector
-                                        .project(b, partition_values)
-                                        .map_err(|e| ArrowError::ExternalError(e.into()))
-                                })
+                            let result = self
+                                .pc_projector
+                                .project(batch, partition_values)
+                                .map_err(|e| ArrowError::ExternalError(e.into()))
                                 .map(|batch| match &mut self.remain {
                                     Some(remain) => {
                                         if *remain > batch.num_rows() {
@@ -363,11 +414,49 @@ impl<F: FileOpener> FileStream<F> {
                                 });
 
                             if result.is_err() {
+                                // If the partition value projection fails, this is not governed by
+                                // the `OnError` behavior
                                 self.state = FileStreamState::Error
                             }
                             self.file_stream_metrics.time_scanning_total.start();
                             return Poll::Ready(Some(result.map_err(Into::into)));
                         }
+                        Some(Err(err)) => {
+                            self.file_stream_metrics.file_scan_errors.add(1);
+                            self.file_stream_metrics.time_scanning_until_data.stop();
+                            self.file_stream_metrics.time_scanning_total.stop();
+
+                            match self.on_error {
+                                // If `OnError::Skip` we skip the file as soon as we hit the first error
+                                OnError::Skip => match mem::take(next) {
+                                    Some((future, partition_values)) => {
+                                        self.file_stream_metrics.time_opening.start();
+
+                                        match future {
+                                            NextOpen::Pending(future) => {
+                                                self.state = FileStreamState::Open {
+                                                    future,
+                                                    partition_values,
+                                                }
+                                            }
+                                            NextOpen::Ready(reader) => {
+                                                self.state = FileStreamState::Open {
+                                                    future: Box::pin(std::future::ready(
+                                                        reader,
+                                                    )),
+                                                    partition_values,
+                                                }
+                                            }
+                                        }
+                                    }
+                                    None => return Poll::Ready(None),
+                                },
+                                OnError::Fail => {
+                                    self.state = FileStreamState::Error;
+                                    return Poll::Ready(Some(Err(err.into())));
+                                }
+                            }
+                        }
                         None => {
                             self.file_stream_metrics.time_scanning_until_data.stop();
                             self.file_stream_metrics.time_scanning_total.stop();
@@ -428,7 +517,11 @@ impl<F: FileOpener> RecordBatchStream for FileStream<F> {
 
 #[cfg(test)]
 mod tests {
+    use arrow_schema::Schema;
+    use datafusion_common::DataFusionError;
     use futures::StreamExt;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
 
     use super::*;
     use crate::datasource::object_store::ObjectStoreUrl;
@@ -439,49 +532,390 @@ mod tests {
         test::{make_partition, object_store::register_test_store},
     };
 
+    /// Test `FileOpener` which will simulate errors during file opening or scanning
+    #[derive(Default)]
     struct TestOpener {
+        /// Index in stream of files which should throw an error while opening
+        error_opening_idx: Vec<usize>,
+        /// Index in stream of files which should throw an error while scanning
+        error_scanning_idx: Vec<usize>,
+        /// Index of the next file to be opened in the stream (advanced on every `open` call)
+        current_idx: AtomicUsize,
+        /// `RecordBatch` to return
         records: Vec<RecordBatch>,
     }
 
     impl FileOpener for TestOpener {
         fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
-            let iterator = self.records.clone().into_iter().map(Ok);
-            let stream = futures::stream::iter(iterator).boxed();
-            Ok(futures::future::ready(Ok(stream)).boxed())
+            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
+
+            if self.error_opening_idx.contains(&idx) {
+                Ok(futures::future::ready(Err(DataFusionError::Internal(
+                    "error opening".to_owned(),
+                )))
+                .boxed())
+            } else if self.error_scanning_idx.contains(&idx) {
+                let error = futures::future::ready(Err(ArrowError::IoError(
+                    "error scanning".to_owned(),
+                )));
+                let stream = futures::stream::once(error).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            } else {
+                let iterator = self.records.clone().into_iter().map(Ok);
+                let stream = futures::stream::iter(iterator).boxed();
+                Ok(futures::future::ready(Ok(stream)).boxed())
+            }
+        }
+    }
+
+    #[derive(Default)]
+    struct FileStreamTest {
+        /// Number of files in the stream
+        num_files: usize,
+        /// Global limit of records emitted by the stream
+        limit: Option<usize>,
+        /// Error-handling behavior of the stream
+        on_error: OnError,
+        /// Mock `FileOpener`
+        opener: TestOpener,
+    }
+
+    impl FileStreamTest {
+        pub fn new() -> Self {
+            Self::default()
+        }
+
+        /// Specify the number of files in the stream
+        pub fn with_num_files(mut self, num_files: usize) -> Self {
+            self.num_files = num_files;
+            self
+        }
+
+        /// Specify the limit
+        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+            self.limit = limit;
+            self
+        }
+
+        /// Specify the index of files in the stream which should
+        /// throw an error when opening
+        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
+            self.opener.error_opening_idx = idx;
+            self
+        }
+
+        /// Specify the index of files in the stream which should
+        /// throw an error when scanning
+        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
+            self.opener.error_scanning_idx = idx;
+            self
+        }
+
+        /// Specify the behavior of the stream when an error occurs
+        pub fn with_on_error(mut self, on_error: OnError) -> Self {
+            self.on_error = on_error;
+            self
+        }
+
+        /// Specify the record batches that should be returned from each
+        /// file that is successfully scanned
+        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
+            self.opener.records = records;
+            self
+        }
+
+        /// Collect the results of the `FileStream`
+        pub async fn result(self) -> Result<Vec<RecordBatch>> {
+            let file_schema = self
+                .opener
+                .records
+                .first()
+                .map(|batch| batch.schema())
+                .unwrap_or_else(|| Arc::new(Schema::empty()));
+
+            let ctx = SessionContext::new();
+            let mock_files: Vec<(String, u64)> = (0..self.num_files)
+                .map(|idx| (format!("mock_file{idx}"), 10_u64))
+                .collect();
+
+            let mock_files_ref: Vec<(&str, u64)> = mock_files
+                .iter()
+                .map(|(name, size)| (name.as_str(), *size))
+                .collect();
+
+            register_test_store(&ctx, &mock_files_ref);
+
+            let file_group = mock_files
+                .into_iter()
+                .map(|(name, size)| PartitionedFile::new(name, size))
+                .collect();
+
+            let on_error = self.on_error;
+
+            let config = FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema,
+                file_groups: vec![file_group],
+                statistics: Default::default(),
+                projection: None,
+                limit: self.limit,
+                table_partition_cols: vec![],
+                output_ordering: vec![],
+                infinite_source: false,
+            };
+            let metrics_set = ExecutionPlanMetricsSet::new();
+            let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set)
+                .unwrap()
+                .with_on_error(on_error);
+
+            file_stream
+                .collect::<Vec<_>>()
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>>>()
         }
     }
 
     /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
     async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
-        let records = vec![make_partition(3), make_partition(2)];
-        let file_schema = records[0].schema();
-
-        let reader = TestOpener { records };
-
-        let ctx = SessionContext::new();
-        register_test_store(&ctx, &[("mock_file1", 10), ("mock_file2", 20)]);
-
-        let config = FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema,
-            file_groups: vec![vec![
-                PartitionedFile::new("mock_file1".to_owned(), 10),
-                PartitionedFile::new("mock_file2".to_owned(), 20),
-            ]],
-            statistics: Default::default(),
-            projection: None,
-            limit,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-            infinite_source: false,
-        };
-        let metrics_set = ExecutionPlanMetricsSet::new();
-        let file_stream = FileStream::new(&config, 0, reader, &metrics_set).unwrap();
-
-        file_stream
-            .map(|b| b.expect("No error expected in stream"))
-            .collect::<Vec<_>>()
+        FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_limit(limit)
+            .result()
             .await
+            .expect("error executing stream")
+    }
+
+    #[tokio::test]
+    async fn on_error_opening() -> Result<()> {
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Skip)
+            .with_open_errors(vec![0])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Skip)
+            .with_open_errors(vec![1])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Skip)
+            .with_open_errors(vec![0, 1])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "++",
+            "++",
+        ], &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn on_error_scanning_fail() -> Result<()> {
+        let result = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Fail)
+            .with_scan_errors(vec![1])
+            .result()
+            .await;
+
+        assert!(result.is_err());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn on_error_opening_fail() -> Result<()> {
+        let result = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Fail)
+            .with_open_errors(vec![1])
+            .result()
+            .await;
+
+        assert!(result.is_err());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn on_error_scanning() -> Result<()> {
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Skip)
+            .with_scan_errors(vec![0])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Skip)
+            .with_scan_errors(vec![1])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(2)
+            .with_on_error(OnError::Skip)
+            .with_scan_errors(vec![0, 1])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "++",
+            "++",
+        ], &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn on_error_mixed() -> Result<()> {
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(3)
+            .with_on_error(OnError::Skip)
+            .with_open_errors(vec![1])
+            .with_scan_errors(vec![0])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(3)
+            .with_on_error(OnError::Skip)
+            .with_open_errors(vec![0])
+            .with_scan_errors(vec![1])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(3)
+            .with_on_error(OnError::Skip)
+            .with_open_errors(vec![2])
+            .with_scan_errors(vec![0, 1])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "++",
+            "++",
+        ], &batches);
+
+        let batches = FileStreamTest::new()
+            .with_records(vec![make_partition(3), make_partition(2)])
+            .with_num_files(3)
+            .with_on_error(OnError::Skip)
+            .with_open_errors(vec![0, 2])
+            .with_scan_errors(vec![1])
+            .result()
+            .await?;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "++",
+            "++",
+        ], &batches);
+
+        Ok(())
     }
 
     #[tokio::test]
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 72dd262b05..f96806544f 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -39,7 +39,7 @@ use arrow::{
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
-pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
+pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
 pub(crate) use json::plan_to_json;
 pub use json::{JsonOpener, NdJsonExec};