Posted to commits@arrow.apache.org by ag...@apache.org on 2020/12/23 23:16:21 UTC

[arrow] branch master updated: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f9efa02  ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec
f9efa02 is described below

commit f9efa02d37ee89da8b256d536a1f295f1c56df2f
Author: Andy Grove <an...@gmail.com>
AuthorDate: Wed Dec 23 16:15:05 2020 -0700

    ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec
    
    ParquetExec represents multiple files, but we were calculating statistics based only on the first file.
    
    I stumbled across this when working on https://issues.apache.org/jira/browse/ARROW-10995
    
    Closes #8992 from andygrove/ARROW-11014
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/src/datasource/datasource.rs |   2 +-
 rust/datafusion/src/datasource/parquet.rs    |  23 +----
 rust/datafusion/src/physical_plan/parquet.rs | 137 ++++++++++++++++++++++-----
 3 files changed, 116 insertions(+), 46 deletions(-)
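
For context, the heart of this change is summing per-file statistics instead of taking
the first file's. Below is a minimal standalone sketch of that aggregation; the
Statistics and ParquetPartition stand-ins mirror the types in the diff but are
simplified for illustration:

    // Simplified stand-ins for the DataFusion types changed in this commit.
    #[derive(Debug, Clone, Default)]
    struct Statistics {
        num_rows: Option<usize>,
        total_byte_size: Option<usize>,
    }

    struct ParquetPartition {
        // Kept to mirror the struct in the diff; unused in this sketch.
        filename: String,
        statistics: Statistics,
    }

    /// Sum per-partition statistics into one value for the whole scan,
    /// leaving a field as None when no partition reported it.
    fn sum_statistics(partitions: &[ParquetPartition]) -> Statistics {
        let mut num_rows: Option<usize> = None;
        let mut total_byte_size: Option<usize> = None;
        for part in partitions {
            if let Some(n) = part.statistics.num_rows {
                num_rows = Some(num_rows.unwrap_or(0) + n);
            }
            if let Some(n) = part.statistics.total_byte_size {
                total_byte_size = Some(total_byte_size.unwrap_or(0) + n);
            }
        }
        Statistics { num_rows, total_byte_size }
    }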

diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index 93877f9..e6590e8 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::ExecutionPlan;
 
 /// These table statistics are estimates.
 /// They cannot be used directly in precise computations.
-#[derive(Clone, Default)]
+#[derive(Debug, Clone, Default)]
 pub struct Statistics {
     /// The number of table rows
     pub num_rows: Option<usize>,
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 3666042..5986908 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -22,7 +22,6 @@ use std::string::String;
 use std::sync::Arc;
 
 use arrow::datatypes::*;
-use parquet::file::metadata::RowGroupMetaData;
 
 use crate::datasource::datasource::Statistics;
 use crate::datasource::TableProvider;
@@ -41,28 +40,12 @@ pub struct ParquetTable {
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
     pub fn try_new(path: &str) -> Result<Self> {
-        let parquet_exec = ParquetExec::try_new(path, None, 0)?;
+        let parquet_exec = ParquetExec::try_from_path(path, None, 0)?;
         let schema = parquet_exec.schema();
-
-        let metadata = parquet_exec.metadata();
-        let num_rows: i64 = metadata
-            .row_groups()
-            .iter()
-            .map(RowGroupMetaData::num_rows)
-            .sum();
-        let total_byte_size: i64 = metadata
-            .row_groups()
-            .iter()
-            .map(RowGroupMetaData::total_byte_size)
-            .sum();
-
         Ok(Self {
             path: path.to_string(),
             schema,
-            statistics: Statistics {
-                num_rows: Some(num_rows as usize),
-                total_byte_size: Some(total_byte_size as usize),
-            },
+            statistics: parquet_exec.statistics().to_owned(),
         })
     }
 }
@@ -85,7 +68,7 @@ impl TableProvider for ParquetTable {
         batch_size: usize,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(ParquetExec::try_new(
+        Ok(Arc::new(ParquetExec::try_from_path(
             &self.path,
             projection.clone(),
             batch_size,
diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index 530dd1b..d11fba2 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -30,62 +30,133 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set; currently this means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key, in
+/// which case this partition struct would represent multiple files for a given partition key
+/// (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
-    pub fn try_new(
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
+    pub fn try_from_path(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more Parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
+            let filenames = filenames
+                .iter()
+                .map(|filename| filename.as_str())
+                .collect::<Vec<&str>>();
+            Self::try_from_files(&filenames, projection, batch_size)
+        }
+    }
+
+    /// Create a new Parquet reader execution plan based on the specified list of Parquet
+    /// files
+    pub fn try_from_files(
+        filenames: &[&str],
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+        let mut partitions = Vec::with_capacity(filenames.len());
+        for filename in filenames {
+            let file = File::open(filename)?;
             let file_reader = Arc::new(SerializedFileReader::new(file)?);
             let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
             let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
+            if schemas.is_empty() || schema != schemas[0] {
+                schemas.push(schema);
+            }
+            let mut num_rows = 0;
+            let mut total_byte_size = 0;
+            for i in 0..meta_data.num_row_groups() {
+                let row_group_meta = meta_data.row_group(i);
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+            }
+            let statistics = Statistics {
+                num_rows: Some(num_rows as usize),
+                total_byte_size: Some(total_byte_size as usize),
+            };
+            partitions.push(ParquetPartition {
+                filename: filename.to_owned().to_string(),
+                statistics,
+            });
+        }
 
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+        // We currently get the schema information from the first file rather than performing
+        // schema merging, which is a limitation.
+        // See https://issues.apache.org/jira/browse/ARROW-11017
+        if schemas.len() > 1 {
+            return Err(DataFusionError::Plan(format!(
+                "The Parquet files have {} different schemas and DataFusion does \
+                not yet support schema merging",
+                schemas.len()
+            )));
         }
+        let schema = schemas[0].clone();
+
+        Ok(Self::new(partitions, schema, projection, batch_size))
     }
 
-    /// Create a new Parquet reader execution plan with provided files and schema
+    /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        filenames: Vec<String>,
+        partitions: Vec<ParquetPartition>,
         schema: Schema,
         projection: Option<Vec<usize>>,
         batch_size: usize,
-        metadata: ParquetMetaData,
     ) -> Self {
         let projection = match projection {
             Some(p) => p,
@@ -99,18 +170,33 @@ impl ParquetExec {
                 .collect(),
         );
 
+        // sum the statistics
+        let mut num_rows: Option<usize> = None;
+        let mut total_byte_size: Option<usize> = None;
+        for part in &partitions {
+            if let Some(n) = part.statistics.num_rows {
+                num_rows = Some(num_rows.unwrap_or(0) + n)
+            }
+            if let Some(n) = part.statistics.total_byte_size {
+                total_byte_size = Some(total_byte_size.unwrap_or(0) + n)
+            }
+        }
+        let statistics = Statistics {
+            num_rows,
+            total_byte_size,
+        };
         Self {
-            filenames,
+            partitions,
             schema: Arc::new(projected_schema),
             projection,
             batch_size,
-            metadata,
+            statistics,
         }
     }
 
-    /// Expose the Parquet specific metadata
-    pub fn metadata(&self) -> ParquetMetaData {
-        self.metadata.clone()
+    /// Provide access to the statistics
+    pub fn statistics(&self) -> &Statistics {
+        &self.statistics
     }
 }
 
@@ -132,7 +218,7 @@ impl ExecutionPlan for ParquetExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.filenames.len())
+        Partitioning::UnknownPartitioning(self.partitions.len())
     }
 
     fn with_new_children(
@@ -157,7 +243,7 @@ impl ExecutionPlan for ParquetExec {
             Receiver<Option<ArrowResult<RecordBatch>>>,
         ) = bounded(2);
 
-        let filename = self.filenames[partition].clone();
+        let filename = self.partitions[partition].filename.clone();
         let projection = self.projection.clone();
         let batch_size = self.batch_size;
 
@@ -256,7 +342,8 @@ mod tests {
         let testdata =
             env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
         let filename = format!("{}/alltypes_plain.parquet", testdata);
-        let parquet_exec = ParquetExec::try_new(&filename, Some(vec![0, 1, 2]), 1024)?;
+        let parquet_exec =
+            ParquetExec::try_from_path(&filename, Some(vec![0, 1, 2]), 1024)?;
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
 
         let mut results = parquet_exec.execute(0).await?;
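
For reference, a sketch of how the renamed constructors are used after this patch. The
file paths are placeholders, and the module paths are assumed from this repository's
layout (rust/datafusion/src/physical_plan/parquet.rs):

    use datafusion::error::Result;
    use datafusion::physical_plan::parquet::ParquetExec;
    use datafusion::physical_plan::ExecutionPlan;

    fn scan_two_files() -> Result<()> {
        // Hypothetical paths; any Parquet files sharing one schema will do.
        let files = ["/data/part-0.parquet", "/data/part-1.parquet"];
        let exec = ParquetExec::try_from_files(&files, None, 1024)?;
        // One partition per file, with statistics summed across all files
        // rather than taken from the first file only.
        assert_eq!(exec.output_partitioning().partition_count(), 2);
        println!("combined stats: {:?}", exec.statistics());
        Ok(())
    }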