Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/24 17:48:04 UTC

[GitHub] [iceberg] ahshahid opened a new issue, #6039: Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

ahshahid opened a new issue, #6039:
URL: https://github.com/apache/iceberg/issues/6039

   Spark has a Dynamic Partition Pruning (DPP) rule which, under the right conditions, can fetch all the join keys from one side of a join and pass them as an IN-clause filter to the other table.
   
   For example, if the query is: select * from Table1 join Table2 on Table1.A = Table2.B where ...
   
   In this case, depending upon the stats/cardinality, the DPP rule may execute a query like
   
   select  distinct A from Table1  
   
   so that it has all the unique values of column A from Table1, which can be passed as an IN filter to Table2.
   
   Thus, on Table2 there can be a filter: B IN (unique values of A ...)
   
   Spark provides an interface, SupportsRuntimeFiltering, which the underlying data source can implement to report the columns it is able to use to prune the row space.
   Although the rule is named PartitionPruning, Spark is agnostic to whether the columns reported by the data source are partition columns or not.
   In the case of Iceberg (at least when the table uses the Parquet format; I have not yet investigated whether the same applies to ORC or Avro), min/max information is available for comparable column types (at least the numerical ones).
   This can be utilized to prune the row space further by skipping row groups, even if the column is not a partition column.
   
   Iceberg currently reports only partition columns through SupportsRuntimeFiltering. This could be augmented to include non-partition columns too.
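
   As a reference point, here is a minimal Java sketch (against Spark's DataSourceV2 API) of a scan that reports a non-partition column through SupportsRuntimeFiltering. The class name and the hard-coded column "B" are hypothetical illustrations, not Iceberg's actual implementation:

      import org.apache.spark.sql.connector.expressions.Expressions;
      import org.apache.spark.sql.connector.expressions.NamedReference;
      import org.apache.spark.sql.connector.read.Scan;
      import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
      import org.apache.spark.sql.sources.Filter;
      import org.apache.spark.sql.types.StructType;

      // Hypothetical scan advertising a non-partition column for runtime filtering.
      class SketchScan implements Scan, SupportsRuntimeFiltering {
        @Override
        public NamedReference[] filterAttributes() {
          // Iceberg today returns only partition source columns here; the proposal
          // is to also include columns whose min/max stats are tracked, e.g. "B".
          return new NamedReference[] { Expressions.column("B") };
        }

        @Override
        public void filter(Filter[] filters) {
          // Spark calls back with the runtime IN filter built from the other
          // side's join keys; the scan can use it to prune files / row groups.
        }

        @Override
        public StructType readSchema() {
          return new StructType();
        }
      }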
   
   The Iceberg storage format, at least with Parquet (not sure about Avro or ORC yet, AFAIK), means that for some column types (at least numerical, I suppose, and maybe string) stats are available at the manifest level and the row-group level.
   
    
   Implementation idea:
   Let's assume columns A and B are of type int,
   
   and the unique values of column A obtained by Spark's DPP query are, say, (10, 17, 18, 1, 5, 4).
   
   We can create a SortedSet of these values: 1, 4, 5, 10, 17, 18.
   
   Iceberg and Parquet invoke the pushed filters at multiple levels, e.g. at manifest-reading time, row-group-reading time, etc.
   
   At those points, the Iceberg and Parquet code bases invoke the filters (via the visitor pattern), passing data objects from which the min and max values of the column are available.
   
   So, say the min passed for column B is 3 and the max passed is 6:
   
   we can obtain a tailSet starting from 3 on the SortedSet
   
   and open an iterator on that tail set.
   
   If the iterator has a next element and its value is <= max, the file/row group (or whatever chunk the min/max describe) needs to be considered; otherwise it can be skipped.
   
   Here iter.next() returns 4; since 4 <= 6, the block/chunk/row group will be read.
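
   To make that check concrete, here is a minimal self-contained Java sketch of the min/max overlap test over a sorted set of join keys (illustrative names, not the actual BoundRangeInPredicate code):

      import java.util.List;
      import java.util.NavigableSet;
      import java.util.TreeSet;

      // Illustrative min/max overlap check over sorted join keys.
      public final class RangeInSketch {
        private final NavigableSet<Integer> sortedKeys;

        RangeInSketch(List<Integer> joinKeys) {
          // (10, 17, 18, 1, 5, 4) is kept as 1, 4, 5, 10, 17, 18
          this.sortedKeys = new TreeSet<>(joinKeys);
        }

        // True if some join key lies in [min, max], i.e. the chunk must be read.
        boolean shouldRead(int min, int max) {
          // ceiling(min) is the first element of tailSet(min); null means no key >= min.
          Integer first = sortedKeys.ceiling(min);
          return first != null && first <= max;
        }

        public static void main(String[] args) {
          RangeInSketch check = new RangeInSketch(List.of(10, 17, 18, 1, 5, 4));
          System.out.println(check.shouldRead(3, 6));   // true: 4 lies in [3, 6]
          System.out.println(check.shouldRead(11, 16)); // false: next key after 11 is 17
        }
      }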
   
    
   
   This is the general idea.
   
   For this I have extended BoundSetPredicate and created a new BoundRangeInPredicate.
   
   At some point, I think one of them should suffice;
   
   for now I have kept them separate.
   
   Reasons:
   
   1) BoundSetPredicate does not keep its data sorted; it keeps it as a plain Set.
   
   Where it needs to check against a lower bound or an upper bound, it uses a comparator to sort on each call. This could be avoided if the set were sorted in the first place.
   
    
   
   2) Parquet would convert the BoundSetPredicate into an In filter, which would also be evaluated on each row.
   
   But in our case, looking up the set for each row would be expensive, because it is a sorted set, and the join will do a lookup in its hash set anyway, so there is no point in doing two lookups. With a user-defined filter, the per-row check can be skipped.
   
   But this needs to be looked into in a little more detail, as it appears that Parquet also prunes at the page-store level, and no callback is received by the filter there. The filtering done at the page level is tied to Parquet's built-in filter implementations, while the RangeIn filter I created is user-defined.
   
    
   
   I will be opening a PR for this in some time, as I still need to add tests and run TPC-DS benchmarks (though code-wise it is ready).
   
   But I would like to solicit your feedback: what do you all think?
   
   If this looks good, I also have in mind to measure the perf impact for general broadcast hash joins where the join key is int, short, etc., so that the hash keys, as a sorted set, can be passed to the stream side before opening the iterator and used to skip row groups (and maybe pages too).




[GitHub] [iceberg] github-actions[bot] closed issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition
URL: https://github.com/apache/iceberg/issues/6039




[GitHub] [iceberg] ahshahid commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by GitBox <gi...@apache.org>.
ahshahid commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1305869163

   @rdblue @aokolnychyi is there any particular reason why colStats in TableContext is false by default? With this flag false, the bounds for non-partition columns are not written to the manifest file; otherwise they could be used for pruning at the manifest level when filters are pushed down.




[GitHub] [iceberg] github-actions[bot] commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1599760544

   This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'




[GitHub] [iceberg] singhpk234 commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1292879652

   I had a proposal in a similar direction:
    * https://github.com/apache/iceberg/issues/4188
   
   It would be really nice to have this.




[GitHub] [iceberg] ahshahid commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by GitBox <gi...@apache.org>.
ahshahid commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1303883448

   Some update:
   For TPC-DS queries with limited data, enabling stats at the manifest level for non-partition columns still does not improve perf; the cost of the DPP query is pretty high, especially for TPC-DS queries 14a and 14b.
   But there is one thing I am going to try:
   1) For non-partition column pruning, we do not need the exact values of the join keys in DPP. So I am going to modify the Spark DPP query for non-partition columns to fetch only the max and min. I am hoping that the Spark-Iceberg code optimizes max/min queries by computing the answer using only the stats at the manifest-file level. If so, this should reduce the cost of the DPP query and still allow range pruning at the various levels in Iceberg (a sketch of such a range filter follows below).
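
   For illustration, assuming the DPP query returned min = 1 and max = 18 for the join keys, the pushed filter could be built with Iceberg's expression API roughly as below. This is a sketch of the idea, not the actual change:

      import org.apache.iceberg.expressions.Expression;
      import org.apache.iceberg.expressions.Expressions;

      // Sketch: a range predicate built from the DPP min/max instead of an IN list.
      public final class RangeFilterSketch {
        static Expression rangeFor(String column, int min, int max) {
          return Expressions.and(
              Expressions.greaterThanOrEqual(column, min),
              Expressions.lessThanOrEqual(column, max));
        }

        public static void main(String[] args) {
          // A predicate like this can be evaluated against manifest, file,
          // and row-group bounds alike.
          System.out.println(rangeFor("B", 1, 18));
        }
      }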




[GitHub] [iceberg] github-actions[bot] commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1537249684

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.




[GitHub] [iceberg] ahshahid commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by GitBox <gi...@apache.org>.
ahshahid commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1293878662

   I have a PR ready locally and am doing some perf and general testing... will open an upstream PR soon.




[GitHub] [iceberg] ahshahid commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by GitBox <gi...@apache.org>.
ahshahid commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1305877706

   > Some update: For TPC-DS queries with limited data, enabling stats at the manifest level for non-partition columns still does not improve perf; the cost of the DPP query is pretty high, especially for TPC-DS queries 14a and 14b. But there is one thing I am going to try:
   > 
   > 1. For non-partition column pruning, we do not need the exact values of the join keys in DPP. So I am going to modify the Spark DPP query for non-partition columns to fetch only the max and min. I am hoping that the Spark-Iceberg code optimizes max/min queries by computing the answer using only the stats at the manifest-file level. If so, this should reduce the cost of the DPP query and still allow range pruning at the various levels in Iceberg.
   
   I have been given to understand that there is no mechanism in Spark DataSourceV2 to tell the data source to evaluate max/min using stats when available. So I will work part-time on a prototype change that gets the max/min directly from Iceberg for DPP in these cases, and see its impact on perf.




[GitHub] [iceberg] ahshahid commented on issue #6039: Spark : Perf enhancement by leveraging Dynamic Partition Pruning rule of spark for non partition columns used as join condition

Posted by GitBox <gi...@apache.org>.
ahshahid commented on issue #6039:
URL: https://github.com/apache/iceberg/issues/6039#issuecomment-1301385359

   I am still working on the perf aspect. In a limited TPC-DS test, it turns out that the DPP query is very expensive. But interestingly, I noticed that though the manifest file contains bounds information for non-partition columns, it is not being read/used (maybe because TableContext has the boolean colStats hardcoded to false; still going through the code).
   I want to check the impact on perf if these stats are available at the manifest level for non-partition columns.

