You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/21 05:49:10 UTC

[arrow-datafusion] branch master updated: Remove Parquet Empty Projection Workaround (#2289)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 110f350db Remove Parquet Empty Projection Workaround (#2289)
110f350db is described below

commit 110f350db84c5b68e5382d022a56d53e59e4e821
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 21 06:49:05 2022 +0100

    Remove Parquet Empty Projection Workaround (#2289)
    
    * Remove workaround for https://github.com/apache/arrow-rs/issues/1537
    
    * Handle batches with no columns in SchemaAdapter
---
 .../core/src/physical_plan/file_format/mod.rs      | 41 +++----------
 .../core/src/physical_plan/file_format/parquet.rs  | 70 +++++-----------------
 2 files changed, 25 insertions(+), 86 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 72e2f9b87..566b4c8d4 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -44,6 +44,7 @@ use crate::{
     scalar::ScalarValue,
 };
 use arrow::array::{new_null_array, UInt16BufferBuilder};
+use arrow::record_batch::RecordBatchOptions;
 use datafusion_data_access::object_store::ObjectStore;
 use lazy_static::lazy_static;
 use log::info;
@@ -275,9 +276,15 @@ impl SchemaAdapter {
 
         let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);
 
-        let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+        // Necessary to handle empty batches
+        let mut options = RecordBatchOptions::default();
+        options.row_count = Some(batch.num_rows());
 
-        Ok(merged_batch)
+        Ok(RecordBatch::try_new_with_options(
+            projected_schema,
+            cols,
+            &options,
+        )?)
     }
 }
 
@@ -322,36 +329,6 @@ impl PartitionColumnProjector {
         }
     }
 
-    // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
-    fn project_from_size(
-        &mut self,
-        batch_size: usize,
-        partition_values: &[ScalarValue],
-    ) -> ArrowResult<RecordBatch> {
-        let expected_cols = self.projected_schema.fields().len();
-        if expected_cols != self.projected_partition_indexes.len() {
-            return Err(ArrowError::SchemaError(format!(
-                "Unexepected number of partition values, expected {} but got {}",
-                expected_cols,
-                partition_values.len()
-            )));
-        }
-        //The destination index is not needed. Since there are no non-partition columns it will simply be equivalent to
-        //the index that would be provided by .enumerate()
-        let cols = self
-            .projected_partition_indexes
-            .iter()
-            .map(|(pidx, _)| {
-                create_dict_array(
-                    &mut self.key_buffer_cache,
-                    &partition_values[*pidx],
-                    batch_size,
-                )
-            })
-            .collect();
-        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
-    }
-
     // Transform the batch read from the file by inserting the partitioning columns
     // to the right positions as deduced from `projected_schema`
     // - file_batch: batch read from the file, with internal projection applied
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index cfc99a71a..dd9817f95 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -40,7 +40,6 @@ use parquet::arrow::{
     arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
     ParquetFileArrowReader,
 };
-use parquet::file::reader::FileReader;
 use parquet::file::{
     metadata::RowGroupMetaData, properties::WriterProperties,
     reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
@@ -285,19 +284,6 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-/// Special-case empty column projection
-///
-/// This is a workaround for https://github.com/apache/arrow-rs/issues/1537
-enum ProjectedReader {
-    Reader {
-        reader: ParquetRecordBatchReader,
-    },
-    EmptyProjection {
-        remaining_rows: usize,
-        batch_size: usize,
-    },
-}
-
 /// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
 ///
 /// NB: This will perform blocking IO synchronously without yielding which may
@@ -313,14 +299,17 @@ struct ParquetExecStream {
     schema: SchemaRef,
     projection: Vec<usize>,
     remaining_rows: Option<usize>,
-    reader: Option<(ProjectedReader, PartitionedFile)>,
+    reader: Option<(ParquetRecordBatchReader, PartitionedFile)>,
     files: VecDeque<PartitionedFile>,
     projector: PartitionColumnProjector,
     adapter: SchemaAdapter,
 }
 
 impl ParquetExecStream {
-    fn create_reader(&mut self, file: &PartitionedFile) -> Result<ProjectedReader> {
+    fn create_reader(
+        &mut self,
+        file: &PartitionedFile,
+    ) -> Result<ParquetRecordBatchReader> {
         let file_metrics = ParquetFileMetrics::new(
             self.partition_index,
             file.file_meta.path(),
@@ -351,20 +340,6 @@ impl ParquetExecStream {
             opt.build(),
         )?;
 
-        if self.projection.is_empty() {
-            let remaining_rows = file_reader
-                .metadata()
-                .file_metadata()
-                .num_rows()
-                .try_into()
-                .expect("Row count should always be greater than or equal to 0 and less than usize::MAX");
-
-            return Ok(ProjectedReader::EmptyProjection {
-                remaining_rows,
-                batch_size: self.batch_size,
-            });
-        }
-
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
 
         let adapted_projections = self
@@ -374,7 +349,7 @@ impl ParquetExecStream {
         let reader = arrow_reader
             .get_record_reader_by_columns(adapted_projections, self.batch_size)?;
 
-        Ok(ProjectedReader::Reader { reader })
+        Ok(reader)
     }
 }
 
@@ -402,30 +377,17 @@ impl Iterator for ParquetExecStream {
                 },
             };
 
-            let result = match reader {
-                ProjectedReader::Reader { reader } => reader.next().map(|result| {
-                    result
-                        .and_then(|batch| {
-                            self.adapter
-                                .adapt_batch(batch, &self.projection)
-                                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                        })
-                        .and_then(|batch| {
-                            self.projector.project(batch, &file.partition_values)
-                        })
-                }),
-                ProjectedReader::EmptyProjection {
-                    remaining_rows,
-                    batch_size,
-                } => {
-                    let size = *remaining_rows.min(batch_size);
-                    *remaining_rows -= size;
-                    (size != 0).then(|| {
-                        self.projector
-                            .project_from_size(size, &file.partition_values)
+            let result = reader.next().map(|result| {
+                result
+                    .and_then(|batch| {
+                        self.adapter
+                            .adapt_batch(batch, &self.projection)
+                            .map_err(|e| ArrowError::ExternalError(Box::new(e)))
                     })
-                }
-            };
+                    .and_then(|batch| {
+                        self.projector.project(batch, &file.partition_values)
+                    })
+            });
 
             let result = match result {
                 Some(result) => result,