Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/22 23:22:20 UTC

[GitHub] [arrow] andygrove opened a new pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

andygrove opened a new pull request #8992:
URL: https://github.com/apache/arrow/pull/8992


   ParquetExec represents multiple files but we were calculating statistics based on the first file.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548227986



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -99,18 +154,38 @@ impl ParquetExec {
                 .collect(),
         );
 
+        // sum the statistics
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        for part in &partitions {
+            num_rows += part.statistics.num_rows.unwrap_or(0);
+            total_byte_size += part.statistics.total_byte_size.unwrap_or(0);
+        }
+        let statistics = Statistics {
+            num_rows: if num_rows == 0 {
+                None

Review comment:
       Yes, that was a bit sloppy. This is now fixed. 







[GitHub] [arrow] andygrove closed pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #8992:
URL: https://github.com/apache/arrow/pull/8992


   





[GitHub] [arrow] andygrove commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-750408839


   @alamb @jorgecarleitao I got a bit carried away with some other improvements in this PR but I am going to stop now. I filed a follow-up issue to add more comprehensive tests before we release 3.0.0





[GitHub] [arrow] andygrove commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-750408260


   > I wonder if there is some way to test this code -- I also remember being confused about the fact that `ParquetExec` can actually take a directory full of files rather than a single one.
   
   Yes, tests are definitely lacking here. I will take this on as a follow-up task for the release: https://issues.apache.org/jira/browse/ARROW-11020





[GitHub] [arrow] Dandandan commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548208377



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -99,18 +154,38 @@ impl ParquetExec {
                 .collect(),
         );
 
+        // sum the statistics
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        for part in &partitions {
+            num_rows += part.statistics.num_rows.unwrap_or(0);
+            total_byte_size += part.statistics.total_byte_size.unwrap_or(0);
+        }
+        let statistics = Statistics {
+            num_rows: if num_rows == 0 {
+                None

Review comment:
       I think if the result is 0, it is best to know that there are 0 records/bytes.
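
One way to keep a known zero distinct from an unknown value is sketched below. This is an illustration only, not the change that was pushed to this PR. `Statistics` here is a simplified stand-in with just the two fields visible in the diff, and `sum_counts` and `sum_statistics` are hypothetical helper names. The policy that one unknown partition makes the total unknown is also an assumption; the diff above instead treats unknown as 0.

```rust
/// Simplified stand-in for the `Statistics` struct used in the diff
/// (only the two fields that appear there).
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Statistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
}

/// Hypothetical helper: sums optional counts.
/// Returns `Some(total)` when every input is known, including `Some(0)`,
/// and `None` when any input is unknown or the sum overflows `usize`.
fn sum_counts<I>(counts: I) -> Option<usize>
where
    I: IntoIterator<Item = Option<usize>>,
{
    counts
        .into_iter()
        .try_fold(0usize, |acc, c| c.and_then(|v| acc.checked_add(v)))
}

/// Hypothetical helper: combines per-partition statistics into one total.
pub fn sum_statistics(parts: &[Statistics]) -> Statistics {
    Statistics {
        num_rows: sum_counts(parts.iter().map(|p| p.num_rows)),
        total_byte_size: sum_counts(parts.iter().map(|p| p.total_byte_size)),
    }
}
```

With this policy, two empty files report `Some(0)` rows rather than `None`, so a consumer of the statistics can tell "no data" apart from "no information".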







[GitHub] [arrow] andygrove commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548195177



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];
+            for filename in &filenames {
+                let file = File::open(filename)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);
+                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+                let meta_data = arrow_reader.get_metadata();
+                // collect all the unique schemas in this data set
+                let schema = arrow_reader.get_schema()?;
+                if schemas.is_empty() || schema != schemas[0] {
+                    schemas.push(schema);
+                }
+                let mut num_rows = 0;
+                let mut total_byte_size = 0;
+                for i in 0..meta_data.num_row_groups() {
+                    let row_group_meta = meta_data.row_group(i);
+                    num_rows += row_group_meta.num_rows();
+                    total_byte_size += row_group_meta.total_byte_size();
+                }
+                let statistics = Statistics {
+                    num_rows: Some(num_rows as usize),
+                    total_byte_size: Some(total_byte_size as usize),
+                };
+                partitions.push(ParquetPartition {
+                    filename: filename.to_owned(),
+                    statistics,
+                });
+            }
+
+            // we currently get the schema information from the first file rather than do
+            // schema merging and this is a limitation.
+            // See https://issues.apache.org/jira/browse/ARROW-11017
+            if schemas.len() > 1 {
+                return Err(DataFusionError::Plan(format!(
+                    "The Parquet files in {} have {} different schemas and DataFusion does \
+                    not yet support schema merging", path, schemas.len())));
+            }
+            let schema = schemas[0].clone();
+
+            Ok(Self::new(partitions, schema, projection, batch_size))
         }
     }
 
-    /// Create a new Parquet reader execution plan with provided files and schema
+    /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        filenames: Vec<String>,
+        partitions: Vec<ParquetPartition>,

Review comment:
       Interesting. I was not aware of this use case. Perhaps we need a specific constructor for that use case. I'll take a look.







[GitHub] [arrow] codecov-io edited a comment on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749836684


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=h1) Report
   > Merging [#8992](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=desc) (e50cb12) into [master](https://codecov.io/gh/apache/arrow/commit/0519c4c0ecccd7d84ce44bd3a3e7bcb4fef8f4d6?el=desc) (0519c4c) will **increase** coverage by `0.02%`.
   > The diff coverage is `85.71%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8992/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8992      +/-   ##
   ==========================================
   + Coverage   82.64%   82.66%   +0.02%     
   ==========================================
     Files         200      200              
     Lines       49730    49794      +64     
   ==========================================
   + Hits        41098    41161      +63     
   - Misses       8632     8633       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/datasource/datasource.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL2RhdGFzb3VyY2UucnM=) | `100.00% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BhcnF1ZXQucnM=) | `80.48% <85.36%> (+0.91%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `96.92% <100.00%> (+1.30%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.05% <0.00%> (-0.78%)` | :arrow_down: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `89.93% <0.00%> (-0.27%)` | :arrow_down: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.43% <0.00%> (+0.19%)` | :arrow_up: |
   | [rust/datafusion/src/physical\_plan/expressions.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2V4cHJlc3Npb25zLnJz) | `84.49% <0.00%> (+0.31%)` | :arrow_up: |
   | [rust/arrow/src/util/test\_util.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvdXRpbC90ZXN0X3V0aWwucnM=) | `90.90% <0.00%> (+15.90%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=footer). Last update [081728f...e50cb12](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548202692



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];
+            for filename in &filenames {
+                let file = File::open(filename)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);
+                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+                let meta_data = arrow_reader.get_metadata();
+                // collect all the unique schemas in this data set
+                let schema = arrow_reader.get_schema()?;
+                if schemas.is_empty() || schema != schemas[0] {
+                    schemas.push(schema);
+                }
+                let mut num_rows = 0;
+                let mut total_byte_size = 0;
+                for i in 0..meta_data.num_row_groups() {
+                    let row_group_meta = meta_data.row_group(i);
+                    num_rows += row_group_meta.num_rows();
+                    total_byte_size += row_group_meta.total_byte_size();
+                }
+                let statistics = Statistics {
+                    num_rows: Some(num_rows as usize),
+                    total_byte_size: Some(total_byte_size as usize),
+                };
+                partitions.push(ParquetPartition {
+                    filename: filename.to_owned(),
+                    statistics,
+                });
+            }
+
+            // we currently get the schema information from the first file rather than do
+            // schema merging and this is a limitation.
+            // See https://issues.apache.org/jira/browse/ARROW-11017
+            if schemas.len() > 1 {
+                return Err(DataFusionError::Plan(format!(
+                    "The Parquet files in {} have {} different schemas and DataFusion does \
+                    not yet support schema merging", path, schemas.len())));
+            }
+            let schema = schemas[0].clone();
+
+            Ok(Self::new(partitions, schema, projection, batch_size))
         }
     }
 
-    /// Create a new Parquet reader execution plan with provided files and schema
+    /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        filenames: Vec<String>,
+        partitions: Vec<ParquetPartition>,

Review comment:
       I pushed a change so there are now `try_from_path` and `try_from_files` constructors







[GitHub] [arrow] codecov-io edited a comment on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749836684


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=h1) Report
   > Merging [#8992](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=desc) (f56cdca) into [master](https://codecov.io/gh/apache/arrow/commit/0519c4c0ecccd7d84ce44bd3a3e7bcb4fef8f4d6?el=desc) (0519c4c) will **increase** coverage by `0.01%`.
   > The diff coverage is `84.78%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8992/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8992      +/-   ##
   ==========================================
   + Coverage   82.64%   82.65%   +0.01%     
   ==========================================
     Files         200      200              
     Lines       49730    49796      +66     
   ==========================================
   + Hits        41098    41161      +63     
   - Misses       8632     8635       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/datasource/datasource.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL2RhdGFzb3VyY2UucnM=) | `100.00% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BhcnF1ZXQucnM=) | `80.00% <83.72%> (+0.43%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `96.92% <100.00%> (+1.30%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.05% <0.00%> (-0.78%)` | :arrow_down: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `89.93% <0.00%> (-0.27%)` | :arrow_down: |
   | [rust/datafusion/src/physical\_plan/expressions.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2V4cHJlc3Npb25zLnJz) | `84.49% <0.00%> (+0.31%)` | :arrow_up: |
   | [rust/arrow/src/util/test\_util.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvdXRpbC90ZXN0X3V0aWwucnM=) | `90.90% <0.00%> (+15.90%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=footer). Last update [081728f...f56cdca](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-750439538


   1 comment that I think should be resolved. The rest looks good, great that this is fixed.
   Agree also that there should be some tests :+1: 





[GitHub] [arrow] Dandandan commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-750435853


   Makes sense. Maybe we can reuse the summing stats for all partitions for different source types if it gets more complex.
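
A rough sketch of how the summing could be shared across source types follows. Nothing here is an existing DataFusion API: the `HasStatistics` trait, `total_statistics`, and `CsvPartition` are hypothetical names for illustration, and `Statistics` and `ParquetPartition` are simplified stand-ins for the structs in the diff.

```rust
/// Simplified stand-in for the `Statistics` struct in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct Statistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
}

/// Hypothetical trait: anything that can report statistics for itself.
pub trait HasStatistics {
    fn statistics(&self) -> Statistics;
}

/// Adds two optional counts; `None` if either is unknown or on overflow.
fn add(a: Option<usize>, b: Option<usize>) -> Option<usize> {
    a?.checked_add(b?)
}

/// Hypothetical shared helper: totals the statistics of any partition type.
pub fn total_statistics<'a, T, I>(parts: I) -> Statistics
where
    T: HasStatistics + 'a,
    I: IntoIterator<Item = &'a T>,
{
    let zero = Statistics { num_rows: Some(0), total_byte_size: Some(0) };
    parts.into_iter().fold(zero, |acc, p| {
        let s = p.statistics();
        Statistics {
            num_rows: add(acc.num_rows, s.num_rows),
            total_byte_size: add(acc.total_byte_size, s.total_byte_size),
        }
    })
}

/// Stand-in for the `ParquetPartition` struct in the diff.
pub struct ParquetPartition {
    pub filename: String,
    pub statistics: Statistics,
}

/// Hypothetical second source type, to show the helper being reused.
pub struct CsvPartition {
    pub filename: String,
    pub statistics: Statistics,
}

impl HasStatistics for ParquetPartition {
    fn statistics(&self) -> Statistics {
        self.statistics
    }
}

impl HasStatistics for CsvPartition {
    fn statistics(&self) -> Statistics {
        self.statistics
    }
}
```

Each source type only needs to implement the one-method trait; the aggregation policy then lives in a single place.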





[GitHub] [arrow] codecov-io edited a comment on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749836684


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=h1) Report
   > Merging [#8992](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=desc) (0f21d1a) into [master](https://codecov.io/gh/apache/arrow/commit/0519c4c0ecccd7d84ce44bd3a3e7bcb4fef8f4d6?el=desc) (0519c4c) will **increase** coverage by `0.01%`.
   > The diff coverage is `85.41%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8992/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8992      +/-   ##
   ==========================================
   + Coverage   82.64%   82.66%   +0.01%     
   ==========================================
     Files         200      200              
     Lines       49730    49798      +68     
   ==========================================
   + Hits        41098    41164      +66     
   - Misses       8632     8634       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/datasource/datasource.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL2RhdGFzb3VyY2UucnM=) | `100.00% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BhcnF1ZXQucnM=) | `80.31% <84.44%> (+0.74%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `96.92% <100.00%> (+1.30%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.05% <0.00%> (-0.78%)` | :arrow_down: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `89.93% <0.00%> (-0.27%)` | :arrow_down: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.43% <0.00%> (+0.19%)` | :arrow_up: |
   | [rust/datafusion/src/physical\_plan/expressions.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2V4cHJlc3Npb25zLnJz) | `84.49% <0.00%> (+0.31%)` | :arrow_up: |
   | [rust/arrow/src/util/test\_util.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvdXRpbC90ZXN0X3V0aWwucnM=) | `90.90% <0.00%> (+15.90%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=footer). Last update [081728f...0f21d1a](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] seddonm1 commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
seddonm1 commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548159968



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];
+            for filename in &filenames {
+                let file = File::open(filename)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);
+                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+                let meta_data = arrow_reader.get_metadata();
+                // collect all the unique schemas in this data set
+                let schema = arrow_reader.get_schema()?;
+                if schemas.is_empty() || schema != schemas[0] {
+                    schemas.push(schema);
+                }
+                let mut num_rows = 0;
+                let mut total_byte_size = 0;
+                for i in 0..meta_data.num_row_groups() {
+                    let row_group_meta = meta_data.row_group(i);
+                    num_rows += row_group_meta.num_rows();
+                    total_byte_size += row_group_meta.total_byte_size();
+                }
+                let statistics = Statistics {
+                    num_rows: Some(num_rows as usize),
+                    total_byte_size: Some(total_byte_size as usize),
+                };
+                partitions.push(ParquetPartition {
+                    filename: filename.to_owned(),
+                    statistics,
+                });
+            }
+
+            // we currently get the schema information from the first file rather than do
+            // schema merging and this is a limitation.
+            // See https://issues.apache.org/jira/browse/ARROW-11017
+            if schemas.len() > 1 {
+                return Err(DataFusionError::Plan(format!(
+                    "The Parquet files in {} have {} different schemas and DataFusion does \
+                    not yet support schema merging", path, schemas.len())));
+            }
+            let schema = schemas[0].clone();
+
+            Ok(Self::new(partitions, schema, projection, batch_size))
         }
     }
 
-    /// Create a new Parquet reader execution plan with provided files and schema
+    /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        filenames: Vec<String>,
+        partitions: Vec<ParquetPartition>,

Review comment:
       I thought part of the reason for allowing a list of files was to support behavior similar to Delta Lake, where an external file lists the filenames that make up a version of the data, rather than grouping them by directory structure?
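
To make the question above concrete, here is a minimal sketch of a reader that takes its file list from an external manifest instead of a directory walk. The manifest format (one path per line, `#` for comments) and the `parse_manifest` helper are assumptions made for illustration; this is not Delta Lake's transaction log format and not part of the PR.

```rust
/// Parse a plain-text manifest into a list of Parquet filenames.
///
/// The format (one path per line, `#` starting a comment line) is invented
/// for this sketch; it is not Delta Lake's transaction log format.
pub fn parse_manifest(contents: &str) -> Vec<String> {
    contents
        .lines()
        // tolerate stray whitespace around each entry
        .map(str::trim)
        // skip blank lines and comment lines
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(str::to_string)
        .collect()
}
```

The resulting list could then be handed to a constructor that accepts an explicit list of files, so the caller, not the directory layout, decides which files make up the data set.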







[GitHub] [arrow] codecov-io edited a comment on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749836684


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=h1) Report
   > Merging [#8992](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=desc) (e61fe43) into [master](https://codecov.io/gh/apache/arrow/commit/0519c4c0ecccd7d84ce44bd3a3e7bcb4fef8f4d6?el=desc) (0519c4c) will **increase** coverage by `0.01%`.
   > The diff coverage is `83.78%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8992      +/-   ##
   ==========================================
   + Coverage   82.64%   82.65%   +0.01%     
   ==========================================
     Files         200      200              
     Lines       49730    49789      +59     
   ==========================================
   + Hits        41098    41155      +57     
   - Misses       8632     8634       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/datasource/datasource.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL2RhdGFzb3VyY2UucnM=) | `100.00% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BhcnF1ZXQucnM=) | `79.66% <83.33%> (+0.09%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `96.92% <100.00%> (+1.30%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.05% <0.00%> (-0.78%)` | :arrow_down: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `89.93% <0.00%> (-0.27%)` | :arrow_down: |
   | [rust/datafusion/src/physical\_plan/expressions.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2V4cHJlc3Npb25zLnJz) | `84.49% <0.00%> (+0.31%)` | :arrow_up: |
   | [rust/arrow/src/util/test\_util.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvdXRpbC90ZXN0X3V0aWwucnM=) | `90.90% <0.00%> (+15.90%)` | :arrow_up: |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=footer). Last update [081728f...e50cb12](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749833121


   @seddonm1 fyi





[GitHub] [arrow] andygrove commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548049010



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -67,14 +67,35 @@ impl ParquetExec {
         if filenames.is_empty() {
             Err(DataFusionError::Plan("No files found".to_string()))
         } else {
+            // Calculate statistics for the entire data set. Later, we will probably want to make
+            // statistics available on a per-partition basis.
+            let mut num_rows = 0;
+            let mut total_byte_size = 0;
+            for file in &filenames {
+                let file = File::open(file)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);

Review comment:
       I've gone a little further and introduced a `ParquetPartition` struct to make it more explicit how partitioning works, and added references to related issues about changing the partitioning strategy. I also improved an error message and added more documentation.
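
As an illustration of how per-partition statistics can roll up into statistics for the whole data set, here is a minimal, self-contained sketch. The `Statistics` and `ParquetPartition` types below are simplified stand-ins with public fields, and `sum_known` and `total_statistics` are hypothetical helpers. The policy shown (the total is `None` as soon as any partition's value is unknown) is an assumption of this sketch and is not necessarily what the PR does.

```rust
/// Simplified stand-in for DataFusion's `Statistics`; only the two fields
/// that appear in the diff are modelled here.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
}

/// Simplified stand-in for the `ParquetPartition` struct in the diff.
#[derive(Debug, Clone)]
pub struct ParquetPartition {
    pub filename: String,
    pub statistics: Statistics,
}

/// Sum one optional statistic across partitions (hypothetical helper).
/// Yields `None` if any input is `None`, so an unknown value is never
/// silently counted as zero. An empty input sums to `Some(0)`.
fn sum_known<I>(values: I) -> Option<usize>
where
    I: IntoIterator<Item = Option<usize>>,
{
    values.into_iter().sum()
}

/// Combine per-partition statistics into statistics for the whole data set.
pub fn total_statistics(partitions: &[ParquetPartition]) -> Statistics {
    Statistics {
        num_rows: sum_known(partitions.iter().map(|p| p.statistics.num_rows)),
        total_byte_size: sum_known(
            partitions.iter().map(|p| p.statistics.total_byte_size),
        ),
    }
}
```

Propagating `None` keeps "unknown" distinct from "zero", at the cost of discarding a partial total when a single partition has no statistics.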







[GitHub] [arrow] seddonm1 edited a comment on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
seddonm1 edited a comment on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749837169


   Thanks. Good find. :facepalm: by me
   





[GitHub] [arrow] github-actions[bot] commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749838710


   https://issues.apache.org/jira/browse/ARROW-11014





[GitHub] [arrow] alamb commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r547952117



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,17 +30,17 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet files

Review comment:
       👍 

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -67,14 +67,35 @@ impl ParquetExec {
         if filenames.is_empty() {
             Err(DataFusionError::Plan("No files found".to_string()))
         } else {
+            // Calculate statistics for the entire data set. Later, we will probably want to make
+            // statistics available on a per-partition basis.
+            let mut num_rows = 0;
+            let mut total_byte_size = 0;
+            for file in &filenames {
+                let file = File::open(file)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);

Review comment:
       It probably doesn't matter, but we are creating `arrow_reader`s several times for the same file: here we create them just to read metadata, then right below we (re)open the first one to read the schema, and then we open them again to actually read data.
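
One way to avoid the repeated opens during planning is to visit each file once and take both the schema and the row-group totals from that single read. The sketch below shows the shape of such a loop without depending on the `parquet` crate: `FileSummary`, `plan_files`, and the `read_footer` callback are hypothetical names, and a list of column names stands in for an Arrow `Schema`.

```rust
/// What a single footer read yields in this sketch. `schema` is a list of
/// column names standing in for an Arrow `Schema`.
#[derive(Debug, Clone, PartialEq)]
pub struct FileSummary {
    pub schema: Vec<String>,
    pub num_rows: usize,
    pub total_byte_size: usize,
}

/// Visit every file exactly once, collecting its summary and checking that
/// its schema matches the schema of the first file.
///
/// `read_footer` is a hypothetical callback; in real code it would wrap
/// whatever opens the file and reads the Parquet footer.
pub fn plan_files<F>(
    filenames: &[&str],
    mut read_footer: F,
) -> Result<Vec<(String, FileSummary)>, String>
where
    F: FnMut(&str) -> Result<FileSummary, String>,
{
    let mut summaries: Vec<(String, FileSummary)> =
        Vec::with_capacity(filenames.len());
    for &filename in filenames {
        // the only read of this file during planning
        let summary = read_footer(filename)?;
        if let Some((first_name, first)) = summaries.first() {
            if first.schema != summary.schema {
                return Err(format!(
                    "schema of {} differs from schema of {}",
                    filename, first_name
                ));
            }
        }
        summaries.push((filename.to_string(), summary));
    }
    Ok(summaries)
}
```

This covers only the planning step; the files would still be opened again at execution time to read the data itself.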







[GitHub] [arrow] alamb commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548528639



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,133 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
-    pub fn try_new(
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
+    pub fn try_from_path(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
+            let filenames = filenames
+                .iter()
+                .map(|filename| filename.as_str())
+                .collect::<Vec<&str>>();
+            Self::try_from_files(&filenames, projection, batch_size)
+        }
+    }
+
+    /// Create a new Parquet reader execution plan based on the specified list of Parquet
+    /// files
+    pub fn try_from_files(
+        filenames: &[&str],
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+        let mut partitions = Vec::with_capacity(filenames.len());
+        for filename in filenames {
+            let file = File::open(filename)?;
             let file_reader = Arc::new(SerializedFileReader::new(file)?);
             let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
             let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
+            if schemas.is_empty() || schema != schemas[0] {
+                schemas.push(schema);
+            }
+            let mut num_rows = 0;
+            let mut total_byte_size = 0;
+            for i in 0..meta_data.num_row_groups() {
+                let row_group_meta = meta_data.row_group(i);
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+            }
+            let statistics = Statistics {
+                num_rows: Some(num_rows as usize),
+                total_byte_size: Some(total_byte_size as usize),
+            };
+            partitions.push(ParquetPartition {

Review comment:
       👍 







[GitHub] [arrow] seddonm1 commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
seddonm1 commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548209682



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];
+            for filename in &filenames {
+                let file = File::open(filename)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);
+                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+                let meta_data = arrow_reader.get_metadata();
+                // collect all the unique schemas in this data set
+                let schema = arrow_reader.get_schema()?;
+                if schemas.is_empty() || schema != schemas[0] {
+                    schemas.push(schema);
+                }
+                let mut num_rows = 0;
+                let mut total_byte_size = 0;
+                for i in 0..meta_data.num_row_groups() {
+                    let row_group_meta = meta_data.row_group(i);
+                    num_rows += row_group_meta.num_rows();
+                    total_byte_size += row_group_meta.total_byte_size();
+                }
+                let statistics = Statistics {
+                    num_rows: Some(num_rows as usize),
+                    total_byte_size: Some(total_byte_size as usize),
+                };
+                partitions.push(ParquetPartition {
+                    filename: filename.to_owned(),
+                    statistics,
+                });
+            }
+
+            // we currently get the schema information from the first file rather than do
+            // schema merging and this is a limitation.
+            // See https://issues.apache.org/jira/browse/ARROW-11017
+            if schemas.len() > 1 {
+                return Err(DataFusionError::Plan(format!(
+                    "The Parquet files in {} have {} different schemas and DataFusion does \
+                    not yet support schema merging", path, schemas.len())));
+            }
+            let schema = schemas[0].clone();
+
+            Ok(Self::new(partitions, schema, projection, batch_size))
         }
     }
 
-    /// Create a new Parquet reader execution plan with provided files and schema
+    /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        filenames: Vec<String>,
+        partitions: Vec<ParquetPartition>,

Review comment:
       Looks good 👍 







[GitHub] [arrow] andygrove commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-750429411


   Also @Dandandan, this starts to introduce some per-partition stats.





[GitHub] [arrow] seddonm1 commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
seddonm1 commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548198355



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];
+            for filename in &filenames {
+                let file = File::open(filename)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);
+                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+                let meta_data = arrow_reader.get_metadata();
+                // collect all the unique schemas in this data set
+                let schema = arrow_reader.get_schema()?;
+                if schemas.is_empty() || schema != schemas[0] {
+                    schemas.push(schema);
+                }
+                let mut num_rows = 0;
+                let mut total_byte_size = 0;
+                for i in 0..meta_data.num_row_groups() {
+                    let row_group_meta = meta_data.row_group(i);
+                    num_rows += row_group_meta.num_rows();
+                    total_byte_size += row_group_meta.total_byte_size();
+                }
+                let statistics = Statistics {
+                    num_rows: Some(num_rows as usize),
+                    total_byte_size: Some(total_byte_size as usize),
+                };
+                partitions.push(ParquetPartition {
+                    filename: filename.to_owned(),
+                    statistics,
+                });
+            }
+
+            // we currently get the schema information from the first file rather than do
+            // schema merging and this is a limitation.
+            // See https://issues.apache.org/jira/browse/ARROW-11017
+            if schemas.len() > 1 {
+                return Err(DataFusionError::Plan(format!(
+                    "The Parquet files in {} have {} different schemas and DataFusion does \
+                    not yet support schema merging", path, schemas.len())));
+            }
+            let schema = schemas[0].clone();
+
+            Ok(Self::new(partitions, schema, projection, batch_size))
         }
     }
 
-    /// Create a new Parquet reader execution plan with provided files and schema
+    /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        filenames: Vec<String>,
+        partitions: Vec<ParquetPartition>,

Review comment:
       It may be premature to support that use case anyway until more of the core engine works.
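
For reference, the per-file statistics gathered in the diff above amount to a sum over the row groups of each file. The following is a minimal sketch of that step in isolation, not the code from the pull request: `RowGroupMeta` and `file_statistics` are hypothetical names standing in for the Parquet reader's row-group metadata, and `Statistics` is reduced to the two fields that appear in the diff.

```rust
/// Hypothetical stand-in for the row-group metadata exposed by the Parquet reader.
struct RowGroupMeta {
    num_rows: i64,
    total_byte_size: i64,
}

/// Reduced copy of the `Statistics` struct as it appears in the diff.
#[derive(Debug, Clone, PartialEq)]
struct Statistics {
    num_rows: Option<usize>,
    total_byte_size: Option<usize>,
}

/// Sum row counts and byte sizes over all row groups of a single file.
fn file_statistics(row_groups: &[RowGroupMeta]) -> Statistics {
    let num_rows: i64 = row_groups.iter().map(|rg| rg.num_rows).sum();
    let total_byte_size: i64 = row_groups.iter().map(|rg| rg.total_byte_size).sum();
    Statistics {
        // The counts are always known for a file that was read, so they are `Some`.
        num_rows: Some(num_rows as usize),
        total_byte_size: Some(total_byte_size as usize),
    }
}
```

A file with no row groups yields `Some(0)` for both fields in this sketch, which keeps "empty" distinct from "unknown".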







[GitHub] [arrow] seddonm1 commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
seddonm1 commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749837169


   Thanks. Good find. I didn't dig down too deeply through the Parquet library.





[GitHub] [arrow] Dandandan commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548157527



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -99,18 +154,38 @@ impl ParquetExec {
                 .collect(),
         );
 
+        // sum the statistics
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        for part in &partitions {
+            num_rows += part.statistics.num_rows.unwrap_or(0);
+            total_byte_size += part.statistics.total_byte_size.unwrap_or(0);
+        }
+        let statistics = Statistics {
+            num_rows: if num_rows == 0 {
+                None

Review comment:
       Why map a row count or byte size of zero to `None` here rather than `Some(0)`?
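
To illustrate the distinction this comment raises: `None` normally signals an unknown statistic, whereas `Some(0)` is a known count of zero. The sketch below shows one possible way to sum per-partition counters so that the two are not conflated. It is an assumption for illustration, not the fix that was applied in the pull request, and `sum_known` is a hypothetical helper name.

```rust
/// Sum optional counters. The total is known only if every part is known:
/// any `None` makes the result `None`, while zeros add up to `Some(0)`.
fn sum_known(parts: &[Option<usize>]) -> Option<usize> {
    // `Option<usize>` implements `Sum<Option<usize>>`, which short-circuits on `None`.
    parts.iter().copied().sum()
}
```

With this approach an empty data set reports `Some(0)` rows, and a data set containing one partition without statistics reports `None` instead of an undercount.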







[GitHub] [arrow] andygrove commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548039078



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -67,14 +67,35 @@ impl ParquetExec {
         if filenames.is_empty() {
             Err(DataFusionError::Plan("No files found".to_string()))
         } else {
+            // Calculate statistics for the entire data set. Later, we will probably want to make
+            // statistics available on a per-partition basis.
+            let mut num_rows = 0;
+            let mut total_byte_size = 0;
+            for file in &filenames {
+                let file = File::open(file)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);

Review comment:
       This is a good point. I've pushed another change that collects the unique schemas while scanning the files, which avoids the separate read. It also checks that all files share the same schema; I have wasted time in the past tracking down issues caused by incompatible files. I also added a reference to the issue for implementing schema merging, which would be a nice addition.
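
The schema check described in this comment can be sketched generically, as follows. This is a simplified illustration rather than the code in the pull request: `collect_unique` and `single_schema` are hypothetical names, and the schema type is left generic so the example does not depend on the Arrow crate. Note that the sketch tests membership with `contains`; comparing each schema only against the first one collected would count a repeated non-matching schema more than once.

```rust
/// Collect the distinct values seen while scanning, preserving first-seen order.
fn collect_unique<T: PartialEq>(items: impl IntoIterator<Item = T>) -> Vec<T> {
    let mut unique: Vec<T> = Vec::new();
    for item in items {
        if !unique.contains(&item) {
            unique.push(item);
        }
    }
    unique
}

/// Return the single shared schema, or an error message if the files disagree.
fn single_schema<T: PartialEq>(schemas: impl IntoIterator<Item = T>) -> Result<T, String> {
    let mut unique = collect_unique(schemas);
    match unique.len() {
        0 => Err("no schemas found".to_string()),
        1 => Ok(unique.remove(0)),
        n => Err(format!(
            "found {} different schemas; schema merging is not supported",
            n
        )),
    }
}
```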







[GitHub] [arrow] codecov-io commented on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749836684


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=h1) Report
   > Merging [#8992](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=desc) (74a5750) into [master](https://codecov.io/gh/apache/arrow/commit/0519c4c0ecccd7d84ce44bd3a3e7bcb4fef8f4d6?el=desc) (0519c4c) will **decrease** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8992      +/-   ##
   ==========================================
   - Coverage   82.64%   82.64%   -0.01%     
   ==========================================
     Files         200      200              
     Lines       49730    49735       +5     
   ==========================================
   + Hits        41098    41102       +4     
   - Misses       8632     8633       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/datasource/datasource.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL2RhdGFzb3VyY2UucnM=) | `100.00% <ø> (ø)` | |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `96.92% <100.00%> (+1.30%)` | :arrow_up: |
   | [rust/datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BhcnF1ZXQucnM=) | `81.90% <100.00%> (+2.33%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.05% <0.00%> (-0.78%)` | :arrow_down: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `89.93% <0.00%> (-0.27%)` | :arrow_down: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.43% <0.00%> (+0.19%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=footer). Last update [081728f...74a5750](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on a change in pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548155265



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];

Review comment:
       Could use `Vec::with_capacity(filenames.len())` here, since one partition is pushed per filename.
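
A minimal sketch of this suggestion, assuming one partition is created per filename as in the diff. `Partition` and `build_partitions` are hypothetical, simplified names; the struct in the pull request also carries statistics.

```rust
/// Hypothetical, simplified partition record.
#[derive(Debug)]
struct Partition {
    filename: String,
}

fn build_partitions(filenames: &[String]) -> Vec<Partition> {
    // One partition per file, so the final length is known before the loop starts
    // and the vector never needs to reallocate while it is filled.
    let mut partitions = Vec::with_capacity(filenames.len());
    for filename in filenames {
        partitions.push(Partition {
            filename: filename.to_owned(),
        });
    }
    partitions
}
```

The saving is small for a handful of files, but it costs nothing and documents the expected size.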







[GitHub] [arrow] codecov-io edited a comment on pull request #8992: ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#issuecomment-749836684


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=h1) Report
   > Merging [#8992](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=desc) (a2165eb) into [master](https://codecov.io/gh/apache/arrow/commit/0519c4c0ecccd7d84ce44bd3a3e7bcb4fef8f4d6?el=desc) (0519c4c) will **increase** coverage by `0.01%`.
   > The diff coverage is `84.78%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8992      +/-   ##
   ==========================================
   + Coverage   82.64%   82.65%   +0.01%     
   ==========================================
     Files         200      200              
     Lines       49730    49796      +66     
   ==========================================
   + Hits        41098    41161      +63     
   - Misses       8632     8635       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/datasource/datasource.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL2RhdGFzb3VyY2UucnM=) | `100.00% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BhcnF1ZXQucnM=) | `80.00% <83.72%> (+0.43%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `96.92% <100.00%> (+1.30%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.05% <0.00%> (-0.78%)` | :arrow_down: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `89.93% <0.00%> (-0.27%)` | :arrow_down: |
   | [rust/datafusion/src/physical\_plan/expressions.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2V4cHJlc3Npb25zLnJz) | `84.49% <0.00%> (+0.31%)` | :arrow_up: |
   | [rust/arrow/src/util/test\_util.rs](https://codecov.io/gh/apache/arrow/pull/8992/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvdXRpbC90ZXN0X3V0aWwucnM=) | `90.90% <0.00%> (+15.90%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=footer). Last update [081728f...f56cdca](https://codecov.io/gh/apache/arrow/pull/8992?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

