Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/16 14:24:23 UTC

[GitHub] [arrow-datafusion] andygrove opened a new pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

andygrove opened a new pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897


   # Which issue does this PR close?
   
   
   Closes #896.
   
   # Rationale for this change
   Refactor in preparation for making partition scans run in parallel.
   
   # What changes are included in this PR?
   
   Move the statistics-gathering logic from `ParquetExec` down to `ParquetPartition`, as a new `ParquetPartition::try_from_files` constructor.
   
   # Are there any user-facing changes?
   
   No.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] andygrove closed pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897


   





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r690976184



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
+        schemas: Vec<SchemaRef>,
+        statistics: Statistics,
+    ) -> Self {
         Self {
             filenames,
+            schemas,
             statistics,
             metrics: ParquetPartitionMetrics::new(),
         }
     }
 
+    /// Create a new parquet partition by scanning a set of files
+    pub fn try_from_files(filenames: Vec<String>, limit: Option<usize>) -> Result<Self> {
+        let mut num_rows = 0;
+        let mut num_fields = 0;
+        let mut fields = Vec::new();
+        let mut total_byte_size = 0;
+        let mut null_counts = Vec::new();
+        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
+        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
+        let mut total_files = 0;
+        let mut schemas: Vec<SchemaRef> = vec![];
+        for filename in &filenames {
+            total_files += 1;
+            let file = File::open(filename)?;
+            let file_reader = Arc::new(SerializedFileReader::new(file)?);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
+            let schema = arrow_reader.get_schema()?;
+            if schemas.is_empty() || &schema != schemas[0].as_ref() {
+                fields = schema.fields().to_vec();
+                num_fields = schema.fields().len();
+                null_counts = vec![0; num_fields];
+                max_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                min_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                schemas.push(Arc::new(schema));
+            }
+
+            for row_group_meta in meta_data.row_groups() {
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+
+                // Currently assumes every Parquet file has same schema
+                // https://issues.apache.org/jira/browse/ARROW-11017
+                let columns_null_counts = row_group_meta
+                    .columns()
+                    .iter()
+                    .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+                for (i, cnt) in columns_null_counts.enumerate() {
+                    null_counts[i] += cnt
+                }
+
+                for (i, column) in row_group_meta.columns().iter().enumerate() {
+                    if let Some(stat) = column.statistics() {
+                        match stat {
+                            ParquetStatistics::Boolean(s) => {
+                                if let DataType::Boolean = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int32(s) => {
+                                if let DataType::Int32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int32(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int32(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int64(s) => {
+                                if let DataType::Int64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int64(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int64(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Float(s) => {
+                                if let DataType::Float32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float32(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float32(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Double(s) => {
+                                if let DataType::Float64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float64(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float64(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
+                if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
+                    break;
+                }
+            }
+        }
+        let column_stats = (0..num_fields)
+            .map(|i| {
+                let max_value = match &max_values[i] {
+                    Some(max_value) => max_value.evaluate().ok(),
+                    None => None,
+                };
+                let min_value = match &min_values[i] {
+                    Some(min_value) => min_value.evaluate().ok(),
+                    None => None,
+                };
+                ColumnStatistics {
+                    null_count: Some(null_counts[i] as usize),
+                    max_value,
+                    min_value,
+                    distinct_count: None,
+                }
+            })
+            .collect();
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: Some(column_stats),
+        };
+        // remove files that are not needed in case of limit
+        let mut filenames = filenames;
+        filenames.truncate(total_files);

Review comment:
       Looks like it.
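
For illustration, a minimal sketch of limit-aware file pruning. As quoted above, the `break` appears to leave only the inner row-group loop, so `total_files` would still reach `filenames.len()` and the `truncate` call would drop nothing; a labeled `break` is one way to stop the outer loop as well. `scan_with_limit` and its `(file name, row-group row counts)` input are hypothetical stand-ins for reading real Parquet metadata, not DataFusion APIs.

```rust
/// Hypothetical helper: each entry pairs a file name with the row counts of
/// its row groups (a stand-in for metadata read from a Parquet footer).
/// Returns the files that are actually needed and the rows counted so far.
fn scan_with_limit(
    files: Vec<(String, Vec<i64>)>,
    limit: Option<usize>,
) -> (Vec<String>, i64) {
    let mut num_rows: i64 = 0;
    let mut total_files = 0;

    'files: for (_name, row_groups) in &files {
        total_files += 1;
        for rows in row_groups {
            num_rows += rows;
            // Stop scanning *all* remaining files once the limit is reached,
            // not just the remaining row groups of the current file.
            if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
                break 'files;
            }
        }
    }

    // Remove files that are not needed in case of limit.
    let mut names: Vec<String> = files.into_iter().map(|(name, _)| name).collect();
    names.truncate(total_files);
    (names, num_rows)
}
```

With a limit of 15 rows and a first file holding two row groups of 10 rows each, the sketch keeps only that first file; with no limit it keeps every file.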







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r690974538



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,

Review comment:
       It seems like the `PartitionedFile` abstraction proposed by @yjshen in https://github.com/apache/arrow-datafusion/pull/811/files#diff-72f3a52c56e83e00d8c605d461f092617a3c205619376bb373069c662f9cfc93R54 would help solve this problem?







[GitHub] [arrow-datafusion] andygrove commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r689584504



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -167,235 +169,40 @@ impl ParquetExec {
                filenames, projection, predicate, limit);
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
-        let mut schemas: Vec<Schema> = vec![];
+        let mut schemas: Vec<SchemaRef> = vec![];
         let mut partitions = Vec::with_capacity(max_concurrency);
         let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
         let chunks = split_files(&filenames, max_concurrency);
-        let mut num_rows = 0;
-        let mut num_fields = 0;
-        let mut fields = Vec::new();
-        let mut total_byte_size = 0;
-        let mut null_counts = Vec::new();
-        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
-        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
-        let mut limit_exhausted = false;
-        for chunk in chunks {
-            let mut filenames: Vec<String> =
-                chunk.iter().map(|x| x.to_string()).collect();
-            let mut total_files = 0;
-            for filename in &filenames {
-                total_files += 1;
-                let file = File::open(filename)?;
-                let file_reader = Arc::new(SerializedFileReader::new(file)?);
-                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-                let meta_data = arrow_reader.get_metadata();
-                // collect all the unique schemas in this data set
-                let schema = arrow_reader.get_schema()?;
-                if schemas.is_empty() || schema != schemas[0] {
-                    fields = schema.fields().to_vec();
-                    num_fields = schema.fields().len();
-                    null_counts = vec![0; num_fields];
-                    max_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    min_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    schemas.push(schema);
-                }
 
-                for row_group_meta in meta_data.row_groups() {
-                    num_rows += row_group_meta.num_rows();
-                    total_byte_size += row_group_meta.total_byte_size();
-
-                    // Currently assumes every Parquet file has same schema
-                    // https://issues.apache.org/jira/browse/ARROW-11017
-                    let columns_null_counts = row_group_meta
-                        .columns()
-                        .iter()
-                        .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
-
-                    for (i, cnt) in columns_null_counts.enumerate() {
-                        null_counts[i] += cnt
-                    }
-
-                    for (i, column) in row_group_meta.columns().iter().enumerate() {
-                        if let Some(stat) = column.statistics() {
-                            match stat {
-                                ParquetStatistics::Boolean(s) => {
-                                    if let DataType::Boolean = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int32(s) => {
-                                    if let DataType::Int32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int64(s) => {
-                                    if let DataType::Int64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Float(s) => {
-                                    if let DataType::Float32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Double(s) => {
-                                    if let DataType::Float64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                _ => {}
-                            }
-                        }
-                    }
+        // collect statistics for all partitions - we should really do this in parallel threads
+        for chunk in chunks {
+            let filenames: Vec<String> = chunk.iter().map(|x| x.to_string()).collect();
+            partitions.push(ParquetPartition::try_from_files(filenames, limit)?);

Review comment:
       The plan is to use `tokio::spawn` here, but that requires making more methods async, so I wanted to tackle it in a separate PR.
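
A rough sketch of what collecting per-chunk statistics in parallel could look like. It uses scoped threads from the standard library rather than `tokio::spawn` so that it stays self-contained and needs no async changes. `PartitionStats`, `stats_for_chunk`, and `collect_parallel` are hypothetical names, and the `(file name, row count)` pairs stand in for real Parquet metadata reads; this is not the PR's implementation.

```rust
use std::thread;

/// Hypothetical stand-in for the statistics gathered per partition.
#[derive(Debug, Default, Clone, PartialEq)]
struct PartitionStats {
    num_files: usize,
    num_rows: usize,
}

/// Hypothetical stand-in for `ParquetPartition::try_from_files`: a real
/// implementation would open each file and read its Parquet metadata.
fn stats_for_chunk(chunk: &[(String, usize)]) -> Result<PartitionStats, String> {
    let mut stats = PartitionStats::default();
    for (name, rows) in chunk {
        if name.is_empty() {
            return Err("empty file name".to_string());
        }
        stats.num_files += 1;
        stats.num_rows += rows;
    }
    Ok(stats)
}

/// Run `stats_for_chunk` for every chunk on its own scoped thread.
/// Results come back in chunk order; the first error (or a worker panic)
/// fails the whole collection.
fn collect_parallel(
    chunks: &[Vec<(String, usize)>],
) -> Result<Vec<PartitionStats>, String> {
    thread::scope(|scope| {
        // Spawn all workers first so they run concurrently.
        let handles: Vec<_> = chunks
            .iter()
            .map(|chunk| scope.spawn(move || stats_for_chunk(chunk)))
            .collect();

        // Join in spawn order to keep partitions aligned with their chunks.
        handles
            .into_iter()
            .map(|handle| {
                handle
                    .join()
                    .map_err(|_| "worker panicked".to_string())
                    .and_then(|result| result)
            })
            .collect()
    })
}
```

Joining the handles in spawn order keeps each result aligned with its chunk, which matters if partitions are later matched back to their file lists. An async version built on `tokio::spawn` would follow the same spawn-then-join shape.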







[GitHub] [arrow-datafusion] andygrove commented on pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#issuecomment-901968496


   @yjshen I have changed this PR to a draft and will hold off on working on it for now. I will review your PR when I have time, probably at the weekend. It looks like you are farther along with this than I was.





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r690734383



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
+        schemas: Vec<SchemaRef>,
+        statistics: Statistics,
+    ) -> Self {
         Self {
             filenames,
+            schemas,
             statistics,
             metrics: ParquetPartitionMetrics::new(),
         }
     }
 
+    /// Create a new parquet partition by scanning a set of files
+    pub fn try_from_files(filenames: Vec<String>, limit: Option<usize>) -> Result<Self> {
+        let mut num_rows = 0;
+        let mut num_fields = 0;
+        let mut fields = Vec::new();
+        let mut total_byte_size = 0;
+        let mut null_counts = Vec::new();
+        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
+        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
+        let mut total_files = 0;
+        let mut schemas: Vec<SchemaRef> = vec![];
+        for filename in &filenames {
+            total_files += 1;
+            let file = File::open(filename)?;
+            let file_reader = Arc::new(SerializedFileReader::new(file)?);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
+            let schema = arrow_reader.get_schema()?;
+            if schemas.is_empty() || &schema != schemas[0].as_ref() {
+                fields = schema.fields().to_vec();
+                num_fields = schema.fields().len();
+                null_counts = vec![0; num_fields];
+                max_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                min_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                schemas.push(Arc::new(schema));
+            }
+
+            for row_group_meta in meta_data.row_groups() {
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+
+                // Currently assumes every Parquet file has same schema
+                // https://issues.apache.org/jira/browse/ARROW-11017
+                let columns_null_counts = row_group_meta
+                    .columns()
+                    .iter()
+                    .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+                for (i, cnt) in columns_null_counts.enumerate() {
+                    null_counts[i] += cnt
+                }
+
+                for (i, column) in row_group_meta.columns().iter().enumerate() {
+                    if let Some(stat) = column.statistics() {
+                        match stat {
+                            ParquetStatistics::Boolean(s) => {
+                                if let DataType::Boolean = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int32(s) => {
+                                if let DataType::Int32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int32(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int32(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int64(s) => {
+                                if let DataType::Int64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int64(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int64(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Float(s) => {
+                                if let DataType::Float32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float32(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float32(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Double(s) => {
+                                if let DataType::Float64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float64(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float64(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
+                if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
+                    break;
+                }
+            }
+        }
+        let column_stats = (0..num_fields)
+            .map(|i| {
+                let max_value = match &max_values[i] {
+                    Some(max_value) => max_value.evaluate().ok(),
+                    None => None,
+                };
+                let min_value = match &min_values[i] {
+                    Some(min_value) => min_value.evaluate().ok(),
+                    None => None,
+                };
+                ColumnStatistics {
+                    null_count: Some(null_counts[i] as usize),
+                    max_value,
+                    min_value,
+                    distinct_count: None,
+                }
+            })
+            .collect();
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: Some(column_stats),
+        };
+        // remove files that are not needed in case of limit
+        let mut filenames = filenames;
+        filenames.truncate(total_files);

Review comment:
       Do you also need to truncate `schemas`?
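
       To illustrate the concern, here is a minimal sketch, not code from this PR. It assumes one schema entry per file (the PR as quoted only collects distinct schemas), uses `String` as a stand-in for `SchemaRef`, and the helper names `files_needed` and `truncate_both` are hypothetical. It shows the stated intent of the limit handling ("remove files that are not needed in case of limit") with both vectors truncated to the same length:

```rust
/// Number of leading files needed to reach `limit` rows, given per-file row counts.
/// With no limit, every file is needed.
fn files_needed(rows_per_file: &[usize], limit: Option<usize>) -> usize {
    let limit = match limit {
        Some(limit) => limit,
        None => return rows_per_file.len(),
    };
    let mut seen = 0usize;
    for (i, rows) in rows_per_file.iter().enumerate() {
        seen += rows;
        if seen >= limit {
            // Files after index `i` are not needed to satisfy the limit.
            return i + 1;
        }
    }
    rows_per_file.len()
}

/// Truncate both parallel vectors to the same length so they stay aligned.
/// `schemas` uses `String` as a stand-in for `SchemaRef` (assumption).
fn truncate_both(filenames: &mut Vec<String>, schemas: &mut Vec<String>, n: usize) {
    filenames.truncate(n);
    schemas.truncate(n);
}
```

       If only `filenames` were truncated, the two vectors would end up with different lengths whenever the limit is reached before the last file.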

##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,

Review comment:
       I wonder if a `Vec<FilenameAndSchema>` would be clearer than two parallel arrays; the two could then not get out of sync either.
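
       A rough sketch of what such a type might look like follows. The struct name comes from the comment above, but its fields, the `keep_first` and `example_files` helpers, and the simplified `SchemaRef` alias (a list of column names instead of arrow's schema type) are assumptions made so the example compiles on its own:

```rust
use std::sync::Arc;

/// Stand-in for arrow's `SchemaRef` so the sketch needs no external crates.
/// Here a "schema" is just a list of column names (assumption for illustration).
type SchemaRef = Arc<Vec<String>>;

/// Hypothetical pairing of a file with the schema read from it.
#[derive(Debug, Clone, PartialEq)]
struct FilenameAndSchema {
    filename: String,
    schema: SchemaRef,
}

/// Keep only the first `n` files; each schema is dropped together with its file.
fn keep_first(mut files: Vec<FilenameAndSchema>, n: usize) -> Vec<FilenameAndSchema> {
    files.truncate(n);
    files
}

/// Build a small example list: two files sharing one schema.
fn example_files() -> Vec<FilenameAndSchema> {
    let schema: SchemaRef = Arc::new(vec!["id".to_string(), "name".to_string()]);
    vec![
        FilenameAndSchema {
            filename: "a.parquet".to_string(),
            schema: Arc::clone(&schema),
        },
        FilenameAndSchema {
            filename: "b.parquet".to_string(),
            schema,
        },
    ]
}
```

       With a single vector there is only one `truncate` call, so a filename can never be left without its schema or the other way round.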







[GitHub] [arrow-datafusion] andygrove commented on pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#issuecomment-903138891


   Closing this in favor of the work happening in https://github.com/apache/arrow-datafusion/pull/811





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r690974538



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,

Review comment:
       It seems like the `PartitionedFile` abstraction proposed by @yjshen in https://github.com/apache/arrow-datafusion/pull/811/files#diff-72f3a52c56e83e00d8c605d461f092617a3c205619376bb373069c662f9cfc93R54 would help solve this problem?

##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
+        schemas: Vec<SchemaRef>,
+        statistics: Statistics,
+    ) -> Self {
         Self {
             filenames,
+            schemas,
             statistics,
             metrics: ParquetPartitionMetrics::new(),
         }
     }
 
+    /// Create a new parquet partition by scanning a set of files
+    pub fn try_from_files(filenames: Vec<String>, limit: Option<usize>) -> Result<Self> {
+        let mut num_rows = 0;
+        let mut num_fields = 0;
+        let mut fields = Vec::new();
+        let mut total_byte_size = 0;
+        let mut null_counts = Vec::new();
+        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
+        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
+        let mut total_files = 0;
+        let mut schemas: Vec<SchemaRef> = vec![];
+        for filename in &filenames {
+            total_files += 1;
+            let file = File::open(filename)?;
+            let file_reader = Arc::new(SerializedFileReader::new(file)?);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
+            let schema = arrow_reader.get_schema()?;
+            if schemas.is_empty() || &schema != schemas[0].as_ref() {
+                fields = schema.fields().to_vec();
+                num_fields = schema.fields().len();
+                null_counts = vec![0; num_fields];
+                max_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                min_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                schemas.push(Arc::new(schema));
+            }
+
+            for row_group_meta in meta_data.row_groups() {
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+
+                // Currently assumes every Parquet file has same schema
+                // https://issues.apache.org/jira/browse/ARROW-11017
+                let columns_null_counts = row_group_meta
+                    .columns()
+                    .iter()
+                    .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+                for (i, cnt) in columns_null_counts.enumerate() {
+                    null_counts[i] += cnt
+                }
+
+                for (i, column) in row_group_meta.columns().iter().enumerate() {
+                    if let Some(stat) = column.statistics() {
+                        match stat {
+                            ParquetStatistics::Boolean(s) => {
+                                if let DataType::Boolean = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int32(s) => {
+                                if let DataType::Int32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int32(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int32(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int64(s) => {
+                                if let DataType::Int64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int64(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int64(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Float(s) => {
+                                if let DataType::Float32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float32(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float32(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Double(s) => {
+                                if let DataType::Float64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float64(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float64(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
+                if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
+                    break;
+                }
+            }
+        }
+        let column_stats = (0..num_fields)
+            .map(|i| {
+                let max_value = match &max_values[i] {
+                    Some(max_value) => max_value.evaluate().ok(),
+                    None => None,
+                };
+                let min_value = match &min_values[i] {
+                    Some(min_value) => min_value.evaluate().ok(),
+                    None => None,
+                };
+                ColumnStatistics {
+                    null_count: Some(null_counts[i] as usize),
+                    max_value,
+                    min_value,
+                    distinct_count: None,
+                }
+            })
+            .collect();
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: Some(column_stats),
+        };
+        // remove files that are not needed in case of limit
+        let mut filenames = filenames;
+        filenames.truncate(total_files);

Review comment:
       Looks like it.







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r690734383



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
+        schemas: Vec<SchemaRef>,
+        statistics: Statistics,
+    ) -> Self {
         Self {
             filenames,
+            schemas,
             statistics,
             metrics: ParquetPartitionMetrics::new(),
         }
     }
 
+    /// Create a new parquet partition by scanning a set of files
+    pub fn try_from_files(filenames: Vec<String>, limit: Option<usize>) -> Result<Self> {
+        let mut num_rows = 0;
+        let mut num_fields = 0;
+        let mut fields = Vec::new();
+        let mut total_byte_size = 0;
+        let mut null_counts = Vec::new();
+        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
+        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
+        let mut total_files = 0;
+        let mut schemas: Vec<SchemaRef> = vec![];
+        for filename in &filenames {
+            total_files += 1;
+            let file = File::open(filename)?;
+            let file_reader = Arc::new(SerializedFileReader::new(file)?);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
+            let schema = arrow_reader.get_schema()?;
+            if schemas.is_empty() || &schema != schemas[0].as_ref() {
+                fields = schema.fields().to_vec();
+                num_fields = schema.fields().len();
+                null_counts = vec![0; num_fields];
+                max_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                min_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                schemas.push(Arc::new(schema));
+            }
+
+            for row_group_meta in meta_data.row_groups() {
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+
+                // Currently assumes every Parquet file has same schema
+                // https://issues.apache.org/jira/browse/ARROW-11017
+                let columns_null_counts = row_group_meta
+                    .columns()
+                    .iter()
+                    .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+                for (i, cnt) in columns_null_counts.enumerate() {
+                    null_counts[i] += cnt
+                }
+
+                for (i, column) in row_group_meta.columns().iter().enumerate() {
+                    if let Some(stat) = column.statistics() {
+                        match stat {
+                            ParquetStatistics::Boolean(s) => {
+                                if let DataType::Boolean = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int32(s) => {
+                                if let DataType::Int32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int32(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int32(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int64(s) => {
+                                if let DataType::Int64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int64(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int64(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Float(s) => {
+                                if let DataType::Float32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float32(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float32(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Double(s) => {
+                                if let DataType::Float64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float64(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float64(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
+                if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
+                    break;
+                }
+            }
+        }
+        let column_stats = (0..num_fields)
+            .map(|i| {
+                let max_value = match &max_values[i] {
+                    Some(max_value) => max_value.evaluate().ok(),
+                    None => None,
+                };
+                let min_value = match &min_values[i] {
+                    Some(min_value) => min_value.evaluate().ok(),
+                    None => None,
+                };
+                ColumnStatistics {
+                    null_count: Some(null_counts[i] as usize),
+                    max_value,
+                    min_value,
+                    distinct_count: None,
+                }
+            })
+            .collect();
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: Some(column_stats),
+        };
+        // remove files that are not needed in case of limit
+        let mut filenames = filenames;
+        filenames.truncate(total_files);

Review comment:
       Do you need to truncate `schemas` as well?

##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,

Review comment:
       I wonder if a `Vec<FilenameAndSchema>` would be clearer than two parallel arrays (and then they could not get out of sync either).
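
A minimal sketch of what that suggestion could look like, not code from this PR. `FilenameAndSchema` is the name proposed in the comment above; its fields, the `SchemaStub` stand-in for arrow's schema type, and the `keep_needed_files` helper are all assumptions made for illustration. Because each entry carries its own schema, the single `truncate` used for the limit case covers both pieces of data.

```rust
use std::sync::Arc;

/// Stand-in for arrow's schema type (assumption); the real code would hold
/// a `SchemaRef` here.
#[derive(Debug, Clone, PartialEq)]
struct SchemaStub {
    field_names: Vec<String>,
}

/// Hypothetical pairing of a file with the schema read from its footer.
#[derive(Debug, Clone, PartialEq)]
struct FilenameAndSchema {
    filename: String,
    schema: Arc<SchemaStub>,
}

/// Keep only the first `total_files` entries, mirroring the
/// `filenames.truncate(total_files)` call in the diff. One call drops the
/// filename and its schema together, so the two cannot drift apart.
fn keep_needed_files(
    mut files: Vec<FilenameAndSchema>,
    total_files: usize,
) -> Vec<FilenameAndSchema> {
    files.truncate(total_files);
    files
}
```

With two parallel `Vec`s, the same effect needs a `truncate` on each, which is the omission the earlier comment about `schemas` asks about.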




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#issuecomment-901308189


   Ah, it seems I was addressing the same issue (parallel stats listing) pointed out by @rdettai here 😂: https://github.com/apache/arrow-datafusion/pull/811/files#diff-72f3a52c56e83e00d8c605d461f092617a3c205619376bb373069c662f9cfc93R189-R223





[GitHub] [arrow-datafusion] yjshen commented on pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#issuecomment-901989326


   Thanks, @andygrove. It will be great to have your help. 
   
   As you mentioned here in the comment:
   
   > The plan is to use tokio::spawn here but this requires making more methods async so I wanted to tackle that as a separate PR.
   
   I ran into the same situation while handling async listing and reading. We may need to decide how async should propagate through the API: either limit async to remote storage access, or change the user-facing API. Looking forward to hearing from you. :P
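
A rough sketch of the "gather statistics per file in parallel, then merge" shape under discussion. It deliberately uses `std::thread::scope` instead of `tokio::spawn` so that it runs without extra crates and without making anything async; it is not the approach either PR takes. `FileStats`, `read_file_stats`, and `gather_stats` are hypothetical names, and the placeholder reader derives numbers from the filename rather than opening a parquet footer.

```rust
use std::thread;

/// Hypothetical per-file summary; the real `Statistics` struct also
/// carries per-column statistics.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FileStats {
    num_rows: usize,
    total_byte_size: usize,
}

/// Placeholder for reading a parquet footer (assumption). It derives the
/// numbers from the filename length so the sketch needs no files on disk.
fn read_file_stats(filename: &str) -> FileStats {
    FileStats {
        num_rows: filename.len(),
        total_byte_size: filename.len() * 8,
    }
}

/// Read statistics for every file on its own scoped thread, then merge
/// the results in file order.
fn gather_stats(filenames: &[String]) -> FileStats {
    let per_file: Vec<FileStats> = thread::scope(|s| {
        // Spawn one thread per file; the scope lets threads borrow `filenames`.
        let handles: Vec<_> = filenames
            .iter()
            .map(|f| s.spawn(move || read_file_stats(f)))
            .collect();
        // Joining in spawn order keeps the results aligned with the input.
        handles
            .into_iter()
            .map(|h| h.join().expect("stats thread panicked"))
            .collect()
    });

    // Merge: row counts and byte sizes simply add up across files.
    per_file.iter().fold(
        FileStats { num_rows: 0, total_byte_size: 0 },
        |acc, st| FileStats {
            num_rows: acc.num_rows + st.num_rows,
            total_byte_size: acc.total_byte_size + st.total_byte_size,
        },
    )
}
```

One design point this leaves open: the sequential loop in the diff stops reading files once `num_rows` reaches `limit`, and a parallel version would need its own way to preserve or approximate that early exit.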





[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#issuecomment-901308189


   Sorry, I hadn't noticed #896.
   
   Actually, I was addressing the same issue (async and parallel parquet stats listing) pointed out by @rdettai in #811: https://github.com/apache/arrow-datafusion/pull/811/files#diff-72f3a52c56e83e00d8c605d461f092617a3c205619376bb373069c662f9cfc93R189-R223. Could you please take a look at that PR if you have time?

