Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/04 18:13:26 UTC
[arrow-datafusion] branch master updated: Fixed parquet path partitioning when only selecting partitioned columns (#2000)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fa5cef8c9 Fixed parquet path partitioning when only selecting partitioned columns (#2000)
fa5cef8c9 is described below
commit fa5cef8c96e16094aa4379f1c4c4fa08b4d48e37
Author: Patrick More <34...@users.noreply.github.com>
AuthorDate: Mon Apr 4 11:13:21 2022 -0700
Fixed parquet path partitioning when only selecting partitioned columns (#2000)
* Fixed parquet path partitioning when only selecting partitioned columns
* Removed unnecessary row group pruning and file metrics
* Ran cargo fmt
* Switched from row group level metadata to file level metadata to determine number of rows to emit
* Reworked task spawning in ParquetExec::execute
* Changed index based partition column generating to iterator version
* Moved limit unwrap outside of loop
* Fixed bug about number of rows emitted when querying only partition columns and reuse partition record batch
* Added limit logic tests for partitioned hive partitioned parquet file
* Formatted code
* Fixed clippy lint
* Fixed other clippy lint
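In essence, when the projection contains no file columns the reader no longer decodes any parquet data: the file-level row count alone determines how many copies of the constant partition values to emit. A minimal sketch of that emit loop (illustrative only: emit_partition_batches and its collected Vec are hypothetical; the committed code below streams batches through a channel via send_result instead):

    use arrow::record_batch::RecordBatch;

    // Sketch: turn `num_rows` rows of constant partition values into batches,
    // reusing one template batch of `batch_size` rows built once per file.
    fn emit_partition_batches(
        template: &RecordBatch, // batch_size rows of constant partition values
        mut num_rows: usize,    // row count from the parquet file-level metadata
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut out = Vec::new();
        while num_rows > batch_size {
            // Cloning a RecordBatch only copies Arc pointers, so reuse is cheap.
            out.push(template.clone());
            num_rows -= batch_size;
        }
        out.push(template.slice(0, num_rows)); // residual batch
        out
    }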
---
.../core/src/physical_plan/file_format/mod.rs | 31 ++++-
.../core/src/physical_plan/file_format/parquet.rs | 89 ++++++++++++--
datafusion/core/tests/path_partition.rs | 131 +++++++++++++++++++++
3 files changed, 237 insertions(+), 14 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 1e8f9e2d8..cd2bb7fc1 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -310,6 +310,36 @@ impl PartitionColumnProjector {
}
}
+ // Creates a RecordBatch with values from `partition_values`. Used when no non-partition columns are read
+ fn project_from_size(
+ &mut self,
+ batch_size: usize,
+ partition_values: &[ScalarValue],
+ ) -> ArrowResult<RecordBatch> {
+ let expected_cols = self.projected_schema.fields().len();
+ if expected_cols != self.projected_partition_indexes.len() {
+ return Err(ArrowError::SchemaError(format!(
+ "Unexepected number of partition values, expected {} but got {}",
+ expected_cols,
+ partition_values.len()
+ )));
+ }
+ // The destination index is not needed: since there are no non-partition
+ // columns, it is simply the index that .enumerate() would provide.
+ let cols = self
+ .projected_partition_indexes
+ .iter()
+ .map(|(pidx, _)| {
+ create_dict_array(
+ &mut self.key_buffer_cache,
+ &partition_values[*pidx],
+ batch_size,
+ )
+ })
+ .collect();
+ RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
+ }
+
// Transform the batch read from the file by inserting the partitioning columns
// to the right positions as deduced from `projected_schema`
// - file_batch: batch read from the file, with internal projection applied
@@ -329,7 +359,6 @@ impl PartitionColumnProjector {
file_batch.columns().len()
)));
}
-
let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
cols.insert(
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 310c5e77e..d6800af9f 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -235,19 +235,32 @@ impl ExecutionPlan for ParquetExec {
let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
let join_handle = task::spawn_blocking(move || {
- if let Err(e) = read_partition(
- object_store.as_ref(),
- adapter,
- partition_index,
- &partition,
- metrics,
- &projection,
- &pruning_predicate,
- batch_size,
- response_tx.clone(),
- limit,
- partition_col_proj,
- ) {
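+ // When nothing is projected from the file itself, skip the parquet column
+ // reader entirely and synthesize batches containing only partition values.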
+ let res = if projection.is_empty() {
+ read_partition_no_file_columns(
+ object_store.as_ref(),
+ &partition,
+ batch_size,
+ response_tx.clone(),
+ limit,
+ partition_col_proj,
+ )
+ } else {
+ read_partition(
+ object_store.as_ref(),
+ adapter,
+ partition_index,
+ &partition,
+ metrics,
+ &projection,
+ &pruning_predicate,
+ batch_size,
+ response_tx.clone(),
+ limit,
+ partition_col_proj,
+ )
+ };
+
+ if let Err(e) = res {
warn!(
"Parquet reader thread terminated due to error: {:?} for files: {:?}",
e, partition
@@ -448,6 +461,56 @@ fn build_row_group_predicate(
)
}
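+/// Variant of `read_partition` used when the projection contains no file
+/// columns: only the parquet footer is read to obtain the row count, and the
+/// emitted batches are built entirely from the partition values.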
+fn read_partition_no_file_columns(
+ object_store: &dyn ObjectStore,
+ partition: &[PartitionedFile],
+ batch_size: usize,
+ response_tx: Sender<ArrowResult<RecordBatch>>,
+ limit: Option<usize>,
+ mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+ use parquet::file::reader::FileReader;
+ let mut limit = limit.unwrap_or(usize::MAX);
+
+ for partitioned_file in partition {
+ if limit == 0 {
+ break;
+ }
+ let object_reader =
+ object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+ let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
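+ // Only the parquet footer is parsed here; no column data is decoded.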
+ let mut num_rows = usize::min(
+ limit,
+ file_reader
+ .metadata()
+ .file_metadata()
+ .num_rows()
+ .try_into()
+ .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"),
+ );
+ limit -= num_rows;
+
+ let partition_batch = partition_column_projector
+ .project_from_size(batch_size, &partitioned_file.partition_values)
+ .map_err(|e| {
+ let err_msg =
+ format!("Error reading batch from {}: {}", partitioned_file, e);
+ if let Err(send_err) = send_result(
+ &response_tx,
+ Err(ArrowError::ParquetError(err_msg.clone())),
+ ) {
+ return send_err;
+ }
+ DataFusionError::Execution(err_msg)
+ })?;
+
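+ // Emit full batch_size batches until fewer than one batch of rows remains.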
+ while num_rows > batch_size {
+ send_result(&response_tx, Ok(partition_batch.clone()))?;
+ num_rows -= batch_size;
+ }
+ let residual_batch = partition_batch.slice(0, num_rows);
+ send_result(&response_tx, Ok(residual_batch))?;
+ }
+ Ok(())
+}
+
#[allow(clippy::too_many_arguments)]
fn read_partition(
object_store: &dyn ObjectStore,
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 80610baf8..2e4000552 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -38,8 +38,139 @@ use datafusion::{
prelude::SessionContext,
test_util::{self, arrow_test_data, parquet_test_data},
};
+use datafusion_common::ScalarValue;
use futures::{stream, StreamExt};
+#[tokio::test]
+async fn parquet_distinct_partition_col() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ register_partitioned_alltypes_parquet(
+ &ctx,
+ &[
+ "year=2021/month=09/day=09/file.parquet",
+ "year=2021/month=10/day=09/file.parquet",
+ "year=2021/month=10/day=28/file.parquet",
+ ],
+ &["year", "month", "day"],
+ "",
+ "alltypes_plain.parquet",
+ )
+ .await;
+ // Test that selecting only partition columns is possible
+ let result = ctx
+ .sql("SELECT distinct year,month,day FROM t")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+------+-------+-----+",
+ "| year | month | day |",
+ "+------+-------+-----+",
+ "| 2021 | 09 | 09 |",
+ "| 2021 | 10 | 09 |",
+ "| 2021 | 10 | 28 |",
+ "+------+-------+-----+",
+ ];
+ assert_batches_sorted_eq!(expected, &result);
+ // Test that a scan of only partition columns returns the same number of rows as actually reading the parquet files
+ let actual_row_count: usize = ctx
+ .sql("SELECT id from t")
+ .await?
+ .collect()
+ .await?
+ .into_iter()
+ .map(|batch| batch.num_rows())
+ .sum();
+
+ let partition_row_count: usize = ctx
+ .sql("SELECT year from t")
+ .await?
+ .collect()
+ .await?
+ .into_iter()
+ .map(|batch| batch.num_rows())
+ .sum();
+ assert_eq!(actual_row_count, partition_row_count);
+
+ // Test limit logic. Three cases:
+ // 1. the limit is contained within a single partition, with leftover rows
+ // 2. the limit is contained within a single partition, without leftover rows
+ // 3. the limit is not contained within a single partition
+ // The id column is included to ensure that the parquet file is actually scanned.
+ let results = ctx
+ .sql("SELECT COUNT(*) as num_rows_per_month, month, MAX(id) from t group by month order by num_rows_per_month desc")
+ .await?
+ .collect()
+ .await?;
+
+ let mut max_limit = match ScalarValue::try_from_array(results[0].column(0), 0)? {
+ ScalarValue::UInt64(Some(count)) => count,
+ s => panic!("Expected count as Int64 found {}", s),
+ };
+
+ max_limit += 1;
+ let last_batch = results
+ .last()
+ .expect("There shouled be at least one record batch returned");
+ let last_row_idx = last_batch.num_rows() - 1;
+ let mut min_limit =
+ match ScalarValue::try_from_array(last_batch.column(0), last_row_idx)? {
+ ScalarValue::UInt64(Some(count)) => count,
+ s => panic!("Expected count as Int64 found {}", s),
+ };
+
+ min_limit -= 1;
+
+ let sql_cross_partition_boundary = format!("SELECT month FROM t limit {}", max_limit);
+ let resulting_limit: u64 = ctx
+ .sql(sql_cross_partition_boundary.as_str())
+ .await?
+ .collect()
+ .await?
+ .into_iter()
+ .map(|r| r.num_rows() as u64)
+ .sum();
+
+ assert_eq!(max_limit, resulting_limit);
+
+ let sql_within_partition_boundary =
+ format!("SELECT month from t limit {}", min_limit);
+ let resulting_limit: u64 = ctx
+ .sql(sql_within_partition_boundary.as_str())
+ .await?
+ .collect()
+ .await?
+ .into_iter()
+ .map(|r| r.num_rows() as u64)
+ .sum();
+
+ assert_eq!(min_limit, resulting_limit);
+
+ let month = match ScalarValue::try_from_array(results[0].column(1), 0)? {
+ ScalarValue::Utf8(Some(month)) => month,
+ s => panic!("Expected count as Int64 found {}", s),
+ };
+
+ let sql_on_partition_boundary = format!(
+ "SELECT month from t where month = '{}' LIMIT {}",
+ month,
+ max_limit - 1
+ );
+ let resulting_limit: u64 = ctx
+ .sql(sql_on_partition_boundary.as_str())
+ .await?
+ .collect()
+ .await?
+ .into_iter()
+ .map(|r| r.num_rows() as u64)
+ .sum();
+ let partition_row_count = max_limit - 1;
+ assert_eq!(partition_row_count, resulting_limit);
+ Ok(())
+}
+
#[tokio::test]
async fn csv_filter_with_file_col() -> Result<()> {
let ctx = SessionContext::new();