Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/12 22:25:00 UTC

[GitHub] [arrow-datafusion] pjmore opened a new pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

pjmore opened a new pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000


   # Which issue does this PR close?
   Partially closes #1999.
   
   
   # Rationale for this change
   Fix the behaviour when a query selects only partitioning columns from a parquet-format table.
   
   # What changes are included in this PR?
   Use row group level metadata to return the correct number of partition columns.
   
   # Are there any user-facing changes?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r835982496



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,50 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    let mut limit = limit.unwrap_or(usize::MAX);
+    for partitioned_file in partition {
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        file_rows = limit.min(file_rows);
+        limit -= file_rows;
+        while file_rows >= batch_size {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),

Review comment:
       Minor optimization: since the same record batch is sent repeatedly for a partition, we could create it once at the beginning, then reuse it with `.clone()` in this loop and `.slice()` for the final `send_result` below.
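A minimal sketch of that suggestion, written against a stand-in type rather than Arrow's `RecordBatch` so that it compiles on its own. `Batch`, its fields, and `send_partition_batches` are hypothetical names introduced only for illustration; they model a batch whose `clone()` and `slice()` share the underlying data instead of copying it.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a record batch: shared column data plus a row window.
#[derive(Clone, Debug)]
struct Batch {
    values: Arc<Vec<String>>, // shared, so clone() only bumps a reference count
    offset: usize,
    len: usize,
}

impl Batch {
    /// Build a batch of `len` rows that all carry the same partition value.
    fn new(value: &str, len: usize) -> Self {
        Batch { values: Arc::new(vec![value.to_string(); len]), offset: 0, len }
    }

    /// Zero-copy view of `len` rows starting at `offset` within this batch.
    fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(offset + len <= self.len, "slice out of bounds");
        Batch { values: Arc::clone(&self.values), offset: self.offset + offset, len }
    }

    fn num_rows(&self) -> usize {
        self.len
    }
}

/// Build the full-size batch once per file, send clones of it for every full
/// batch, and send a shorter slice of it for the remainder.
fn send_partition_batches(
    mut file_rows: usize,
    batch_size: usize,
    partition_value: &str,
    mut send: impl FnMut(Batch),
) {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let full = Batch::new(partition_value, batch_size);
    while file_rows >= batch_size {
        send(full.clone());
        file_rows -= batch_size;
    }
    if file_rows != 0 {
        send(full.slice(0, file_rows));
    }
}
```

With 10 rows and a batch size of 4, this sends two full batches and one 2-row slice, so the last batch carries the remaining row count rather than `batch_size`.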







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r829654497



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +237,56 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),

Review comment:
       Thanks for your patience and detailed explanation. I was wrong on this.







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r836013785



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,50 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    let mut limit = limit.unwrap_or(usize::MAX);
+    for partitioned_file in partition {
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        file_rows = limit.min(file_rows);
+        limit -= file_rows;
+        while file_rows >= batch_size {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+            file_rows -= batch_size;
+        }
+        if file_rows != 0 {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),

Review comment:
       Yeah, good catch.







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r833875526



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -446,6 +471,62 @@ fn build_row_group_predicate(
     }
 }
 
+fn read_partition_no_file_columns(

Review comment:
       I missed the file-level metadata. I've reworked the function to use it, which simplified things quite a bit.







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r827672701



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +237,56 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),

Review comment:
       Let's pass `pruning_predicate` and `metrics` down and do row-group-level filtering in `read_partition_no_file_columns`, since a whole parquet file, together with its partition column values, can be eliminated when all of its row groups fail the filter. `limit` would be affected as well.







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r828721274



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +237,56 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),

Review comment:
       For `SELECT year,month,day FROM t where id > 0`, I think the `id > 0` filter would be pushed down to the parquet scan even though `id` is not in the select list?
   
   
   Here's the output of `explain verbose ....`; I made changes to print the pushed-down filters in ParquetExec as well.
   ```
   +-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type                                             | plan                                                                                                                                                                                                      |
   +-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | initial_logical_plan                                  | Projection: #t.year, #t.month, #t.day                                                                                                                                                                     |
   |                                                       |   Filter: #t.id > Int64(0)                                                                                                                                                                                |
   |                                                       |     TableScan: t projection=None                                                                                                                                                                          |
   | logical_plan after simplify_expressions               | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after common_sub_expression_eliminate    | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after eliminate_limit                    | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after projection_push_down               | Projection: #t.year, #t.month, #t.day                                                                                                                                                                     |
   |                                                       |   Filter: #t.id > Int64(0)                                                                                                                                                                                |
   |                                                       |     TableScan: t projection=Some([0, 11, 12, 13])                                                                                                                                                         |
   | logical_plan after filter_push_down                   | Projection: #t.year, #t.month, #t.day                                                                                                                                                                     |
   |                                                       |   Filter: #t.id > Int64(0)                                                                                                                                                                                |
   |                                                       |     TableScan: t projection=Some([0, 11, 12, 13]), filters=[#t.id > Int64(0)]                                                                                                                             |
   | logical_plan after limit_push_down                    | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after SingleDistinctAggregationToGroupBy | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after ToApproxPerc                       | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after simplify_expressions               | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after common_sub_expression_eliminate    | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after eliminate_limit                    | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after projection_push_down               | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after filter_push_down                   | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after limit_push_down                    | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after SingleDistinctAggregationToGroupBy | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan after ToApproxPerc                       | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | logical_plan                                          | Projection: #t.year, #t.month, #t.day                                                                                                                                                                     |
   |                                                       |   Filter: #t.id > Int64(0)                                                                                                                                                                                |
   |                                                       |     TableScan: t projection=Some([0, 11, 12, 13]), filters=[#t.id > Int64(0)]                                                                                                                             |
   | initial_physical_plan                                 | ProjectionExec: expr=[year@1 as year, month@2 as month, day@3 as day]                                                                                                                                     |
   |                                                       |   FilterExec: CAST(id@0 AS Int64) > 0                                                                                                                                                                     |
   |                                                       |     ParquetExec: limit=None, partitions=[year=2021/month=09/day=09/file.parquet, year=2021/month=10/day=09/file.parquet, year=2021/month=10/day=28/file.parquet], filters=CAST(id_max@0 AS Int64) > 0     |
   |                                                       |                                                                                                                                                                                                           |
   | physical_plan after aggregate_statistics              | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | physical_plan after hash_build_probe_order            | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | physical_plan after coalesce_batches                  | ProjectionExec: expr=[year@1 as year, month@2 as month, day@3 as day]                                                                                                                                     |
   |                                                       |   CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                             |
   |                                                       |     FilterExec: CAST(id@0 AS Int64) > 0                                                                                                                                                                   |
   |                                                       |       ParquetExec: limit=None, partitions=[year=2021/month=09/day=09/file.parquet, year=2021/month=10/day=09/file.parquet, year=2021/month=10/day=28/file.parquet], filters=CAST(id_max@0 AS Int64) > 0   |
   |                                                       |                                                                                                                                                                                                           |
   | physical_plan after repartition                       | ProjectionExec: expr=[year@1 as year, month@2 as month, day@3 as day]                                                                                                                                     |
   |                                                       |   CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                             |
   |                                                       |     FilterExec: CAST(id@0 AS Int64) > 0                                                                                                                                                                   |
   |                                                       |       RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                                                                                   |
   |                                                       |         ParquetExec: limit=None, partitions=[year=2021/month=09/day=09/file.parquet, year=2021/month=10/day=09/file.parquet, year=2021/month=10/day=28/file.parquet], filters=CAST(id_max@0 AS Int64) > 0 |
   |                                                       |                                                                                                                                                                                                           |
   | physical_plan after add_merge_exec                    | SAME TEXT AS ABOVE                                                                                                                                                                                        |
   | physical_plan                                         | ProjectionExec: expr=[year@1 as year, month@2 as month, day@3 as day]                                                                                                                                     |
   |                                                       |   CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                             |
   |                                                       |     FilterExec: CAST(id@0 AS Int64) > 0                                                                                                                                                                   |
   |                                                       |       RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                                                                                   |
   |                                                       |         ParquetExec: limit=None, partitions=[year=2021/month=09/day=09/file.parquet, year=2021/month=10/day=09/file.parquet, year=2021/month=10/day=28/file.parquet], filters=CAST(id_max@0 AS Int64) > 0 |
   |                                                       |                                                                                                                                                                                                           |
   +-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   
   ```







[GitHub] [arrow-datafusion] rdettai commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
rdettai commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r830852358



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +237,56 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {

Review comment:
       Can't we put this conditional inside the `spawn_blocking` call?

##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -446,6 +471,62 @@ fn build_row_group_predicate(
     }
 }
 
+fn read_partition_no_file_columns(

Review comment:
       You don't need to open every row group to get the number of rows in a parquet file; all of this information is in the footer. You should use `file_reader.metadata()` here. Once you do that, you can spare yourself the limit logic, which is pretty verbose. Just iterate through all the row groups in the metadata to count the rows in the file; that's very cheap because the data structure is loaded into memory when the footer is parsed. This should greatly simplify this code path, and then we can re-evaluate whether we need to merge it with the one above 😉.
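A rough sketch of the footer-based row count described above. To keep it compilable on its own it uses stand-in types: `RowGroupMeta`, `FooterMeta`, and `total_rows` are hypothetical names, not the parquet crate's API. They only model the idea that the parsed footer already holds one row count per row group.

```rust
/// Hypothetical stand-in for the per-row-group metadata stored in a parquet footer.
struct RowGroupMeta {
    num_rows: i64, // stored as a signed integer, so it needs validating
}

/// Hypothetical stand-in for the parsed footer of one file.
struct FooterMeta {
    row_groups: Vec<RowGroupMeta>,
}

/// Sum the row counts recorded in the footer; no data pages are read.
/// A negative total is reported as an error instead of panicking.
fn total_rows(footer: &FooterMeta) -> Result<usize, String> {
    let total: i64 = footer.row_groups.iter().map(|rg| rg.num_rows).sum();
    usize::try_from(total).map_err(|_| format!("invalid row count: {}", total))
}
```

Returning a `Result` here is a design choice of the sketch: it turns a corrupt, negative count into an ordinary error that the caller can propagate with `?`.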







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r835813180



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    mut limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    for partitioned_file in partition {
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        let remaining_rows = limit.unwrap_or(usize::MAX);
+        if file_rows >= remaining_rows {
+            file_rows = remaining_rows;
+            limit = Some(0);
+        } else if let Some(remaining_limit) = &mut limit {
+            *remaining_limit -= file_rows;
+        }
+
+        while file_rows > batch_size {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+            file_rows -= batch_size;
+        }
+        if file_rows != 0 {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+        }
+
+        if limit == Some(0) {
+            break;
+        }
+    }
+    Ok(())
+}

Review comment:
       I couldn't find a good way to implement what you suggested. The main issue I ran into was the error handling when opening the file: I couldn't figure out another way to short-circuit both when the limit is met and on any error that occurred. If you're okay with scanning all of the partition files even after an error, I'm okay with it; I just figured that might be a bad idea for remote object stores.
   
   ```
       let mut res: Result<()> = Ok(());
       let mut batch_size_partition_iter = partition
           .iter()
           .map(|partitioned_file| {
               let mut num_rows: usize = match object_store
                   .file_reader(partitioned_file.file_meta.sized_file.clone())
               {
                   Ok(object_reader) => {
                       match SerializedFileReader::new(ChunkObjectReader(object_reader)) {
                           Ok(file_reader) => {
                               file_reader
                                   .metadata()
                                   .file_metadata()
                                   .num_rows()
                                   .try_into()
                                   .expect("Row count should always be greater than or equal to 0 and less than usize::MAX")
                           },
                           Err(e) => {
                               res = Err(e.into());
                               0
                           },
                       }
                   },
                   Err(e) => {
                       res = Err(e);
                       0
                   },
               };
               num_rows = limit.min(num_rows);
               limit -= num_rows;
               (num_rows, partitioned_file.partition_values.as_slice())
           })
           .take_while(|(num_rows, _)| *num_rows != 0)
           .flat_map(|(num_rows, partition_values)| {
               BatchSizeIter::new(num_rows, batch_size).zip(std::iter::repeat(partition_values))
           });
       Iterator::try_for_each(
           &mut batch_size_partition_iter,
           |(batch_size, partition_values)| {
               send_result(
                   &response_tx,
                   partition_column_projector.project_empty(batch_size, partition_values),
               )
           },
       )?;
       res?;
       Ok(())
   ```
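
For comparison, here is a minimal std-only sketch of one way to get both short circuits from a lazy iterator. It is not the PR's code: `capped_row_counts` and `emit_rows` are hypothetical names, and plain `Result<usize, E>` items stand in for opening a file and reading its row count. `std::iter::from_fn` checks the remaining limit before pulling the next item, so no further file would be opened once the limit is met, and `try_for_each` stops at the first error.

```rust
/// Hypothetical helper: caps each row count by the remaining limit and stops
/// pulling from `row_counts` (i.e. stops "opening files") once the limit is met.
fn capped_row_counts<I, E>(
    mut row_counts: I,
    limit: Option<usize>,
) -> impl Iterator<Item = Result<usize, E>>
where
    I: Iterator<Item = Result<usize, E>>,
{
    let mut remaining = limit.unwrap_or(usize::MAX);
    std::iter::from_fn(move || {
        // Check the limit first so the next item is never requested needlessly.
        if remaining == 0 {
            return None;
        }
        let rows = row_counts.next()?;
        Some(rows.map(|rows| {
            let taken = rows.min(remaining);
            remaining -= taken;
            taken
        }))
    })
}

/// Hypothetical driver: forwards each capped row count to `emit`, returning
/// the first error from either the row-count source or `emit` itself.
fn emit_rows<I, E>(
    row_counts: I,
    limit: Option<usize>,
    mut emit: impl FnMut(usize) -> Result<(), E>,
) -> Result<(), E>
where
    I: Iterator<Item = Result<usize, E>>,
{
    capped_row_counts(row_counts, limit)
        .try_for_each(|rows| -> Result<(), E> { emit(rows?) })
}
```

With row counts `[Ok(8), Ok(8), Err(..), Ok(8)]` and a limit of 10, this emits 8 and then 2 and never requests the third item; without a limit it emits 8 and 8 and then returns the error.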







[GitHub] [arrow-datafusion] rdettai commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
rdettai commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r838628823



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    mut limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    for partitioned_file in partition {
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        let remaining_rows = limit.unwrap_or(usize::MAX);
+        if file_rows >= remaining_rows {
+            file_rows = remaining_rows;
+            limit = Some(0);
+        } else if let Some(remaining_limit) = &mut limit {
+            *remaining_limit -= file_rows;
+        }
+
+        while file_rows > batch_size {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+            file_rows -= batch_size;
+        }
+        if file_rows != 0 {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+        }
+
+        if limit == Some(0) {
+            break;
+        }
+    }
+    Ok(())
+}

Review comment:
       Right, error management in iterators can quickly become annoying! In that case, I think the version with the loop is fine for now.







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r825497085



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -446,6 +473,78 @@ fn build_row_group_predicate(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition_index: usize,
+    partition: &[PartitionedFile],
+    metrics: ExecutionPlanMetricsSet,
+    predicate_builder: &Option<PruningPredicate>,
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    let mut remaining_rows = limit.unwrap_or(usize::MAX);
+    for partitioned_file in partition {
+        let mut file_row_count = 0;
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            &*partitioned_file.file_meta.path(),
+            &metrics,
+        );
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let mut file_reader =
+            SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        if let Some(predicate_builder) = predicate_builder {

Review comment:
       Whoops, that's right. I'll remove the filter and see whether there are file-level row counts I can use.







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r825426463



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +236,59 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    partition_index,
+                    &partition,
+                    metrics,
+                    &pruning_predicate,
+                    batch_size,
+                    response_tx.clone(),
+                    limit,
+                    partition_col_proj,
+                ) {
+                    println!(

Review comment:
       Maybe use `error!("{}", msg);` instead of `println!`?







[GitHub] [arrow-datafusion] alamb commented on pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#issuecomment-1086834259


   Looks like it just needs some updating to resolve conflicts. @pjmore I am happy to do so; let me know if you would like me to.





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r838874385



##########
File path: datafusion/tests/path_partition.rs
##########
@@ -40,6 +40,63 @@ use datafusion::{
 };
 use futures::{stream, StreamExt};
 
+#[tokio::test]
+async fn parquet_distinct_partition_col() -> Result<()> {
+    let mut ctx = SessionContext::new();
+
+    register_partitioned_alltypes_parquet(
+        &mut ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &["year", "month", "day"],
+        "",
+        "alltypes_plain.parquet",
+    )
+    .await;
+
+    let result = ctx
+        .sql("SELECT distinct year,month,day FROM t")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+------+-------+-----+",
+        "| year | month | day |",
+        "+------+-------+-----+",
+        "| 2021 | 09    | 09  |",
+        "| 2021 | 10    | 09  |",
+        "| 2021 | 10    | 28  |",
+        "+------+-------+-----+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    let actual_row_count: usize = ctx
+        .sql("SELECT id from t")
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|batch| batch.num_rows())
+        .sum();
+
+    let partition_row_count: usize = ctx

Review comment:
       Given there is special case handling for `LIMIT` perhaps we should also add a test that uses a limit, such as `LIMIT 1`?







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r830593962



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -446,6 +471,62 @@ fn build_row_group_predicate(
     }
 }
 
+fn read_partition_no_file_columns(

Review comment:
       I haven't looked carefully at this code, but it seems like this has a lot of duplication with `read_partition` -- I wonder if it is possible somehow to unify the code paths, perhaps by checking `projection.is_empty()` in `read_partition`







[GitHub] [arrow-datafusion] pjmore commented on pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#issuecomment-1086922548


   @alamb I had some extra test cases to add for the limit logic, so I fixed the conflicts at the same time. Should be good to go now!





[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r833883389



##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -293,6 +293,34 @@ impl PartitionColumnProjector {
         }
     }
 
+    // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
+    fn project_empty(
+        &mut self,
+        batch_size: usize,
+        partition_values: &[ScalarValue],
+    ) -> ArrowResult<RecordBatch> {
+        let expected_cols = self.projected_schema.fields().len();
+        if expected_cols != self.projected_partition_indexes.len() {
+            return Err(ArrowError::SchemaError(format!(
+                "Unexepected number of partition values, expected {} but got {}",
+                expected_cols,
+                partition_values.len()
+            )));
+        }
+        let mut cols = Vec::with_capacity(partition_values.len());

Review comment:
       Yeah, we definitely can. I've updated it to an iterator version.







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r829570286



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +237,56 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),

Review comment:
       From your example above, note that the TableScan projects 4 columns, not 3. This means that DataFusion is reading 4 columns from the table, which is more than the number of partition columns.
   ```
   Projection: #t.year, #t.month, #t.day                                                                                                                                                                     
      Filter: #t.id > Int64(0)                                                                                                                                                                                
        TableScan: t projection=Some([0, 11, 12, 13]), filters=[#t.id > Int64(0)]
   ```
   
   This is because DataFusion knows that it needs to read the id column from the source to evaluate the query, even though that column doesn't appear in the final results. 0 is the id column, while 11, 12, and 13 are the virtual partition columns. This query will never hit the code path that I've added, since I check whether the file projection is empty. The file projection is calculated by the snippet below.
   ```
   let projection = match self.base_config.file_column_projection_indices() {
       Some(proj) => proj,
       None => (0..self.base_config.file_schema.fields().len()).collect(),
   };
   ```
   For the example provided, this projection will be `[0]`, so the query is executed using the existing code path. For the example table above, the code that I've added is only executed if the query refers only to the partition columns (year, month, day). If I modify the test in path_partition.rs to use the query `SELECT distinct year,month,day FROM t where id > 0` and add some print statements inside the if statement that decides which function to use, the output is:
   ```
   Non-empty projection. using read_partition
   ```
   
   And if I revert the query back to `SELECT distinct year,month,day FROM t`, the output is:
   ```
   Empty projection. using read_partition_no_file_columns
   ```
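
A minimal sketch of the dispatch described above, assuming the file schema has 11 columns (indices 0 through 10) so that indices 11, 12, and 13 are the partition columns. `split_projection` and `reads_only_partition_columns` are hypothetical helpers for illustration, not DataFusion APIs.

```rust
/// Hypothetical helper: splits a table-level projection into the indices that
/// refer to columns stored in the file and those that refer to partition
/// columns, which are assumed to be numbered after the file columns.
fn split_projection(projection: &[usize], num_file_columns: usize) -> (Vec<usize>, Vec<usize>) {
    projection
        .iter()
        .copied()
        .partition(|&idx| idx < num_file_columns)
}

/// Hypothetical helper: true when no file column is projected, which is the
/// case the new partition-only code path is meant to handle.
fn reads_only_partition_columns(projection: &[usize], num_file_columns: usize) -> bool {
    let (file_projection, _partition_projection) = split_projection(projection, num_file_columns);
    file_projection.is_empty()
}
```

Under that assumption, the projection `[0, 11, 12, 13]` from the plan above splits into a file projection of `[0]` and partition columns `[11, 12, 13]`, so it stays on the existing path; `[11, 12, 13]` alone would take the partition-only path.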
   










[GitHub] [arrow-datafusion] alamb commented on pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#issuecomment-1076748693


   I think this PR is waiting on responses to @rdettai 





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r835982541



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,50 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    let mut limit = limit.unwrap_or(usize::MAX);
+    for partitioned_file in partition {
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        file_rows = limit.min(file_rows);
+        limit -= file_rows;
+        while file_rows >= batch_size {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+            file_rows -= batch_size;
+        }
+        if file_rows != 0 {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),

Review comment:
       Do we not need to slice the record batch here to `file_rows`?
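
To illustrate the concern, a small std-only sketch (the function name is hypothetical, and a `Vec<usize>` of row counts stands in for the record batches sent on the channel): the final batch is sized to the rows left over rather than to `batch_size`, so the batch sizes add up to the file's row count.

```rust
/// Hypothetical helper: row counts of the batches emitted for a single file.
fn batch_row_counts(mut file_rows: usize, batch_size: usize) -> Vec<usize> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut batches = Vec::new();
    // Full batches first.
    while file_rows >= batch_size {
        batches.push(batch_size);
        file_rows -= batch_size;
    }
    // The residual batch carries only the rows that are left, not `batch_size`.
    if file_rows != 0 {
        batches.push(file_rows);
    }
    batches
}
```

For a file with 10 rows and a batch size of 4, this yields batches of 4, 4, and 2 rows; sizing the last one to `batch_size` would report 12 rows instead.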







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r835813264



##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -293,6 +293,36 @@ impl PartitionColumnProjector {
         }
     }
 
+    // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
+    fn project_empty(

Review comment:
       That's probably a better name; `project_empty` doesn't really describe the method.







[GitHub] [arrow-datafusion] rdettai commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
rdettai commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r834021207



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    mut limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    for partitioned_file in partition {
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        let remaining_rows = limit.unwrap_or(usize::MAX);
+        if file_rows >= remaining_rows {
+            file_rows = remaining_rows;
+            limit = Some(0);
+        } else if let Some(remaining_limit) = &mut limit {
+            *remaining_limit -= file_rows;
+        }

Review comment:
       Unwrapping the limit `Option` outside the loop would avoid having to mutate limit and decrease complexity a bit more.

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -293,6 +293,36 @@ impl PartitionColumnProjector {
         }
     }
 
+    // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
+    fn project_empty(

Review comment:
       What about `project_from_size` instead?

##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    mut limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    for partitioned_file in partition {
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        let remaining_rows = limit.unwrap_or(usize::MAX);
+        if file_rows >= remaining_rows {
+            file_rows = remaining_rows;
+            limit = Some(0);
+        } else if let Some(remaining_limit) = &mut limit {
+            *remaining_limit -= file_rows;
+        }
+
+        while file_rows > batch_size {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+            file_rows -= batch_size;
+        }
+        if file_rows != 0 {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, &partitioned_file.partition_values),
+            )?;
+        }
+
+        if limit == Some(0) {
+            break;
+        }
+    }
+    Ok(())
+}

Review comment:
       I still feel this could be simplified and made more readable by using more iterators:
   - iterate over the files
   - map them to their size
   - map each size to an iterator that repeats the batch size file_rows/batch_size times + residual
   - flat map the whole thing
   - apply limit with `take(limit)`
   - for_each(send)
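
The steps above can be sketched roughly as follows, using only the standard library. `batch_sizes` and `plan_batches` are hypothetical names, and a slice of per-file row counts stands in for the files. One assumption to flag: the limit counts rows, so this sketch caps each file's row count before flattening rather than calling `take(limit)` on the flattened iterator, which would count batches.

```rust
/// Hypothetical helper: yields `batch_size` once per full batch, followed by
/// the residual row count if there is one.
fn batch_sizes(num_rows: usize, batch_size: usize) -> impl Iterator<Item = usize> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let full_batches = num_rows / batch_size;
    let residual = num_rows % batch_size;
    std::iter::repeat(batch_size)
        .take(full_batches)
        .chain((residual > 0).then_some(residual))
}

/// Hypothetical driver: maps each file's row count to its batch sizes and
/// flattens the result, capping the total number of rows at `limit`.
fn plan_batches(file_rows: &[usize], batch_size: usize, limit: Option<usize>) -> Vec<usize> {
    let mut remaining = limit.unwrap_or(usize::MAX);
    file_rows
        .iter()
        .map(|&rows| {
            // Take at most the rows still allowed by the limit.
            let taken = rows.min(remaining);
            remaining -= taken;
            taken
        })
        .flat_map(|taken| batch_sizes(taken, batch_size))
        .collect()
}
```

In the real function the final step would send a record batch for each size instead of collecting; error handling for file opens is deliberately left out of this sketch.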







[GitHub] [arrow-datafusion] alamb commented on pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#issuecomment-1073220057


   cc @rdettai 





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r833691608



##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -293,6 +293,34 @@ impl PartitionColumnProjector {
         }
     }
 
+    // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
+    fn project_empty(
+        &mut self,
+        batch_size: usize,
+        partition_values: &[ScalarValue],
+    ) -> ArrowResult<RecordBatch> {
+        let expected_cols = self.projected_schema.fields().len();
+        if expected_cols != self.projected_partition_indexes.len() {
+            return Err(ArrowError::SchemaError(format!(
+                "Unexpected number of partition values, expected {} but got {}",
+                expected_cols,
+                partition_values.len()
+            )));
+        }
+        let mut cols = Vec::with_capacity(partition_values.len());

Review comment:
       Small style suggestion: I think we can rewrite this to use `.collect()` on an iterator.
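
A rough illustration of the `.collect()` style, with stand-in types so it compiles without Arrow. `PartitionValue`, `Column`, and `project_partition_columns` are hypothetical names; in the PR the values are `ScalarValue`s and the columns are Arrow arrays. The empty-value error is invented purely to show that collecting into a `Result` also covers the fallible case.

```rust
// Stand-ins for illustration only (assumptions, not the PR's types).
type PartitionValue = String;
type Column = Vec<String>;

/// Builds one constant column per partition value, each `batch_size` rows long.
/// Collecting into `Result<Vec<_>, _>` stops at the first error, so no
/// `Vec::with_capacity` plus `push` loop is needed.
fn project_partition_columns(
    partition_values: &[PartitionValue],
    batch_size: usize,
) -> Result<Vec<Column>, String> {
    partition_values
        .iter()
        .map(|value| {
            if value.is_empty() {
                // Hypothetical failure case, only here to show error propagation.
                Err("empty partition value".to_string())
            } else {
                Ok(vec![value.clone(); batch_size])
            }
        })
        .collect()
}
```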







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r841259199



##########
File path: datafusion/tests/path_partition.rs
##########
@@ -40,6 +40,63 @@ use datafusion::{
 };
 use futures::{stream, StreamExt};
 
+#[tokio::test]
+async fn parquet_distinct_partition_col() -> Result<()> {
+    let mut ctx = SessionContext::new();
+
+    register_partitioned_alltypes_parquet(
+        &mut ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &["year", "month", "day"],
+        "",
+        "alltypes_plain.parquet",
+    )
+    .await;
+
+    let result = ctx
+        .sql("SELECT distinct year,month,day FROM t")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+------+-------+-----+",
+        "| year | month | day |",
+        "+------+-------+-----+",
+        "| 2021 | 09    | 09  |",
+        "| 2021 | 10    | 09  |",
+        "| 2021 | 10    | 28  |",
+        "+------+-------+-----+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    let actual_row_count: usize = ctx
+        .sql("SELECT id from t")
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|batch| batch.num_rows())
+        .sum();
+
+    let partition_row_count: usize = ctx

Review comment:
       I've added some tests for this in the most recent commit.







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r825426463



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +236,59 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    partition_index,
+                    &partition,
+                    metrics,
+                    &pruning_predicate,
+                    batch_size,
+                    response_tx.clone(),
+                    limit,
+                    partition_col_proj,
+                ) {
+                    println!(

Review comment:
       Maybe use `error!("{}", msg);` 

##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -446,6 +473,78 @@ fn build_row_group_predicate(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition_index: usize,
+    partition: &[PartitionedFile],
+    metrics: ExecutionPlanMetricsSet,
+    predicate_builder: &Option<PruningPredicate>,
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    let mut remaining_rows = limit.unwrap_or(usize::MAX);
+    for partitioned_file in partition {
+        let mut file_row_count = 0;
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            &*partitioned_file.file_meta.path(),
+            &metrics,
+        );
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let mut file_reader =
+            SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        if let Some(predicate_builder) = predicate_builder {

Review comment:
       IMO, if we use `read_partition_no_file_columns`, there is no need to call `build_row_group_predicate` or to go through all the `row_groups`. Please correct me if I'm wrong.







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r825439911



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +236,59 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    partition_index,
+                    &partition,
+                    metrics,
+                    &pruning_predicate,
+                    batch_size,
+                    response_tx.clone(),
+                    limit,
+                    partition_col_proj,
+                ) {
+                    println!(

Review comment:
       ```suggestion
                       error!(
   ```







[GitHub] [arrow-datafusion] pjmore commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r828493446



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +237,56 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),

Review comment:
       Row group level filtering is not required. Since only partition columns are queried, there will be no filters that operate on the columns contained within the parquet file. The file list will already be pruned by the listing logic in ListingTable. Similarly, `ParquetFileMetrics` only contains metrics about the evaluation of row group predicates and filtering. I could add the metrics back in, but they won't be used for anything.
   ```
   struct ParquetFileMetrics {
       /// Number of times the predicate could not be evaluated
       pub predicate_evaluation_errors: metrics::Count,
       /// Number of row groups pruned using
       pub row_groups_pruned: metrics::Count,
   }
   ```
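
To make the pruning argument concrete, here is a small sketch of filtering files purely on the partition values encoded in their paths. It is not ListingTable's actual implementation; `parse_partition_values` and `prune_files` are hypothetical helpers, and the `key=value` path layout is taken from the test paths in this PR. The point is that a predicate on partition columns can be decided per file, without touching row group statistics.

```rust
/// Parses the leading `key=value` path segments, for example
/// "year=2021/month=10/day=09/file.parquet" with columns ["year", "month", "day"].
/// Returns `None` if a segment is missing or names a different column.
/// (Hypothetical helper, not DataFusion API.)
fn parse_partition_values<'a>(path: &'a str, partition_cols: &[&str]) -> Option<Vec<&'a str>> {
    let mut segments = path.split('/');
    partition_cols
        .iter()
        .map(|col| {
            let (key, value) = segments.next()?.split_once('=')?;
            (key == *col).then_some(value)
        })
        .collect()
}

/// Keeps only the files whose partition values satisfy `keep`.
/// Files that do not match the partition layout are dropped.
fn prune_files<'a>(
    paths: &[&'a str],
    partition_cols: &[&str],
    keep: impl Fn(&[&str]) -> bool,
) -> Vec<&'a str> {
    paths
        .iter()
        .copied()
        .filter(|path| {
            parse_partition_values(path, partition_cols)
                .map_or(false, |values| keep(&values[..]))
        })
        .collect()
}
```

With the three test paths from this PR, a filter such as `month = '10'` would keep the second and third files and drop the first before any parquet metadata is read.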







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #2000: Fixed parquet path partitioning when only selecting partitioned columns

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r829654497



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -236,32 +237,56 @@ impl ExecutionPlan for ParquetExec {
 
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
-        let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
-                println!(
+        let join_handle = if projection.is_empty() {
+            task::spawn_blocking(move || {
+                if let Err(e) = read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),

Review comment:
       Thanks for your patience and detailed explanation. I was wrong on this.



