Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/21 05:49:10 UTC
[arrow-datafusion] branch master updated: Remove Parquet Empty Projection Workaround (#2289)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 110f350db Remove Parquet Empty Projection Workaround (#2289)
110f350db is described below
commit 110f350db84c5b68e5382d022a56d53e59e4e821
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 21 06:49:05 2022 +0100
Remove Parquet Empty Projection Workaround (#2289)
* Remove workaround for https://github.com/apache/arrow-rs/issues/1537
* Handle batches with no columns in SchemaAdapter
---
.../core/src/physical_plan/file_format/mod.rs | 41 +++----------
.../core/src/physical_plan/file_format/parquet.rs | 70 +++++-----------------
2 files changed, 25 insertions(+), 86 deletions(-)
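The workaround being removed existed because a RecordBatch with zero columns previously had no way to carry a row count (the subject of https://github.com/apache/arrow-rs/issues/1537). With RecordBatchOptions::row_count, such a batch can now be built directly. A minimal sketch of the pattern this commit relies on, using the arrow crate as of this commit (the row count of 1024 is illustrative):

    use std::sync::Arc;
    use arrow::datatypes::Schema;
    use arrow::error::ArrowError;
    use arrow::record_batch::{RecordBatch, RecordBatchOptions};

    fn main() -> Result<(), ArrowError> {
        // With no columns to measure, the row count must be supplied
        // explicitly via RecordBatchOptions.
        let mut options = RecordBatchOptions::default();
        options.row_count = Some(1024);

        let batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![], // no columns at all
            &options,
        )?;
        assert_eq!(batch.num_rows(), 1024);
        assert_eq!(batch.num_columns(), 0);
        Ok(())
    }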
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 72e2f9b87..566b4c8d4 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -44,6 +44,7 @@ use crate::{
scalar::ScalarValue,
};
use arrow::array::{new_null_array, UInt16BufferBuilder};
+use arrow::record_batch::RecordBatchOptions;
use datafusion_data_access::object_store::ObjectStore;
use lazy_static::lazy_static;
use log::info;
@@ -275,9 +276,15 @@ impl SchemaAdapter {
let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);
- let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+ // Necessary to handle empty batches
+ let mut options = RecordBatchOptions::default();
+ options.row_count = Some(batch.num_rows());
- Ok(merged_batch)
+ Ok(RecordBatch::try_new_with_options(
+ projected_schema,
+ cols,
+ &options,
+ )?)
}
}
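For context on why the explicit row count is needed here: Schema::project keeps only the requested fields, so an empty projection yields a schema with no fields, and a batch built from it has no columns from which to infer the number of rows. A small sketch (field names are illustrative, not from this code):

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        let table_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        // Projecting onto an empty index list keeps no fields.
        let projected = table_schema.project(&[]).unwrap();
        assert_eq!(projected.fields().len(), 0);
    }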
@@ -322,36 +329,6 @@ impl PartitionColumnProjector {
}
}
- // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
- fn project_from_size(
- &mut self,
- batch_size: usize,
- partition_values: &[ScalarValue],
- ) -> ArrowResult<RecordBatch> {
- let expected_cols = self.projected_schema.fields().len();
- if expected_cols != self.projected_partition_indexes.len() {
- return Err(ArrowError::SchemaError(format!(
- "Unexepected number of partition values, expected {} but got {}",
- expected_cols,
- partition_values.len()
- )));
- }
- //The destination index is not needed. Since there are no non-partition columns it will simply be equivalent to
- //the index that would be provided by .enumerate()
- let cols = self
- .projected_partition_indexes
- .iter()
- .map(|(pidx, _)| {
- create_dict_array(
- &mut self.key_buffer_cache,
- &partition_values[*pidx],
- batch_size,
- )
- })
- .collect();
- RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
- }
-
// Transform the batch read from the file by inserting the partitioning columns
// to the right positions as deduced from `projected_schema`
// - file_batch: batch read from the file, with internal projection applied
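As the removed project_from_size shows, partition columns are materialized as dictionary arrays with u16 keys (hence the UInt16BufferBuilder import above), since one scalar value repeats for every row of the batch. A sketch of that encoding in isolation, independent of the create_dict_array helper (the value and row count are illustrative):

    use arrow::array::{Array, DictionaryArray};
    use arrow::datatypes::UInt16Type;

    fn main() {
        // Repeating one partition value per row dictionary-encodes to a
        // single dictionary entry plus one u16 key per row, rather than a
        // materialized copy of the value for every row.
        let col: DictionaryArray<UInt16Type> =
            std::iter::repeat("2022-04-21").take(1024).collect();
        assert_eq!(col.len(), 1024);
        assert_eq!(col.values().len(), 1);
    }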
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index cfc99a71a..dd9817f95 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -40,7 +40,6 @@ use parquet::arrow::{
arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
ParquetFileArrowReader,
};
-use parquet::file::reader::FileReader;
use parquet::file::{
metadata::RowGroupMetaData, properties::WriterProperties,
reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
@@ -285,19 +284,6 @@ impl ExecutionPlan for ParquetExec {
}
}
-/// Special-case empty column projection
-///
-/// This is a workaround for https://github.com/apache/arrow-rs/issues/1537
-enum ProjectedReader {
- Reader {
- reader: ParquetRecordBatchReader,
- },
- EmptyProjection {
- remaining_rows: usize,
- batch_size: usize,
- },
-}
-
/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
///
/// NB: This will perform blocking IO synchronously without yielding which may
@@ -313,14 +299,17 @@ struct ParquetExecStream {
schema: SchemaRef,
projection: Vec<usize>,
remaining_rows: Option<usize>,
- reader: Option<(ProjectedReader, PartitionedFile)>,
+ reader: Option<(ParquetRecordBatchReader, PartitionedFile)>,
files: VecDeque<PartitionedFile>,
projector: PartitionColumnProjector,
adapter: SchemaAdapter,
}
impl ParquetExecStream {
- fn create_reader(&mut self, file: &PartitionedFile) -> Result<ProjectedReader> {
+ fn create_reader(
+ &mut self,
+ file: &PartitionedFile,
+ ) -> Result<ParquetRecordBatchReader> {
let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file.file_meta.path(),
@@ -351,20 +340,6 @@ impl ParquetExecStream {
opt.build(),
)?;
- if self.projection.is_empty() {
- let remaining_rows = file_reader
- .metadata()
- .file_metadata()
- .num_rows()
- .try_into()
- .expect("Row count should always be greater than or equal to 0 and less than usize::MAX");
-
- return Ok(ProjectedReader::EmptyProjection {
- remaining_rows,
- batch_size: self.batch_size,
- });
- }
-
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let adapted_projections = self
@@ -374,7 +349,7 @@ impl ParquetExecStream {
let reader = arrow_reader
.get_record_reader_by_columns(adapted_projections, self.batch_size)?;
- Ok(ProjectedReader::Reader { reader })
+ Ok(reader)
}
}
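With the upstream fix, an empty column projection flows through the same reader path as any other projection, and the batches it yields still report their row counts. A sketch of that behaviour using the same parquet APIs as create_reader above (the file path is illustrative, and this assumes the parquet crate version in use at the time of this commit):

    use std::fs::File;
    use std::sync::Arc;
    use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
    use parquet::file::reader::SerializedFileReader;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open("data.parquet")?;
        let file_reader = SerializedFileReader::new(file)?;
        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

        // An empty projection decodes no column data, but each batch still
        // spans a well-defined number of rows.
        let reader = arrow_reader.get_record_reader_by_columns(vec![], 1024)?;
        for batch in reader {
            let batch = batch?;
            assert_eq!(batch.num_columns(), 0);
            println!("rows: {}", batch.num_rows());
        }
        Ok(())
    }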
@@ -402,30 +377,17 @@ impl Iterator for ParquetExecStream {
},
};
- let result = match reader {
- ProjectedReader::Reader { reader } => reader.next().map(|result| {
- result
- .and_then(|batch| {
- self.adapter
- .adapt_batch(batch, &self.projection)
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))
- })
- .and_then(|batch| {
- self.projector.project(batch, &file.partition_values)
- })
- }),
- ProjectedReader::EmptyProjection {
- remaining_rows,
- batch_size,
- } => {
- let size = *remaining_rows.min(batch_size);
- *remaining_rows -= size;
- (size != 0).then(|| {
- self.projector
- .project_from_size(size, &file.partition_values)
+ let result = reader.next().map(|result| {
+ result
+ .and_then(|batch| {
+ self.adapter
+ .adapt_batch(batch, &self.projection)
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))
})
- }
- };
+ .and_then(|batch| {
+ self.projector.project(batch, &file.partition_values)
+ })
+ });
let result = match result {
Some(result) => result,