Posted to commits@arrow.apache.org by ag...@apache.org on 2020/12/23 23:16:21 UTC
[arrow] branch master updated: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f9efa02 ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec
f9efa02 is described below
commit f9efa02d37ee89da8b256d536a1f295f1c56df2f
Author: Andy Grove <an...@gmail.com>
AuthorDate: Wed Dec 23 16:15:05 2020 -0700
ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec
ParquetExec can represent multiple files, but we were calculating statistics based only on the first file.
I stumbled across this when working on https://issues.apache.org/jira/browse/ARROW-10995
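As a quick orientation before the diff, here is a minimal usage sketch of the renamed constructor and the new statistics accessor; the path, batch size, and module paths are assumptions based on the file layout below, not part of this commit:

    use datafusion::error::Result;
    use datafusion::physical_plan::parquet::ParquetExec;

    fn describe(path: &str) -> Result<()> {
        // try_from_path replaces the old try_new; it accepts a single
        // .parquet file or a directory containing .parquet files.
        let exec = ParquetExec::try_from_path(path, None, 1024)?;
        // Statistics are now summed across all partitions rather than
        // taken from the first file only.
        let stats = exec.statistics();
        println!("rows={:?} bytes={:?}", stats.num_rows, stats.total_byte_size);
        Ok(())
    }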
Closes #8992 from andygrove/ARROW-11014
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/src/datasource/datasource.rs | 2 +-
rust/datafusion/src/datasource/parquet.rs | 23 +----
rust/datafusion/src/physical_plan/parquet.rs | 137 ++++++++++++++++++++++-----
3 files changed, 116 insertions(+), 46 deletions(-)
diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index 93877f9..e6590e8 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::ExecutionPlan;
/// These table statistics are estimates.
/// They cannot be used directly for precise computation.
-#[derive(Clone, Default)]
+#[derive(Debug, Clone, Default)]
pub struct Statistics {
/// The number of table rows
pub num_rows: Option<usize>,
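The new Debug derive matters because a Statistics value is now stored inside ParquetExec, which itself derives Debug (see the physical_plan diff below). A small sketch of what it enables, with illustrative values:

    use datafusion::datasource::datasource::Statistics;

    fn main() {
        // Debug makes statistics printable with {:?}, which is handy when
        // inspecting a plan node that embeds them.
        let stats = Statistics {
            num_rows: Some(8),
            total_byte_size: Some(671),
        };
        println!("{:?}", stats);
    }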
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 3666042..5986908 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -22,7 +22,6 @@ use std::string::String;
use std::sync::Arc;
use arrow::datatypes::*;
-use parquet::file::metadata::RowGroupMetaData;
use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
@@ -41,28 +40,12 @@ pub struct ParquetTable {
impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
pub fn try_new(path: &str) -> Result<Self> {
- let parquet_exec = ParquetExec::try_new(path, None, 0)?;
+ let parquet_exec = ParquetExec::try_from_path(path, None, 0)?;
let schema = parquet_exec.schema();
-
- let metadata = parquet_exec.metadata();
- let num_rows: i64 = metadata
- .row_groups()
- .iter()
- .map(RowGroupMetaData::num_rows)
- .sum();
- let total_byte_size: i64 = metadata
- .row_groups()
- .iter()
- .map(RowGroupMetaData::total_byte_size)
- .sum();
-
Ok(Self {
path: path.to_string(),
schema,
- statistics: Statistics {
- num_rows: Some(num_rows as usize),
- total_byte_size: Some(total_byte_size as usize),
- },
+ statistics: parquet_exec.statistics().to_owned(),
})
}
}
@@ -85,7 +68,7 @@ impl TableProvider for ParquetTable {
batch_size: usize,
_filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(ParquetExec::try_new(
+ Ok(Arc::new(ParquetExec::try_from_path(
&self.path,
projection.clone(),
batch_size,
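With this change, ParquetTable no longer walks row-group metadata itself; it reuses the statistics that ParquetExec already computed across all files. A minimal sketch of the call path (the function name is illustrative):

    use datafusion::datasource::parquet::ParquetTable;
    use datafusion::error::Result;

    fn open_table(path: &str) -> Result<ParquetTable> {
        // try_new builds a ParquetExec internally and copies its summed
        // statistics into the table, so table- and plan-level stats agree.
        ParquetTable::try_new(path)
    }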
diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index 530dd1b..d11fba2 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -30,62 +30,133 @@ use crate::physical_plan::{common, Partitioning};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
use parquet::file::reader::SerializedFileReader;
use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
+use crate::datasource::datasource::Statistics;
use async_trait::async_trait;
use futures::stream::Stream;
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
- /// Path to directory containing partitioned Parquet files with the same schema
- filenames: Vec<String>,
+ /// Parquet partitions to read
+ partitions: Vec<ParquetPartition>,
/// Schema after projection is applied
schema: SchemaRef,
/// Projection for which columns to load
projection: Vec<usize>,
/// Batch size
batch_size: usize,
- /// Parquet metadata
- metadata: ParquetMetaData,
+ /// Statistics for the data set (sum of statistics for all partitions)
+ statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set; currently this means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key; in
+/// that case this partition struct would represent multiple files for a given partition key
+/// (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+ /// The Parquet filename for this partition
+ filename: String,
+ /// Statistics for this partition
+ statistics: Statistics,
}
impl ParquetExec {
- /// Create a new Parquet reader execution plan
- pub fn try_new(
+ /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+ /// directory containing Parquet files
+ pub fn try_from_path(
path: &str,
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
+ // build a list of filenames from the specified path, which could be a single file or
+ // a directory containing one or more parquet files
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, ".parquet")?;
if filenames.is_empty() {
- Err(DataFusionError::Plan("No files found".to_string()))
+ Err(DataFusionError::Plan(format!(
+ "No Parquet files found at path {}",
+ path
+ )))
} else {
- let file = File::open(&filenames[0])?;
+ let filenames = filenames
+ .iter()
+ .map(|filename| filename.as_str())
+ .collect::<Vec<&str>>();
+ Self::try_from_files(&filenames, projection, batch_size)
+ }
+ }
+
+ /// Create a new Parquet reader execution plan based on the specified list of Parquet
+ /// files
+ pub fn try_from_files(
+ filenames: &[&str],
+ projection: Option<Vec<usize>>,
+ batch_size: usize,
+ ) -> Result<Self> {
+ // build a list of Parquet partitions with statistics and gather all unique schemas
+ // used in this data set
+ let mut schemas: Vec<Schema> = vec![];
+ let mut partitions = Vec::with_capacity(filenames.len());
+ for filename in filenames {
+ let file = File::open(filename)?;
let file_reader = Arc::new(SerializedFileReader::new(file)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+ let meta_data = arrow_reader.get_metadata();
+ // collect all the unique schemas in this data set
let schema = arrow_reader.get_schema()?;
- let metadata = arrow_reader.get_metadata();
+ if schemas.is_empty() || schema != schemas[0] {
+ schemas.push(schema);
+ }
+ let mut num_rows = 0;
+ let mut total_byte_size = 0;
+ for i in 0..meta_data.num_row_groups() {
+ let row_group_meta = meta_data.row_group(i);
+ num_rows += row_group_meta.num_rows();
+ total_byte_size += row_group_meta.total_byte_size();
+ }
+ let statistics = Statistics {
+ num_rows: Some(num_rows as usize),
+ total_byte_size: Some(total_byte_size as usize),
+ };
+ partitions.push(ParquetPartition {
+ filename: filename.to_owned().to_string(),
+ statistics,
+ });
+ }
- Ok(Self::new(
- filenames, schema, projection, batch_size, metadata,
- ))
+ // we currently take the schema from the first file rather than merging
+ // schemas, which is a known limitation.
+ // See https://issues.apache.org/jira/browse/ARROW-11017
+ if schemas.len() > 1 {
+ return Err(DataFusionError::Plan(format!(
+ "The Parquet files have {} different schemas and DataFusion does \
+ not yet support schema merging",
+ schemas.len()
+ )));
}
+ let schema = schemas[0].clone();
+
+ Ok(Self::new(partitions, schema, projection, batch_size))
}
- /// Create a new Parquet reader execution plan with provided files and schema
+ /// Create a new Parquet reader execution plan with provided partitions and schema
pub fn new(
- filenames: Vec<String>,
+ partitions: Vec<ParquetPartition>,
schema: Schema,
projection: Option<Vec<usize>>,
batch_size: usize,
- metadata: ParquetMetaData,
) -> Self {
let projection = match projection {
Some(p) => p,
@@ -99,18 +170,33 @@ impl ParquetExec {
.collect(),
);
+ // sum the statistics
+ let mut num_rows: Option<usize> = None;
+ let mut total_byte_size: Option<usize> = None;
+ for part in &partitions {
+ if let Some(n) = part.statistics.num_rows {
+ num_rows = Some(num_rows.unwrap_or(0) + n)
+ }
+ if let Some(n) = part.statistics.total_byte_size {
+ total_byte_size = Some(total_byte_size.unwrap_or(0) + n)
+ }
+ }
+ let statistics = Statistics {
+ num_rows,
+ total_byte_size,
+ };
Self {
- filenames,
+ partitions,
schema: Arc::new(projected_schema),
projection,
batch_size,
- metadata,
+ statistics,
}
}
- /// Expose the Parquet specific metadata
- pub fn metadata(&self) -> ParquetMetaData {
- self.metadata.clone()
+ /// Provide access to the statistics
+ pub fn statistics(&self) -> &Statistics {
+ &self.statistics
}
}
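The summing loop above treats a missing per-partition statistic as unknown rather than zero: the total is Some as soon as any partition reports a value, and None only when none do. A standalone sketch of the same rule (sum_opt is a hypothetical helper, not part of this commit):

    // Sum optional counters: Some(total) if any input is known, else None.
    fn sum_opt(values: &[Option<usize>]) -> Option<usize> {
        values.iter().copied().fold(None, |acc, v| match (acc, v) {
            (acc, None) => acc,
            (None, Some(n)) => Some(n),
            (Some(a), Some(n)) => Some(a + n),
        })
    }

    fn main() {
        assert_eq!(sum_opt(&[Some(3), None, Some(4)]), Some(7));
        assert_eq!(sum_opt(&[None, None]), None);
    }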
@@ -132,7 +218,7 @@ impl ExecutionPlan for ParquetExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.filenames.len())
+ Partitioning::UnknownPartitioning(self.partitions.len())
}
fn with_new_children(
@@ -157,7 +243,7 @@ impl ExecutionPlan for ParquetExec {
Receiver<Option<ArrowResult<RecordBatch>>>,
) = bounded(2);
- let filename = self.filenames[partition].clone();
+ let filename = self.partitions[partition].filename.clone();
let projection = self.projection.clone();
let batch_size = self.batch_size;
@@ -256,7 +342,8 @@ mod tests {
let testdata =
env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
let filename = format!("{}/alltypes_plain.parquet", testdata);
- let parquet_exec = ParquetExec::try_new(&filename, Some(vec![0, 1, 2]), 1024)?;
+ let parquet_exec =
+ ParquetExec::try_from_path(&filename, Some(vec![0, 1, 2]), 1024)?;
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
let mut results = parquet_exec.execute(0).await?;
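For completeness, a sketch of exercising the new multi-file entry point directly and reading back the summed statistics (the filenames and function name are illustrative):

    use datafusion::error::Result;
    use datafusion::physical_plan::parquet::ParquetExec;

    fn total_rows(files: &[&str]) -> Result<Option<usize>> {
        // Per the summing logic in ParquetExec::new, num_rows is Some(sum)
        // as soon as any partition reports a row count, and None only when
        // no partition does.
        let exec = ParquetExec::try_from_files(files, None, 1024)?;
        Ok(exec.statistics().num_rows)
    }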