You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/04 18:13:26 UTC

[arrow-datafusion] branch master updated: Fixed parquet path partitioning when only selecting partitioned columns (#2000)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fa5cef8c9 Fixed parquet path partitioning when only selecting partitioned columns (#2000)
fa5cef8c9 is described below

commit fa5cef8c96e16094aa4379f1c4c4fa08b4d48e37
Author: Patrick More <34...@users.noreply.github.com>
AuthorDate: Mon Apr 4 11:13:21 2022 -0700

    Fixed parquet path partitioning when only selecting partitioned columns (#2000)
    
    * Fixed parquet path partitioning when only selecting partitioned columns
    
    * Removed unnecesary row group pruning and file metrics
    
    * Ran cargo fmt
    
    * Switched from row group level metadata to file level metadata to determine number of rows to emit
    
    * Reworked task spawning in ParquetExec::execute
    
    * Changed index based partition column generating to iterator version
    
    * Moved limit unwrap outside of loop
    
    * Fixed bug about number of rows emitted when querying only partition columns and  reuse partition record batch
    
    * Added limit logic tests for partitioned hive partitioned parquet file
    
    * Formatted code
    
    * Fixed clippy lint
    
    * Fixed other clippy lint
---
 .../core/src/physical_plan/file_format/mod.rs      |  31 ++++-
 .../core/src/physical_plan/file_format/parquet.rs  |  89 ++++++++++++--
 datafusion/core/tests/path_partition.rs            | 131 +++++++++++++++++++++
 3 files changed, 237 insertions(+), 14 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 1e8f9e2d8..cd2bb7fc1 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -310,6 +310,36 @@ impl PartitionColumnProjector {
         }
     }
 
+    // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
+    fn project_from_size(
+        &mut self,
+        batch_size: usize,
+        partition_values: &[ScalarValue],
+    ) -> ArrowResult<RecordBatch> {
+        let expected_cols = self.projected_schema.fields().len();
+        if expected_cols != self.projected_partition_indexes.len() {
+            return Err(ArrowError::SchemaError(format!(
+                "Unexepected number of partition values, expected {} but got {}",
+                expected_cols,
+                partition_values.len()
+            )));
+        }
+        //The destination index is not needed. Since there are no non-partition columns it will simply be equivalent to
+        //the index that would be provided by .enumerate()
+        let cols = self
+            .projected_partition_indexes
+            .iter()
+            .map(|(pidx, _)| {
+                create_dict_array(
+                    &mut self.key_buffer_cache,
+                    &partition_values[*pidx],
+                    batch_size,
+                )
+            })
+            .collect();
+        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
+    }
+
     // Transform the batch read from the file by inserting the partitioning columns
     // to the right positions as deduced from `projected_schema`
     // - file_batch: batch read from the file, with internal projection applied
@@ -329,7 +359,6 @@ impl PartitionColumnProjector {
                 file_batch.columns().len()
             )));
         }
-
         let mut cols = file_batch.columns().to_vec();
         for &(pidx, sidx) in &self.projected_partition_indexes {
             cols.insert(
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 310c5e77e..d6800af9f 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -235,19 +235,32 @@ impl ExecutionPlan for ParquetExec {
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
         let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
+            let res = if projection.is_empty() {
+                read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),
+                    limit,
+                    partition_col_proj,
+                )
+            } else {
+                read_partition(
+                    object_store.as_ref(),
+                    adapter,
+                    partition_index,
+                    &partition,
+                    metrics,
+                    &projection,
+                    &pruning_predicate,
+                    batch_size,
+                    response_tx.clone(),
+                    limit,
+                    partition_col_proj,
+                )
+            };
+
+            if let Err(e) = res {
                 warn!(
                     "Parquet reader thread terminated due to error: {:?} for files: {:?}",
                     e, partition
@@ -448,6 +461,56 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    let mut limit = limit.unwrap_or(usize::MAX);
+
+    for partitioned_file in partition {
+        if limit == 0 {
+            break;
+        }
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut num_rows = usize::min(limit, file_reader
+                        .metadata()
+                        .file_metadata()
+                        .num_rows()
+                        .try_into()
+                        .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"));
+        limit -= num_rows;
+
+        let partition_batch = partition_column_projector
+            .project_from_size(batch_size, &partitioned_file.partition_values)
+            .map_err(|e| {
+                let err_msg =
+                    format!("Error reading batch from {}: {}", partitioned_file, e);
+                if let Err(send_err) = send_result(
+                    &response_tx,
+                    Err(ArrowError::ParquetError(err_msg.clone())),
+                ) {
+                    return send_err;
+                }
+                DataFusionError::Execution(err_msg)
+            })?;
+
+        while num_rows > batch_size {
+            send_result(&response_tx, Ok(partition_batch.clone()))?;
+            num_rows -= batch_size;
+        }
+        let residual_batch = partition_batch.slice(0, num_rows);
+        send_result(&response_tx, Ok(residual_batch))?;
+    }
+    Ok(())
+}
+
 #[allow(clippy::too_many_arguments)]
 fn read_partition(
     object_store: &dyn ObjectStore,
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 80610baf8..2e4000552 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -38,8 +38,139 @@ use datafusion::{
     prelude::SessionContext,
     test_util::{self, arrow_test_data, parquet_test_data},
 };
+use datafusion_common::ScalarValue;
 use futures::{stream, StreamExt};
 
+#[tokio::test]
+async fn parquet_distinct_partition_col() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    register_partitioned_alltypes_parquet(
+        &ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &["year", "month", "day"],
+        "",
+        "alltypes_plain.parquet",
+    )
+    .await;
+    //Test that only selecting partition columns is possible
+    let result = ctx
+        .sql("SELECT distinct year,month,day FROM t")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+------+-------+-----+",
+        "| year | month | day |",
+        "+------+-------+-----+",
+        "| 2021 | 09    | 09  |",
+        "| 2021 | 10    | 09  |",
+        "| 2021 | 10    | 28  |",
+        "+------+-------+-----+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+    //Test that the number of rows returned by partition column scan and actually reading the parquet file are the same
+    let actual_row_count: usize = ctx
+        .sql("SELECT id from t")
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|batch| batch.num_rows())
+        .sum();
+
+    let partition_row_count: usize = ctx
+        .sql("SELECT year from t")
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|batch| batch.num_rows())
+        .sum();
+    assert_eq!(actual_row_count, partition_row_count);
+
+    //Test limit logic. 3 test cases
+    //1. limit is contained within a single partition with leftover rows
+    //2. limit is contained within a single partition without leftover rows
+    //3. limit is not contained within a single partition
+    //The id column is included to ensure that the parquet file is actually scanned.
+    let results  = ctx
+        .sql("SELECT COUNT(*) as num_rows_per_month, month, MAX(id) from t group by month order by num_rows_per_month desc")
+        .await?
+        .collect()
+        .await?;
+
+    let mut max_limit = match ScalarValue::try_from_array(results[0].column(0), 0)? {
+        ScalarValue::UInt64(Some(count)) => count,
+        s => panic!("Expected count as Int64 found {}", s),
+    };
+
+    max_limit += 1;
+    let last_batch = results
+        .last()
+        .expect("There shouled be at least one record batch returned");
+    let last_row_idx = last_batch.num_rows() - 1;
+    let mut min_limit =
+        match ScalarValue::try_from_array(last_batch.column(0), last_row_idx)? {
+            ScalarValue::UInt64(Some(count)) => count,
+            s => panic!("Expected count as Int64 found {}", s),
+        };
+
+    min_limit -= 1;
+
+    let sql_cross_partition_boundary = format!("SELECT month FROM t limit {}", max_limit);
+    let resulting_limit: u64 = ctx
+        .sql(sql_cross_partition_boundary.as_str())
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|r| r.num_rows() as u64)
+        .sum();
+
+    assert_eq!(max_limit, resulting_limit);
+
+    let sql_within_partition_boundary =
+        format!("SELECT month from t limit {}", min_limit);
+    let resulting_limit: u64 = ctx
+        .sql(sql_within_partition_boundary.as_str())
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|r| r.num_rows() as u64)
+        .sum();
+
+    assert_eq!(min_limit, resulting_limit);
+
+    let month = match ScalarValue::try_from_array(results[0].column(1), 0)? {
+        ScalarValue::Utf8(Some(month)) => month,
+        s => panic!("Expected count as Int64 found {}", s),
+    };
+
+    let sql_on_partition_boundary = format!(
+        "SELECT month from t where month = '{}' LIMIT {}",
+        month,
+        max_limit - 1
+    );
+    let resulting_limit: u64 = ctx
+        .sql(sql_on_partition_boundary.as_str())
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|r| r.num_rows() as u64)
+        .sum();
+    let partition_row_count = max_limit - 1;
+    assert_eq!(partition_row_count, resulting_limit);
+    Ok(())
+}
+
 #[tokio::test]
 async fn csv_filter_with_file_col() -> Result<()> {
     let ctx = SessionContext::new();