Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/13 15:36:09 UTC

[arrow-datafusion] branch master updated: Make ParquetExec usable outside of a tokio runtime (#2201) (#2202)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new f39692932 Make ParquetExec usable outside of a tokio runtime (#2201) (#2202)
f39692932 is described below

commit f39692932ceb5d6ed51d2b55831dd94766d74df2
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Apr 13 16:36:04 2022 +0100

    Make ParquetExec usable outside of a tokio runtime (#2201) (#2202)
---
 .../core/src/physical_plan/file_format/parquet.rs  | 489 ++++++++++-----------
 1 file changed, 244 insertions(+), 245 deletions(-)
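
For context, a minimal usage sketch of what this change enables (not part of the
commit): when no tokio runtime is active on the current thread,
Handle::try_current() fails inside ParquetExec::execute and the blocking
ParquetExecStream is returned directly, so the stream can be driven by any
executor. The helper name, `exec`, `task_ctx`, and the crate paths below are
illustrative assumptions, not code from this commit.

    use std::sync::Arc;

    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::execution::context::TaskContext;
    use datafusion::physical_plan::file_format::ParquetExec;
    use datafusion::physical_plan::ExecutionPlan;
    use futures::executor::block_on;
    use futures::StreamExt;

    /// Drain one partition of a ParquetExec without a tokio runtime.
    fn read_partition_without_tokio(
        exec: &ParquetExec,
        task_ctx: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        // No tokio context here, so execute() hands back the synchronous stream.
        let mut stream = block_on(exec.execute(0, task_ctx))?;
        let mut batches = Vec::new();
        // Each poll completes immediately (Poll::Ready); the file IO happens
        // synchronously inside the stream's poll_next.
        while let Some(batch) = block_on(stream.next()) {
            // ArrowError converts into DataFusionError via `?`.
            batches.push(batch?);
        }
        Ok(batches)
    }

When the same execute() call is made from inside a tokio runtime,
Handle::try_current() succeeds and the blocking work is instead routed through
spawn_blocking and a channel-backed RecordBatchReceiverStream, so reactor
threads are not stalled by file IO.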

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 2b21a5421..071a07fa0 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,59 +17,56 @@
 
 //! Execution plan for reading Parquet files
 
-use futures::{StreamExt, TryStreamExt};
+use fmt::Debug;
+use std::collections::VecDeque;
 use std::fmt;
 use std::fs;
 use std::path::Path;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 use std::{any::Any, convert::TryInto};
 
-use crate::datasource::file_format::parquet::ChunkObjectReader;
-use crate::execution::context::{SessionState, TaskContext};
-use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::{
-    error::{DataFusionError, Result},
-    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
-    physical_plan::{
-        file_format::FileScanConfig,
-        metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        stream::RecordBatchReceiverStream,
-        DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
-    },
-    scalar::ScalarValue,
-};
-use datafusion_common::Column;
-use datafusion_data_access::object_store::ObjectStore;
-use datafusion_expr::Expr;
-
 use arrow::{
     array::ArrayRef,
     datatypes::{Schema, SchemaRef},
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use log::{debug, warn};
-use parquet::arrow::ArrowWriter;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use log::debug;
+use parquet::arrow::{
+    arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
+    ParquetFileArrowReader,
+};
+use parquet::file::reader::FileReader;
 use parquet::file::{
-    metadata::RowGroupMetaData, reader::SerializedFileReader,
-    serialized_reader::ReadOptionsBuilder, statistics::Statistics as ParquetStatistics,
+    metadata::RowGroupMetaData, properties::WriterProperties,
+    reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
+    statistics::Statistics as ParquetStatistics,
 };
 
-use fmt::Debug;
-use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use parquet::file::properties::WriterProperties;
+use datafusion_common::Column;
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_expr::Expr;
 
-use tokio::task::JoinHandle;
-use tokio::{
-    sync::mpsc::{channel, Receiver, Sender},
-    task,
+use crate::physical_plan::stream::RecordBatchReceiverStream;
+use crate::{
+    datasource::{file_format::parquet::ChunkObjectReader, listing::PartitionedFile},
+    error::{DataFusionError, Result},
+    execution::context::{SessionState, TaskContext},
+    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+    physical_plan::{
+        expressions::PhysicalSortExpr,
+        file_format::{FileScanConfig, SchemaAdapter},
+        metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+        DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
+    },
+    scalar::ScalarValue,
 };
 
-use crate::datasource::listing::PartitionedFile;
-use crate::physical_plan::file_format::SchemaAdapter;
-use async_trait::async_trait;
-
 use super::PartitionColumnProjector;
 
 /// Execution plan for scanning one or more Parquet partitions
@@ -203,75 +200,51 @@ impl ExecutionPlan for ParquetExec {
         partition_index: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        // because the parquet implementation is not thread-safe, it is necessary to execute
-        // on a thread and communicate with channels
-        let (response_tx, response_rx): (
-            Sender<ArrowResult<RecordBatch>>,
-            Receiver<ArrowResult<RecordBatch>>,
-        ) = channel(2);
-
-        let partition = self.base_config.file_groups[partition_index].clone();
-        let metrics = self.metrics.clone();
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
-        let pruning_predicate = self.pruning_predicate.clone();
-        let batch_size = context.session_config().batch_size;
-        let limit = self.base_config.limit;
-        let object_store = Arc::clone(&self.base_config.object_store);
         let partition_col_proj = PartitionColumnProjector::new(
             Arc::clone(&self.projected_schema),
             &self.base_config.table_partition_cols,
         );
 
-        let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
-
-        let join_handle = task::spawn_blocking(move || {
-            let res = if projection.is_empty() {
-                read_partition_no_file_columns(
-                    object_store.as_ref(),
-                    &partition,
-                    batch_size,
-                    response_tx.clone(),
-                    limit,
-                    partition_col_proj,
-                )
-            } else {
-                read_partition(
-                    object_store.as_ref(),
-                    adapter,
-                    partition_index,
-                    &partition,
-                    metrics,
-                    &projection,
-                    &pruning_predicate,
-                    batch_size,
-                    response_tx.clone(),
-                    limit,
-                    partition_col_proj,
-                )
-            };
+        let stream = ParquetExecStream {
+            error: false,
+            partition_index,
+            metrics: self.metrics.clone(),
+            object_store: self.base_config.object_store.clone(),
+            pruning_predicate: self.pruning_predicate.clone(),
+            batch_size: context.session_config().batch_size,
+            schema: self.projected_schema.clone(),
+            projection,
+            remaining_rows: self.base_config.limit,
+            reader: None,
+            files: self.base_config.file_groups[partition_index].clone().into(),
+            projector: partition_col_proj,
+            adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
+        };
 
-            if let Err(e) = res {
-                warn!(
-                    "Parquet reader thread terminated due to error: {:?} for files: {:?}",
-                    e, partition
-                );
-                // Send the error back to the main thread.
-                //
-                // Ignore error sending (via `.ok()`) because that
-                // means the receiver has been torn down (and nothing
-                // cares about the errors anymore)
-                send_result(&response_tx, Err(e.into())).ok();
+        // Use spawn_blocking only if running from a tokio context (#2201)
+        match tokio::runtime::Handle::try_current() {
+            Ok(handle) => {
+                let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
+                let schema = stream.schema();
+                let join_handle = handle.spawn_blocking(move || {
+                    for result in stream {
+                        if response_tx.blocking_send(result).is_err() {
+                            break;
+                        }
+                    }
+                });
+                Ok(RecordBatchReceiverStream::create(
+                    &schema,
+                    response_rx,
+                    join_handle,
+                ))
             }
-        });
-
-        Ok(RecordBatchReceiverStream::create(
-            &self.projected_schema,
-            response_rx,
-            join_handle,
-        ))
+            Err(_) => Ok(Box::pin(stream)),
+        }
     }
 
     fn fmt_as(
@@ -312,15 +285,175 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-fn send_result(
-    response_tx: &Sender<ArrowResult<RecordBatch>>,
-    result: ArrowResult<RecordBatch>,
-) -> Result<()> {
-    // Note this function is running on its own blocking tokio thread so blocking here is ok.
-    response_tx
-        .blocking_send(result)
-        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-    Ok(())
+/// Special-case empty column projection
+///
+/// This is a workaround for https://github.com/apache/arrow-rs/issues/1537
+enum ProjectedReader {
+    Reader {
+        reader: ParquetRecordBatchReader,
+    },
+    EmptyProjection {
+        remaining_rows: usize,
+        batch_size: usize,
+    },
+}
+
+/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
+///
+/// NB: This will perform blocking IO synchronously without yielding which may
+/// be problematic in certain contexts (e.g. a tokio runtime that also performs
+/// network IO)
+struct ParquetExecStream {
+    error: bool,
+    partition_index: usize,
+    metrics: ExecutionPlanMetricsSet,
+    object_store: Arc<dyn ObjectStore>,
+    pruning_predicate: Option<PruningPredicate>,
+    batch_size: usize,
+    schema: SchemaRef,
+    projection: Vec<usize>,
+    remaining_rows: Option<usize>,
+    reader: Option<(ProjectedReader, PartitionedFile)>,
+    files: VecDeque<PartitionedFile>,
+    projector: PartitionColumnProjector,
+    adapter: SchemaAdapter,
+}
+
+impl ParquetExecStream {
+    fn create_reader(&mut self, file: &PartitionedFile) -> Result<ProjectedReader> {
+        let file_metrics = ParquetFileMetrics::new(
+            self.partition_index,
+            file.file_meta.path(),
+            &self.metrics,
+        );
+        let object_reader = self
+            .object_store
+            .file_reader(file.file_meta.sized_file.clone())?;
+
+        let mut opt = ReadOptionsBuilder::new();
+        if let Some(pruning_predicate) = &self.pruning_predicate {
+            opt = opt.with_predicate(build_row_group_predicate(
+                pruning_predicate,
+                file_metrics,
+            ));
+        }
+
+        let file_reader = SerializedFileReader::new_with_options(
+            ChunkObjectReader(object_reader),
+            opt.build(),
+        )?;
+
+        if self.projection.is_empty() {
+            let remaining_rows = file_reader
+                .metadata()
+                .file_metadata()
+                .num_rows()
+                .try_into()
+                .expect("Row count should always be greater than or equal to 0 and less than usize::MAX");
+
+            return Ok(ProjectedReader::EmptyProjection {
+                remaining_rows,
+                batch_size: self.batch_size,
+            });
+        }
+
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+
+        let adapted_projections = self
+            .adapter
+            .map_projections(&arrow_reader.get_schema()?, &self.projection)?;
+
+        let reader = arrow_reader
+            .get_record_reader_by_columns(adapted_projections, self.batch_size)?;
+
+        Ok(ProjectedReader::Reader { reader })
+    }
+}
+
+impl Iterator for ParquetExecStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.error || matches!(self.remaining_rows, Some(0)) {
+            return None;
+        }
+
+        // TODO: Split this out into separate operators (#2079)
+        loop {
+            let (reader, file) = match self.reader.as_mut() {
+                Some(current) => current,
+                None => match self.files.pop_front() {
+                    None => return None,
+                    Some(file) => match self.create_reader(&file) {
+                        Ok(reader) => self.reader.insert((reader, file)),
+                        Err(e) => {
+                            self.error = true;
+                            return Some(Err(ArrowError::ExternalError(Box::new(e))));
+                        }
+                    },
+                },
+            };
+
+            let result = match reader {
+                ProjectedReader::Reader { reader } => reader.next().map(|result| {
+                    result
+                        .and_then(|batch| {
+                            self.adapter
+                                .adapt_batch(batch, &self.projection)
+                                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                        })
+                        .and_then(|batch| {
+                            self.projector.project(batch, &file.partition_values)
+                        })
+                }),
+                ProjectedReader::EmptyProjection {
+                    remaining_rows,
+                    batch_size,
+                } => {
+                    let size = *remaining_rows.min(batch_size);
+                    *remaining_rows -= size;
+                    (size != 0).then(|| {
+                        self.projector
+                            .project_from_size(size, &file.partition_values)
+                    })
+                }
+            };
+
+            let result = match result {
+                Some(result) => result,
+                None => {
+                    self.reader = None;
+                    continue;
+                }
+            };
+
+            match (&result, self.remaining_rows.as_mut()) {
+                (Ok(batch), Some(remaining_rows)) => {
+                    *remaining_rows = remaining_rows.saturating_sub(batch.num_rows());
+                }
+                _ => self.error = result.is_err(),
+            }
+
+            return Some(result);
+        }
+    }
+}
+
+impl Stream for ParquetExecStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        Poll::Ready(Iterator::next(&mut *self))
+    }
+}
+
+impl RecordBatchStream for ParquetExecStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
 }
 
 /// Wraps parquet statistics in a way
@@ -454,141 +587,6 @@ fn build_row_group_predicate(
     )
 }
 
-fn read_partition_no_file_columns(
-    object_store: &dyn ObjectStore,
-    partition: &[PartitionedFile],
-    batch_size: usize,
-    response_tx: Sender<ArrowResult<RecordBatch>>,
-    limit: Option<usize>,
-    mut partition_column_projector: PartitionColumnProjector,
-) -> Result<()> {
-    use parquet::file::reader::FileReader;
-    let mut limit = limit.unwrap_or(usize::MAX);
-
-    for partitioned_file in partition {
-        if limit == 0 {
-            break;
-        }
-        let object_reader =
-            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
-        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
-        let mut num_rows = usize::min(limit, file_reader
-                        .metadata()
-                        .file_metadata()
-                        .num_rows()
-                        .try_into()
-                        .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"));
-        limit -= num_rows;
-
-        let partition_batch = partition_column_projector
-            .project_from_size(batch_size, &partitioned_file.partition_values)
-            .map_err(|e| {
-                let err_msg =
-                    format!("Error reading batch from {}: {}", partitioned_file, e);
-                if let Err(send_err) = send_result(
-                    &response_tx,
-                    Err(ArrowError::ParquetError(err_msg.clone())),
-                ) {
-                    return send_err;
-                }
-                DataFusionError::Execution(err_msg)
-            })?;
-
-        while num_rows > batch_size {
-            send_result(&response_tx, Ok(partition_batch.clone()))?;
-            num_rows -= batch_size;
-        }
-        let residual_batch = partition_batch.slice(0, num_rows);
-        send_result(&response_tx, Ok(residual_batch))?;
-    }
-    Ok(())
-}
-
-#[allow(clippy::too_many_arguments)]
-fn read_partition(
-    object_store: &dyn ObjectStore,
-    schema_adapter: SchemaAdapter,
-    partition_index: usize,
-    partition: &[PartitionedFile],
-    metrics: ExecutionPlanMetricsSet,
-    projection: &[usize],
-    pruning_predicate: &Option<PruningPredicate>,
-    batch_size: usize,
-    response_tx: Sender<ArrowResult<RecordBatch>>,
-    limit: Option<usize>,
-    mut partition_column_projector: PartitionColumnProjector,
-) -> Result<()> {
-    let mut total_rows = 0;
-    'outer: for partitioned_file in partition {
-        debug!("Reading file {}", &partitioned_file.file_meta.path());
-
-        let file_metrics = ParquetFileMetrics::new(
-            partition_index,
-            &*partitioned_file.file_meta.path(),
-            &metrics,
-        );
-        let object_reader =
-            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
-
-        let mut opt = ReadOptionsBuilder::new();
-        if let Some(pruning_predicate) = pruning_predicate {
-            opt = opt.with_predicate(build_row_group_predicate(
-                pruning_predicate,
-                file_metrics,
-            ));
-        }
-
-        let file_reader = SerializedFileReader::new_with_options(
-            ChunkObjectReader(object_reader),
-            opt.build(),
-        )?;
-
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let adapted_projections =
-            schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
-
-        let mut batch_reader =
-            arrow_reader.get_record_reader_by_columns(adapted_projections, batch_size)?;
-        loop {
-            match batch_reader.next() {
-                Some(Ok(batch)) => {
-                    total_rows += batch.num_rows();
-
-                    let adapted_batch = schema_adapter.adapt_batch(batch, projection)?;
-
-                    let proj_batch = partition_column_projector
-                        .project(adapted_batch, &partitioned_file.partition_values);
-
-                    let send_rt = send_result(&response_tx, proj_batch);
-                    if send_rt.is_err() || limit.map(|l| total_rows >= l).unwrap_or(false)
-                    {
-                        break 'outer;
-                    }
-                }
-                None => {
-                    break;
-                }
-                Some(Err(e)) => {
-                    let err_msg =
-                        format!("Error reading batch from {}: {}", partitioned_file, e);
-                    // send_result error, if any, should not overwrite
-                    // the original ArrowError, so ignore it
-                    let _ = send_result(
-                        &response_tx,
-                        Err(ArrowError::ParquetError(err_msg.clone())),
-                    );
-                    // terminate thread with error
-                    return Err(DataFusionError::Execution(err_msg));
-                }
-            }
-        }
-    }
-
-    // finished reading files (dropping response_tx will close
-    // channel)
-    Ok(())
-}
-
 /// Executes a query and writes the results to a partitioned Parquet file.
 pub async fn plan_to_parquet(
     state: &SessionState,
@@ -614,14 +612,15 @@ pub async fn plan_to_parquet(
                 )?;
                 let task_ctx = Arc::new(TaskContext::from(state));
                 let stream = plan.execute(i, task_ctx).await?;
-                let handle: JoinHandle<Result<()>> = task::spawn(async move {
-                    stream
-                        .map(|batch| writer.write(&batch?))
-                        .try_collect()
-                        .await
-                        .map_err(DataFusionError::from)?;
-                    writer.close().map_err(DataFusionError::from).map(|_| ())
-                });
+                let handle: tokio::task::JoinHandle<Result<()>> =
+                    tokio::task::spawn(async move {
+                        stream
+                            .map(|batch| writer.write(&batch?))
+                            .try_collect()
+                            .await
+                            .map_err(DataFusionError::from)?;
+                        writer.close().map_err(DataFusionError::from).map(|_| ())
+                    });
                 tasks.push(handle);
             }
             futures::future::join_all(tasks).await;