You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/13 15:36:09 UTC
[arrow-datafusion] branch master updated: Make ParquetExec usable outside of a tokio runtime (#2201) (#2202)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f39692932 Make ParquetExec usable outside of a tokio runtime (#2201) (#2202)
f39692932 is described below
commit f39692932ceb5d6ed51d2b55831dd94766d74df2
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Apr 13 16:36:04 2022 +0100
Make ParquetExec usable outside of a tokio runtime (#2201) (#2202)
---
.../core/src/physical_plan/file_format/parquet.rs | 489 ++++++++++-----------
1 file changed, 244 insertions(+), 245 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 2b21a5421..071a07fa0 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,59 +17,56 @@
//! Execution plan for reading Parquet files
-use futures::{StreamExt, TryStreamExt};
+use fmt::Debug;
+use std::collections::VecDeque;
use std::fmt;
use std::fs;
use std::path::Path;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
use std::{any::Any, convert::TryInto};
-use crate::datasource::file_format::parquet::ChunkObjectReader;
-use crate::execution::context::{SessionState, TaskContext};
-use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::{
- error::{DataFusionError, Result},
- physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
- physical_plan::{
- file_format::FileScanConfig,
- metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- stream::RecordBatchReceiverStream,
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
- Statistics,
- },
- scalar::ScalarValue,
-};
-use datafusion_common::Column;
-use datafusion_data_access::object_store::ObjectStore;
-use datafusion_expr::Expr;
-
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use log::{debug, warn};
-use parquet::arrow::ArrowWriter;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use log::debug;
+use parquet::arrow::{
+ arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
+ ParquetFileArrowReader,
+};
+use parquet::file::reader::FileReader;
use parquet::file::{
- metadata::RowGroupMetaData, reader::SerializedFileReader,
- serialized_reader::ReadOptionsBuilder, statistics::Statistics as ParquetStatistics,
+ metadata::RowGroupMetaData, properties::WriterProperties,
+ reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
+ statistics::Statistics as ParquetStatistics,
};
-use fmt::Debug;
-use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use parquet::file::properties::WriterProperties;
+use datafusion_common::Column;
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_expr::Expr;
-use tokio::task::JoinHandle;
-use tokio::{
- sync::mpsc::{channel, Receiver, Sender},
- task,
+use crate::physical_plan::stream::RecordBatchReceiverStream;
+use crate::{
+ datasource::{file_format::parquet::ChunkObjectReader, listing::PartitionedFile},
+ error::{DataFusionError, Result},
+ execution::context::{SessionState, TaskContext},
+ physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+ physical_plan::{
+ expressions::PhysicalSortExpr,
+ file_format::{FileScanConfig, SchemaAdapter},
+ metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
+ },
+ scalar::ScalarValue,
};
-use crate::datasource::listing::PartitionedFile;
-use crate::physical_plan::file_format::SchemaAdapter;
-use async_trait::async_trait;
-
use super::PartitionColumnProjector;
/// Execution plan for scanning one or more Parquet partitions
@@ -203,75 +200,51 @@ impl ExecutionPlan for ParquetExec {
partition_index: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- // because the parquet implementation is not thread-safe, it is necessary to execute
- // on a thread and communicate with channels
- let (response_tx, response_rx): (
- Sender<ArrowResult<RecordBatch>>,
- Receiver<ArrowResult<RecordBatch>>,
- ) = channel(2);
-
- let partition = self.base_config.file_groups[partition_index].clone();
- let metrics = self.metrics.clone();
let projection = match self.base_config.file_column_projection_indices() {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
};
- let pruning_predicate = self.pruning_predicate.clone();
- let batch_size = context.session_config().batch_size;
- let limit = self.base_config.limit;
- let object_store = Arc::clone(&self.base_config.object_store);
let partition_col_proj = PartitionColumnProjector::new(
Arc::clone(&self.projected_schema),
&self.base_config.table_partition_cols,
);
- let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
-
- let join_handle = task::spawn_blocking(move || {
- let res = if projection.is_empty() {
- read_partition_no_file_columns(
- object_store.as_ref(),
- &partition,
- batch_size,
- response_tx.clone(),
- limit,
- partition_col_proj,
- )
- } else {
- read_partition(
- object_store.as_ref(),
- adapter,
- partition_index,
- &partition,
- metrics,
- &projection,
- &pruning_predicate,
- batch_size,
- response_tx.clone(),
- limit,
- partition_col_proj,
- )
- };
+ let stream = ParquetExecStream {
+ error: false,
+ partition_index,
+ metrics: self.metrics.clone(),
+ object_store: self.base_config.object_store.clone(),
+ pruning_predicate: self.pruning_predicate.clone(),
+ batch_size: context.session_config().batch_size,
+ schema: self.projected_schema.clone(),
+ projection,
+ remaining_rows: self.base_config.limit,
+ reader: None,
+ files: self.base_config.file_groups[partition_index].clone().into(),
+ projector: partition_col_proj,
+ adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
+ };
- if let Err(e) = res {
- warn!(
- "Parquet reader thread terminated due to error: {:?} for files: {:?}",
- e, partition
- );
- // Send the error back to the main thread.
- //
- // Ignore error sending (via `.ok()`) because that
- // means the receiver has been torn down (and nothing
- // cares about the errors anymore)
- send_result(&response_tx, Err(e.into())).ok();
+ // Use spawn_blocking only if running from a tokio context (#2201)
+ match tokio::runtime::Handle::try_current() {
+ Ok(handle) => {
+ let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
+ let schema = stream.schema();
+ let join_handle = handle.spawn_blocking(move || {
+ for result in stream {
+ if response_tx.blocking_send(result).is_err() {
+ break;
+ }
+ }
+ });
+ Ok(RecordBatchReceiverStream::create(
+ &schema,
+ response_rx,
+ join_handle,
+ ))
}
- });
-
- Ok(RecordBatchReceiverStream::create(
- &self.projected_schema,
- response_rx,
- join_handle,
- ))
+ Err(_) => Ok(Box::pin(stream)),
+ }
}
fn fmt_as(
@@ -312,15 +285,175 @@ impl ExecutionPlan for ParquetExec {
}
}
-fn send_result(
- response_tx: &Sender<ArrowResult<RecordBatch>>,
- result: ArrowResult<RecordBatch>,
-) -> Result<()> {
- // Note this function is running on its own blockng tokio thread so blocking here is ok.
- response_tx
- .blocking_send(result)
- .map_err(|e| DataFusionError::Execution(e.to_string()))?;
- Ok(())
+/// Special-case empty column projection
+///
+/// This is a workaround for https://github.com/apache/arrow-rs/issues/1537
+enum ProjectedReader {
+ Reader {
+ reader: ParquetRecordBatchReader,
+ },
+ EmptyProjection {
+ remaining_rows: usize,
+ batch_size: usize,
+ },
+}
+
+/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
+///
+/// NB: This will perform blocking IO synchronously without yielding which may
+/// be problematic in certain contexts (e.g. a tokio runtime that also performs
+/// network IO)
+struct ParquetExecStream {
+ error: bool,
+ partition_index: usize,
+ metrics: ExecutionPlanMetricsSet,
+ object_store: Arc<dyn ObjectStore>,
+ pruning_predicate: Option<PruningPredicate>,
+ batch_size: usize,
+ schema: SchemaRef,
+ projection: Vec<usize>,
+ remaining_rows: Option<usize>,
+ reader: Option<(ProjectedReader, PartitionedFile)>,
+ files: VecDeque<PartitionedFile>,
+ projector: PartitionColumnProjector,
+ adapter: SchemaAdapter,
+}
+
+impl ParquetExecStream {
+ fn create_reader(&mut self, file: &PartitionedFile) -> Result<ProjectedReader> {
+ let file_metrics = ParquetFileMetrics::new(
+ self.partition_index,
+ file.file_meta.path(),
+ &self.metrics,
+ );
+ let object_reader = self
+ .object_store
+ .file_reader(file.file_meta.sized_file.clone())?;
+
+ let mut opt = ReadOptionsBuilder::new();
+ if let Some(pruning_predicate) = &self.pruning_predicate {
+ opt = opt.with_predicate(build_row_group_predicate(
+ pruning_predicate,
+ file_metrics,
+ ));
+ }
+
+ let file_reader = SerializedFileReader::new_with_options(
+ ChunkObjectReader(object_reader),
+ opt.build(),
+ )?;
+
+ if self.projection.is_empty() {
+ let remaining_rows = file_reader
+ .metadata()
+ .file_metadata()
+ .num_rows()
+ .try_into()
+ .expect("Row count should always be greater than or equal to 0 and less than usize::MAX");
+
+ return Ok(ProjectedReader::EmptyProjection {
+ remaining_rows,
+ batch_size: self.batch_size,
+ });
+ }
+
+ let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+
+ let adapted_projections = self
+ .adapter
+ .map_projections(&arrow_reader.get_schema()?, &self.projection)?;
+
+ let reader = arrow_reader
+ .get_record_reader_by_columns(adapted_projections, self.batch_size)?;
+
+ Ok(ProjectedReader::Reader { reader })
+ }
+}
+
+impl Iterator for ParquetExecStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.error || matches!(self.remaining_rows, Some(0)) {
+ return None;
+ }
+
+ // TODO: Split this out into separate operators (#2079)
+ loop {
+ let (reader, file) = match self.reader.as_mut() {
+ Some(current) => current,
+ None => match self.files.pop_front() {
+ None => return None,
+ Some(file) => match self.create_reader(&file) {
+ Ok(reader) => self.reader.insert((reader, file)),
+ Err(e) => {
+ self.error = true;
+ return Some(Err(ArrowError::ExternalError(Box::new(e))));
+ }
+ },
+ },
+ };
+
+ let result = match reader {
+ ProjectedReader::Reader { reader } => reader.next().map(|result| {
+ result
+ .and_then(|batch| {
+ self.adapter
+ .adapt_batch(batch, &self.projection)
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+ })
+ .and_then(|batch| {
+ self.projector.project(batch, &file.partition_values)
+ })
+ }),
+ ProjectedReader::EmptyProjection {
+ remaining_rows,
+ batch_size,
+ } => {
+ let size = *remaining_rows.min(batch_size);
+ *remaining_rows -= size;
+ (size != 0).then(|| {
+ self.projector
+ .project_from_size(size, &file.partition_values)
+ })
+ }
+ };
+
+ let result = match result {
+ Some(result) => result,
+ None => {
+ self.reader = None;
+ continue;
+ }
+ };
+
+ match (&result, self.remaining_rows.as_mut()) {
+ (Ok(batch), Some(remaining_rows)) => {
+ *remaining_rows = remaining_rows.saturating_sub(batch.num_rows());
+ }
+ _ => self.error = result.is_err(),
+ }
+
+ return Some(result);
+ }
+ }
+}
+
+impl Stream for ParquetExecStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ Poll::Ready(Iterator::next(&mut *self))
+ }
+}
+
+impl RecordBatchStream for ParquetExecStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
}
/// Wraps parquet statistics in a way
@@ -454,141 +587,6 @@ fn build_row_group_predicate(
)
}
-fn read_partition_no_file_columns(
- object_store: &dyn ObjectStore,
- partition: &[PartitionedFile],
- batch_size: usize,
- response_tx: Sender<ArrowResult<RecordBatch>>,
- limit: Option<usize>,
- mut partition_column_projector: PartitionColumnProjector,
-) -> Result<()> {
- use parquet::file::reader::FileReader;
- let mut limit = limit.unwrap_or(usize::MAX);
-
- for partitioned_file in partition {
- if limit == 0 {
- break;
- }
- let object_reader =
- object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
- let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
- let mut num_rows = usize::min(limit, file_reader
- .metadata()
- .file_metadata()
- .num_rows()
- .try_into()
- .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"));
- limit -= num_rows;
-
- let partition_batch = partition_column_projector
- .project_from_size(batch_size, &partitioned_file.partition_values)
- .map_err(|e| {
- let err_msg =
- format!("Error reading batch from {}: {}", partitioned_file, e);
- if let Err(send_err) = send_result(
- &response_tx,
- Err(ArrowError::ParquetError(err_msg.clone())),
- ) {
- return send_err;
- }
- DataFusionError::Execution(err_msg)
- })?;
-
- while num_rows > batch_size {
- send_result(&response_tx, Ok(partition_batch.clone()))?;
- num_rows -= batch_size;
- }
- let residual_batch = partition_batch.slice(0, num_rows);
- send_result(&response_tx, Ok(residual_batch))?;
- }
- Ok(())
-}
-
-#[allow(clippy::too_many_arguments)]
-fn read_partition(
- object_store: &dyn ObjectStore,
- schema_adapter: SchemaAdapter,
- partition_index: usize,
- partition: &[PartitionedFile],
- metrics: ExecutionPlanMetricsSet,
- projection: &[usize],
- pruning_predicate: &Option<PruningPredicate>,
- batch_size: usize,
- response_tx: Sender<ArrowResult<RecordBatch>>,
- limit: Option<usize>,
- mut partition_column_projector: PartitionColumnProjector,
-) -> Result<()> {
- let mut total_rows = 0;
- 'outer: for partitioned_file in partition {
- debug!("Reading file {}", &partitioned_file.file_meta.path());
-
- let file_metrics = ParquetFileMetrics::new(
- partition_index,
- &*partitioned_file.file_meta.path(),
- &metrics,
- );
- let object_reader =
- object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
-
- let mut opt = ReadOptionsBuilder::new();
- if let Some(pruning_predicate) = pruning_predicate {
- opt = opt.with_predicate(build_row_group_predicate(
- pruning_predicate,
- file_metrics,
- ));
- }
-
- let file_reader = SerializedFileReader::new_with_options(
- ChunkObjectReader(object_reader),
- opt.build(),
- )?;
-
- let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
- let adapted_projections =
- schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
-
- let mut batch_reader =
- arrow_reader.get_record_reader_by_columns(adapted_projections, batch_size)?;
- loop {
- match batch_reader.next() {
- Some(Ok(batch)) => {
- total_rows += batch.num_rows();
-
- let adapted_batch = schema_adapter.adapt_batch(batch, projection)?;
-
- let proj_batch = partition_column_projector
- .project(adapted_batch, &partitioned_file.partition_values);
-
- let send_rt = send_result(&response_tx, proj_batch);
- if send_rt.is_err() || limit.map(|l| total_rows >= l).unwrap_or(false)
- {
- break 'outer;
- }
- }
- None => {
- break;
- }
- Some(Err(e)) => {
- let err_msg =
- format!("Error reading batch from {}: {}", partitioned_file, e);
- // send_result error, if any, should not overwrite
- // the original ArrowError, so ignore it
- let _ = send_result(
- &response_tx,
- Err(ArrowError::ParquetError(err_msg.clone())),
- );
- // terminate thread with error
- return Err(DataFusionError::Execution(err_msg));
- }
- }
- }
- }
-
- // finished reading files (dropping response_tx will close
- // channel)
- Ok(())
-}
-
/// Executes a query and writes the results to a partitioned Parquet file.
pub async fn plan_to_parquet(
state: &SessionState,
@@ -614,14 +612,15 @@ pub async fn plan_to_parquet(
)?;
let task_ctx = Arc::new(TaskContext::from(state));
let stream = plan.execute(i, task_ctx).await?;
- let handle: JoinHandle<Result<()>> = task::spawn(async move {
- stream
- .map(|batch| writer.write(&batch?))
- .try_collect()
- .await
- .map_err(DataFusionError::from)?;
- writer.close().map_err(DataFusionError::from).map(|_| ())
- });
+ let handle: tokio::task::JoinHandle<Result<()>> =
+ tokio::task::spawn(async move {
+ stream
+ .map(|batch| writer.write(&batch?))
+ .try_collect()
+ .await
+ .map_err(DataFusionError::from)?;
+ writer.close().map_err(DataFusionError::from).map(|_| ())
+ });
tasks.push(handle);
}
futures::future::join_all(tasks).await;