You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by godfrey he <go...@gmail.com> on 2022/07/08 14:12:05 UTC

[DISCUSS] FLIP-248: Introduce dynamic partition pruning

Hi all,

I would like to open a discussion on FLIP-248: Introduce dynamic
partition pruning.

 Currently, Flink supports static partition pruning: the conditions in
the WHERE clause are analyzed
to determine in advance which partitions can be safely skipped in the
optimization phase.
Another common scenario: the partitions information is not available
in the optimization phase but in the execution phase.
That's the problem this FLIP is trying to solve: dynamic partition
pruning, which could reduce the partition table source IO.

The query pattern looks like:
select * from store_returns, date_dim where sr_returned_date_sk =
d_date_sk and d_year = 2000

We will introduce a mechanism for detecting dynamic partition pruning
patterns in optimization phase
and performing partition pruning at runtime by sending the dimension
table results to the SplitEnumerator
of fact table via existing coordinator mechanism.

You can find more details in FLIP-248 document[1].
Looking forward to your any feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
[2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248


Best,
Godfrey

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Jingsong Li <ji...@gmail.com>.
Thanks for the update.

Looks good to me.

Best,
Jingsong

On Tue, Jul 26, 2022 at 3:21 PM Jark Wu <im...@gmail.com> wrote:
>
> Thank you Godfrey, the FLIP looks good to me.
>
> Best,
> Jark
>
> On Tue, 26 Jul 2022 at 12:34, godfrey he <go...@gmail.com> wrote:
>
> > Thanks for all the inputs, I have updated the document and POC code.
> >
> >
> > Best,
> > Godfrey
> >
> > Yun Gao <yu...@aliyun.com.invalid> 于2022年7月26日周二 11:11写道:
> > >
> > > Hi,
> > >
> > > Thanks all for all the valuable discussion on this FLIP, +1 for
> > implementing
> > > dynamic partition pruning / dynamic filtering pushdown since it is a key
> > optimization
> > > to improve the performance on batch processing.
> > >
> > > Also due to introducing the speculative execution for the batch
> > processing, we
> > > might also need some consideration for the case with speculative
> > execution enabled:
> > > 1. The operator coordinator of DynamicFilteringDataCollector should
> > ignore the following
> > > filtering data in consider of the task might executes for multiple
> > attempts.
> > > 2. The DynamicFileSplitEnumerator should also implements the
> > `SupportsHandleExecutionAttemptSourceEvent`
> > > interface, otherwise it would throws exception when received the
> > filtering data source event.
> > >
> > > Best,
> > > Yun Gao
> > >
> > >
> > >
> > > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > >
> > >
> > >
> > > ------------------------------------------------------------------
> > > From:Jing Ge <ji...@ververica.com>
> > > Send Time:2022 Jul. 21 (Thu.) 18:56
> > > To:dev <de...@flink.apache.org>
> > > Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning
> > >
> > > Hi,
> > >
> > > Thanks for the informative discussion! Looking forward to using dynamic
> > > filtering provided by Flink.
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Jul 19, 2022 at 3:22 AM godfrey he <go...@gmail.com> wrote:
> > >
> > > > Hi, Jingong, Jark, Jing,
> > > >
> > > > Thanks for for the important inputs.
> > > > Lake storage is a very important scenario, and consider more generic
> > > > and extended case,
> > > > I also would like to use "dynamic filtering" concept instead of
> > > > "dynamic partition".
> > > >
> > > > >maybe the FLIP should also demonstrate the EXPLAIN result, which
> > > > is also an API.
> > > > I will add a section to describe the EXPLAIN result.
> > > >
> > > > >Does DPP also support streaming queries?
> > > > Yes, but for bounded source.
> > > >
> > > > >it requires the SplitEnumerator must implements new introduced
> > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > +1
> > > >
> > > > I will update the document and the poc code.
> > > >
> > > > Best,
> > > > Godfrey
> > > >
> > > > Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
> > > > >
> > > > > Hi Godfrey,
> > > > > Thanks for driving this discussion.
> > > > > This is an important improvement for batch sql jobs.
> > > > > I agree with Jingsong to expand the capability to more than just
> > > > partitions.
> > > > > Besides, I have two points:
> > > > > 1. Based on FLIP-248[1],
> > > > >
> > > > > > Dynamic partition pruning mechanism can improve performance by
> > avoiding
> > > > > > reading large amounts of irrelevant data, and it works for both
> > batch
> > > > and
> > > > > > streaming queries.
> > > > >
> > > > > Does DPP also support streaming queries?
> > > > > It seems the proposed changes in the FLIP-248 does not work for
> > streaming
> > > > > queries,
> > > > > because the dimension table might be an unbounded inputs.
> > > > > Or does it require all dimension tables to be bounded inputs for
> > > > streaming
> > > > > jobs if the job wanna enable DPP?
> > > > >
> > > > > 2. I notice there are changes on SplitEnumerator for Hive source and
> > File
> > > > > source.
> > > > > And they now depend on SourceEvent to pass PartitionData.
> > > > > In FLIP-245, if enable speculative execution for sources based on
> > FLIP-27
> > > > > which use SourceEvent,
> > > > > it requires the SplitEnumerator must implements new introduced
> > > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > > otherwise an exception would be thrown out.
> > > > > Since hive and File sources are commonly used for batch jobs, it's
> > better
> > > > > to take this point into consideration.
> > > > >
> > > > > Best,
> > > > > Jing Zhang
> > > > >
> > > > > [1] FLIP-248:
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > [2] FLIP-245:
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > > >
> > > > >
> > > > > Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
> > > > >
> > > > > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > > > Pushdown
> > > > > > that the join key contains partition fields.  Extending this FLIP
> > to
> > > > > > general filter
> > > > > > pushdown can benefit more optimizations, and they can share the
> > same
> > > > > > interface.
> > > > > >
> > > > > > For example, Trino Hive Connector leverages dynamic filtering to
> > > > support:
> > > > > > - dynamic partition pruning for partitioned tables
> > > > > > - and dynamic bucket pruning for bucket tables
> > > > > > - and dynamic filter pushed into the ORC and Parquet readers to
> > perform
> > > > > > stripe
> > > > > >   or row-group pruning and save on disk I/O.
> > > > > >
> > > > > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > > > Dynamic
> > > > > > Filtering),
> > > > > > just like Trino [1].  The interfaces should also be adapted for
> > that.
> > > > > >
> > > > > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> > > > which
> > > > > > is also an API.
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Thanks Godfrey for driving.
> > > > > > >
> > > > > > > I like this FLIP.
> > > > > > >
> > > > > > > We can restrict this capability to more than just partitions.
> > > > > > > Here are some inputs from Lake Storage.
> > > > > > >
> > > > > > > The format of the splits generated by Lake Storage is roughly as
> > > > follows:
> > > > > > > Split {
> > > > > > >    Path filePath;
> > > > > > >    Statistics[] fieldStats;
> > > > > > > }
> > > > > > >
> > > > > > > Stats contain the min and max of each column.
> > > > > > >
> > > > > > > If the storage is sorted by a column, this means that the split
> > > > > > > filtering on that column will be very good, so not only the
> > partition
> > > > > > > field, but also this column is worthy of being pushed down the
> > > > > > > RuntimeFilter.
> > > > > > > This information can only be known by source, so I suggest that
> > > > source
> > > > > > > return which fields are worthy of being pushed down.
> > > > > > >
> > > > > > > My overall point is:
> > > > > > > This FLIP can be extended to support Source Runtime Filter
> > push-down
> > > > > > > for all fields, not just dynamic partition pruning.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Best,
> > > > > > > Jingsong
> > > > > > >
> > > > > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I would like to open a discussion on FLIP-248: Introduce
> > dynamic
> > > > > > > > partition pruning.
> > > > > > > >
> > > > > > > >  Currently, Flink supports static partition pruning: the
> > > > conditions in
> > > > > > > > the WHERE clause are analyzed
> > > > > > > > to determine in advance which partitions can be safely skipped
> > in
> > > > the
> > > > > > > > optimization phase.
> > > > > > > > Another common scenario: the partitions information is not
> > > > available
> > > > > > > > in the optimization phase but in the execution phase.
> > > > > > > > That's the problem this FLIP is trying to solve: dynamic
> > partition
> > > > > > > > pruning, which could reduce the partition table source IO.
> > > > > > > >
> > > > > > > > The query pattern looks like:
> > > > > > > > select * from store_returns, date_dim where
> > sr_returned_date_sk =
> > > > > > > > d_date_sk and d_year = 2000
> > > > > > > >
> > > > > > > > We will introduce a mechanism for detecting dynamic partition
> > > > pruning
> > > > > > > > patterns in optimization phase
> > > > > > > > and performing partition pruning at runtime by sending the
> > > > dimension
> > > > > > > > table results to the SplitEnumerator
> > > > > > > > of fact table via existing coordinator mechanism.
> > > > > > > >
> > > > > > > > You can find more details in FLIP-248 document[1].
> > > > > > > > Looking forward to your any feedback.
> > > > > > > >
> > > > > > > > [1]
> > > > > > >
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Godfrey
> > > > > > >
> > > > > >SupportsHandleExecutionAttemptSourceEvent>
> > >
> >

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Jark Wu <im...@gmail.com>.
Thank you Godfrey, the FLIP looks good to me.

Best,
Jark

On Tue, 26 Jul 2022 at 12:34, godfrey he <go...@gmail.com> wrote:

> Thanks for all the inputs, I have updated the document and POC code.
>
>
> Best,
> Godfrey
>
> Yun Gao <yu...@aliyun.com.invalid> 于2022年7月26日周二 11:11写道:
> >
> > Hi,
> >
> > Thanks all for all the valuable discussion on this FLIP, +1 for
> implementing
> > dynamic partition pruning / dynamic filtering pushdown since it is a key
> optimization
> > to improve the performance on batch processing.
> >
> > Also due to introducing the speculative execution for the batch
> processing, we
> > might also need some consideration for the case with speculative
> execution enabled:
> > 1. The operator coordinator of DynamicFilteringDataCollector should
> ignore the following
> > filtering data in consider of the task might executes for multiple
> attempts.
> > 2. The DynamicFileSplitEnumerator should also implements the
> `SupportsHandleExecutionAttemptSourceEvent`
> > interface, otherwise it would throws exception when received the
> filtering data source event.
> >
> > Best,
> > Yun Gao
> >
> >
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Jing Ge <ji...@ververica.com>
> > Send Time:2022 Jul. 21 (Thu.) 18:56
> > To:dev <de...@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning
> >
> > Hi,
> >
> > Thanks for the informative discussion! Looking forward to using dynamic
> > filtering provided by Flink.
> >
> > Best regards,
> > Jing
> >
> > On Tue, Jul 19, 2022 at 3:22 AM godfrey he <go...@gmail.com> wrote:
> >
> > > Hi, Jingong, Jark, Jing,
> > >
> > > Thanks for for the important inputs.
> > > Lake storage is a very important scenario, and consider more generic
> > > and extended case,
> > > I also would like to use "dynamic filtering" concept instead of
> > > "dynamic partition".
> > >
> > > >maybe the FLIP should also demonstrate the EXPLAIN result, which
> > > is also an API.
> > > I will add a section to describe the EXPLAIN result.
> > >
> > > >Does DPP also support streaming queries?
> > > Yes, but for bounded source.
> > >
> > > >it requires the SplitEnumerator must implements new introduced
> > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > +1
> > >
> > > I will update the document and the poc code.
> > >
> > > Best,
> > > Godfrey
> > >
> > > Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
> > > >
> > > > Hi Godfrey,
> > > > Thanks for driving this discussion.
> > > > This is an important improvement for batch sql jobs.
> > > > I agree with Jingsong to expand the capability to more than just
> > > partitions.
> > > > Besides, I have two points:
> > > > 1. Based on FLIP-248[1],
> > > >
> > > > > Dynamic partition pruning mechanism can improve performance by
> avoiding
> > > > > reading large amounts of irrelevant data, and it works for both
> batch
> > > and
> > > > > streaming queries.
> > > >
> > > > Does DPP also support streaming queries?
> > > > It seems the proposed changes in the FLIP-248 does not work for
> streaming
> > > > queries,
> > > > because the dimension table might be an unbounded inputs.
> > > > Or does it require all dimension tables to be bounded inputs for
> > > streaming
> > > > jobs if the job wanna enable DPP?
> > > >
> > > > 2. I notice there are changes on SplitEnumerator for Hive source and
> File
> > > > source.
> > > > And they now depend on SourceEvent to pass PartitionData.
> > > > In FLIP-245, if enable speculative execution for sources based on
> FLIP-27
> > > > which use SourceEvent,
> > > > it requires the SplitEnumerator must implements new introduced
> > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > otherwise an exception would be thrown out.
> > > > Since hive and File sources are commonly used for batch jobs, it's
> better
> > > > to take this point into consideration.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > [1] FLIP-248:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > [2] FLIP-245:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > >
> > > >
> > > > Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
> > > >
> > > > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > > Pushdown
> > > > > that the join key contains partition fields.  Extending this FLIP
> to
> > > > > general filter
> > > > > pushdown can benefit more optimizations, and they can share the
> same
> > > > > interface.
> > > > >
> > > > > For example, Trino Hive Connector leverages dynamic filtering to
> > > support:
> > > > > - dynamic partition pruning for partitioned tables
> > > > > - and dynamic bucket pruning for bucket tables
> > > > > - and dynamic filter pushed into the ORC and Parquet readers to
> perform
> > > > > stripe
> > > > >   or row-group pruning and save on disk I/O.
> > > > >
> > > > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > > Dynamic
> > > > > Filtering),
> > > > > just like Trino [1].  The interfaces should also be adapted for
> that.
> > > > >
> > > > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> > > which
> > > > > is also an API.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks Godfrey for driving.
> > > > > >
> > > > > > I like this FLIP.
> > > > > >
> > > > > > We can restrict this capability to more than just partitions.
> > > > > > Here are some inputs from Lake Storage.
> > > > > >
> > > > > > The format of the splits generated by Lake Storage is roughly as
> > > follows:
> > > > > > Split {
> > > > > >    Path filePath;
> > > > > >    Statistics[] fieldStats;
> > > > > > }
> > > > > >
> > > > > > Stats contain the min and max of each column.
> > > > > >
> > > > > > If the storage is sorted by a column, this means that the split
> > > > > > filtering on that column will be very good, so not only the
> partition
> > > > > > field, but also this column is worthy of being pushed down the
> > > > > > RuntimeFilter.
> > > > > > This information can only be known by source, so I suggest that
> > > source
> > > > > > return which fields are worthy of being pushed down.
> > > > > >
> > > > > > My overall point is:
> > > > > > This FLIP can be extended to support Source Runtime Filter
> push-down
> > > > > > for all fields, not just dynamic partition pruning.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to open a discussion on FLIP-248: Introduce
> dynamic
> > > > > > > partition pruning.
> > > > > > >
> > > > > > >  Currently, Flink supports static partition pruning: the
> > > conditions in
> > > > > > > the WHERE clause are analyzed
> > > > > > > to determine in advance which partitions can be safely skipped
> in
> > > the
> > > > > > > optimization phase.
> > > > > > > Another common scenario: the partitions information is not
> > > available
> > > > > > > in the optimization phase but in the execution phase.
> > > > > > > That's the problem this FLIP is trying to solve: dynamic
> partition
> > > > > > > pruning, which could reduce the partition table source IO.
> > > > > > >
> > > > > > > The query pattern looks like:
> > > > > > > select * from store_returns, date_dim where
> sr_returned_date_sk =
> > > > > > > d_date_sk and d_year = 2000
> > > > > > >
> > > > > > > We will introduce a mechanism for detecting dynamic partition
> > > pruning
> > > > > > > patterns in optimization phase
> > > > > > > and performing partition pruning at runtime by sending the
> > > dimension
> > > > > > > table results to the SplitEnumerator
> > > > > > > of fact table via existing coordinator mechanism.
> > > > > > >
> > > > > > > You can find more details in FLIP-248 document[1].
> > > > > > > Looking forward to your any feedback.
> > > > > > >
> > > > > > > [1]
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Godfrey
> > > > > >
> > > > >SupportsHandleExecutionAttemptSourceEvent>
> >
>

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by godfrey he <go...@gmail.com>.
Thanks for your confirmation.
I will start the vote.

Best,
Godfrey

Jing Zhang <be...@gmail.com> 于2022年7月26日周二 19:24写道:
>
> Hi, Godfrey
> Thanks for updating the FLIP.
> It looks good to me now.
>
> Best,
> Jing Zhang
>
> godfrey he <go...@gmail.com> 于2022年7月26日周二 12:33写道:
>
> > Thanks for all the inputs, I have updated the document and POC code.
> >
> >
> > Best,
> > Godfrey
> >
> > Yun Gao <yu...@aliyun.com.invalid> 于2022年7月26日周二 11:11写道:
> > >
> > > Hi,
> > >
> > > Thanks all for all the valuable discussion on this FLIP, +1 for
> > implementing
> > > dynamic partition pruning / dynamic filtering pushdown since it is a key
> > optimization
> > > to improve the performance on batch processing.
> > >
> > > Also due to introducing the speculative execution for the batch
> > processing, we
> > > might also need some consideration for the case with speculative
> > execution enabled:
> > > 1. The operator coordinator of DynamicFilteringDataCollector should
> > ignore the following
> > > filtering data in consider of the task might executes for multiple
> > attempts.
> > > 2. The DynamicFileSplitEnumerator should also implements the
> > `SupportsHandleExecutionAttemptSourceEvent`
> > > interface, otherwise it would throws exception when received the
> > filtering data source event.
> > >
> > > Best,
> > > Yun Gao
> > >
> > >
> > >
> > > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > >
> > >
> > >
> > > ------------------------------------------------------------------
> > > From:Jing Ge <ji...@ververica.com>
> > > Send Time:2022 Jul. 21 (Thu.) 18:56
> > > To:dev <de...@flink.apache.org>
> > > Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning
> > >
> > > Hi,
> > >
> > > Thanks for the informative discussion! Looking forward to using dynamic
> > > filtering provided by Flink.
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Jul 19, 2022 at 3:22 AM godfrey he <go...@gmail.com> wrote:
> > >
> > > > Hi, Jingong, Jark, Jing,
> > > >
> > > > Thanks for for the important inputs.
> > > > Lake storage is a very important scenario, and consider more generic
> > > > and extended case,
> > > > I also would like to use "dynamic filtering" concept instead of
> > > > "dynamic partition".
> > > >
> > > > >maybe the FLIP should also demonstrate the EXPLAIN result, which
> > > > is also an API.
> > > > I will add a section to describe the EXPLAIN result.
> > > >
> > > > >Does DPP also support streaming queries?
> > > > Yes, but for bounded source.
> > > >
> > > > >it requires the SplitEnumerator must implements new introduced
> > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > +1
> > > >
> > > > I will update the document and the poc code.
> > > >
> > > > Best,
> > > > Godfrey
> > > >
> > > > Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
> > > > >
> > > > > Hi Godfrey,
> > > > > Thanks for driving this discussion.
> > > > > This is an important improvement for batch sql jobs.
> > > > > I agree with Jingsong to expand the capability to more than just
> > > > partitions.
> > > > > Besides, I have two points:
> > > > > 1. Based on FLIP-248[1],
> > > > >
> > > > > > Dynamic partition pruning mechanism can improve performance by
> > avoiding
> > > > > > reading large amounts of irrelevant data, and it works for both
> > batch
> > > > and
> > > > > > streaming queries.
> > > > >
> > > > > Does DPP also support streaming queries?
> > > > > It seems the proposed changes in the FLIP-248 does not work for
> > streaming
> > > > > queries,
> > > > > because the dimension table might be an unbounded inputs.
> > > > > Or does it require all dimension tables to be bounded inputs for
> > > > streaming
> > > > > jobs if the job wanna enable DPP?
> > > > >
> > > > > 2. I notice there are changes on SplitEnumerator for Hive source and
> > File
> > > > > source.
> > > > > And they now depend on SourceEvent to pass PartitionData.
> > > > > In FLIP-245, if enable speculative execution for sources based on
> > FLIP-27
> > > > > which use SourceEvent,
> > > > > it requires the SplitEnumerator must implements new introduced
> > > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > > otherwise an exception would be thrown out.
> > > > > Since hive and File sources are commonly used for batch jobs, it's
> > better
> > > > > to take this point into consideration.
> > > > >
> > > > > Best,
> > > > > Jing Zhang
> > > > >
> > > > > [1] FLIP-248:
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > [2] FLIP-245:
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > > >
> > > > >
> > > > > Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
> > > > >
> > > > > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > > > Pushdown
> > > > > > that the join key contains partition fields.  Extending this FLIP
> > to
> > > > > > general filter
> > > > > > pushdown can benefit more optimizations, and they can share the
> > same
> > > > > > interface.
> > > > > >
> > > > > > For example, Trino Hive Connector leverages dynamic filtering to
> > > > support:
> > > > > > - dynamic partition pruning for partitioned tables
> > > > > > - and dynamic bucket pruning for bucket tables
> > > > > > - and dynamic filter pushed into the ORC and Parquet readers to
> > perform
> > > > > > stripe
> > > > > >   or row-group pruning and save on disk I/O.
> > > > > >
> > > > > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > > > Dynamic
> > > > > > Filtering),
> > > > > > just like Trino [1].  The interfaces should also be adapted for
> > that.
> > > > > >
> > > > > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> > > > which
> > > > > > is also an API.
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Thanks Godfrey for driving.
> > > > > > >
> > > > > > > I like this FLIP.
> > > > > > >
> > > > > > > We can restrict this capability to more than just partitions.
> > > > > > > Here are some inputs from Lake Storage.
> > > > > > >
> > > > > > > The format of the splits generated by Lake Storage is roughly as
> > > > follows:
> > > > > > > Split {
> > > > > > >    Path filePath;
> > > > > > >    Statistics[] fieldStats;
> > > > > > > }
> > > > > > >
> > > > > > > Stats contain the min and max of each column.
> > > > > > >
> > > > > > > If the storage is sorted by a column, this means that the split
> > > > > > > filtering on that column will be very good, so not only the
> > partition
> > > > > > > field, but also this column is worthy of being pushed down the
> > > > > > > RuntimeFilter.
> > > > > > > This information can only be known by source, so I suggest that
> > > > source
> > > > > > > return which fields are worthy of being pushed down.
> > > > > > >
> > > > > > > My overall point is:
> > > > > > > This FLIP can be extended to support Source Runtime Filter
> > push-down
> > > > > > > for all fields, not just dynamic partition pruning.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Best,
> > > > > > > Jingsong
> > > > > > >
> > > > > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I would like to open a discussion on FLIP-248: Introduce
> > dynamic
> > > > > > > > partition pruning.
> > > > > > > >
> > > > > > > >  Currently, Flink supports static partition pruning: the
> > > > conditions in
> > > > > > > > the WHERE clause are analyzed
> > > > > > > > to determine in advance which partitions can be safely skipped
> > in
> > > > the
> > > > > > > > optimization phase.
> > > > > > > > Another common scenario: the partitions information is not
> > > > available
> > > > > > > > in the optimization phase but in the execution phase.
> > > > > > > > That's the problem this FLIP is trying to solve: dynamic
> > partition
> > > > > > > > pruning, which could reduce the partition table source IO.
> > > > > > > >
> > > > > > > > The query pattern looks like:
> > > > > > > > select * from store_returns, date_dim where
> > sr_returned_date_sk =
> > > > > > > > d_date_sk and d_year = 2000
> > > > > > > >
> > > > > > > > We will introduce a mechanism for detecting dynamic partition
> > > > pruning
> > > > > > > > patterns in optimization phase
> > > > > > > > and performing partition pruning at runtime by sending the
> > > > dimension
> > > > > > > > table results to the SplitEnumerator
> > > > > > > > of fact table via existing coordinator mechanism.
> > > > > > > >
> > > > > > > > You can find more details in FLIP-248 document[1].
> > > > > > > > Looking forward to your any feedback.
> > > > > > > >
> > > > > > > > [1]
> > > > > > >
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Godfrey
> > > > > > >
> > > > > >SupportsHandleExecutionAttemptSourceEvent>
> > >
> >

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Jing Zhang <be...@gmail.com>.
Hi, Godfrey
Thanks for updating the FLIP.
It looks good to me now.

Best,
Jing Zhang

godfrey he <go...@gmail.com> 于2022年7月26日周二 12:33写道:

> Thanks for all the inputs, I have updated the document and POC code.
>
>
> Best,
> Godfrey
>
> Yun Gao <yu...@aliyun.com.invalid> 于2022年7月26日周二 11:11写道:
> >
> > Hi,
> >
> > Thanks all for all the valuable discussion on this FLIP, +1 for
> implementing
> > dynamic partition pruning / dynamic filtering pushdown since it is a key
> optimization
> > to improve the performance on batch processing.
> >
> > Also due to introducing the speculative execution for the batch
> processing, we
> > might also need some consideration for the case with speculative
> execution enabled:
> > 1. The operator coordinator of DynamicFilteringDataCollector should
> ignore the following
> > filtering data in consider of the task might executes for multiple
> attempts.
> > 2. The DynamicFileSplitEnumerator should also implements the
> `SupportsHandleExecutionAttemptSourceEvent`
> > interface, otherwise it would throws exception when received the
> filtering data source event.
> >
> > Best,
> > Yun Gao
> >
> >
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Jing Ge <ji...@ververica.com>
> > Send Time:2022 Jul. 21 (Thu.) 18:56
> > To:dev <de...@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning
> >
> > Hi,
> >
> > Thanks for the informative discussion! Looking forward to using dynamic
> > filtering provided by Flink.
> >
> > Best regards,
> > Jing
> >
> > On Tue, Jul 19, 2022 at 3:22 AM godfrey he <go...@gmail.com> wrote:
> >
> > > Hi, Jingong, Jark, Jing,
> > >
> > > Thanks for for the important inputs.
> > > Lake storage is a very important scenario, and consider more generic
> > > and extended case,
> > > I also would like to use "dynamic filtering" concept instead of
> > > "dynamic partition".
> > >
> > > >maybe the FLIP should also demonstrate the EXPLAIN result, which
> > > is also an API.
> > > I will add a section to describe the EXPLAIN result.
> > >
> > > >Does DPP also support streaming queries?
> > > Yes, but for bounded source.
> > >
> > > >it requires the SplitEnumerator must implements new introduced
> > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > +1
> > >
> > > I will update the document and the poc code.
> > >
> > > Best,
> > > Godfrey
> > >
> > > Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
> > > >
> > > > Hi Godfrey,
> > > > Thanks for driving this discussion.
> > > > This is an important improvement for batch sql jobs.
> > > > I agree with Jingsong to expand the capability to more than just
> > > partitions.
> > > > Besides, I have two points:
> > > > 1. Based on FLIP-248[1],
> > > >
> > > > > Dynamic partition pruning mechanism can improve performance by
> avoiding
> > > > > reading large amounts of irrelevant data, and it works for both
> batch
> > > and
> > > > > streaming queries.
> > > >
> > > > Does DPP also support streaming queries?
> > > > It seems the proposed changes in the FLIP-248 does not work for
> streaming
> > > > queries,
> > > > because the dimension table might be an unbounded inputs.
> > > > Or does it require all dimension tables to be bounded inputs for
> > > streaming
> > > > jobs if the job wanna enable DPP?
> > > >
> > > > 2. I notice there are changes on SplitEnumerator for Hive source and
> File
> > > > source.
> > > > And they now depend on SourceEvent to pass PartitionData.
> > > > In FLIP-245, if enable speculative execution for sources based on
> FLIP-27
> > > > which use SourceEvent,
> > > > it requires the SplitEnumerator must implements new introduced
> > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > otherwise an exception would be thrown out.
> > > > Since hive and File sources are commonly used for batch jobs, it's
> better
> > > > to take this point into consideration.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > [1] FLIP-248:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > [2] FLIP-245:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > >
> > > >
> > > > Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
> > > >
> > > > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > > Pushdown
> > > > > that the join key contains partition fields.  Extending this FLIP
> to
> > > > > general filter
> > > > > pushdown can benefit more optimizations, and they can share the
> same
> > > > > interface.
> > > > >
> > > > > For example, Trino Hive Connector leverages dynamic filtering to
> > > support:
> > > > > - dynamic partition pruning for partitioned tables
> > > > > - and dynamic bucket pruning for bucket tables
> > > > > - and dynamic filter pushed into the ORC and Parquet readers to
> perform
> > > > > stripe
> > > > >   or row-group pruning and save on disk I/O.
> > > > >
> > > > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > > Dynamic
> > > > > Filtering),
> > > > > just like Trino [1].  The interfaces should also be adapted for
> that.
> > > > >
> > > > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> > > which
> > > > > is also an API.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks Godfrey for driving.
> > > > > >
> > > > > > I like this FLIP.
> > > > > >
> > > > > > We can restrict this capability to more than just partitions.
> > > > > > Here are some inputs from Lake Storage.
> > > > > >
> > > > > > The format of the splits generated by Lake Storage is roughly as
> > > follows:
> > > > > > Split {
> > > > > >    Path filePath;
> > > > > >    Statistics[] fieldStats;
> > > > > > }
> > > > > >
> > > > > > Stats contain the min and max of each column.
> > > > > >
> > > > > > If the storage is sorted by a column, this means that the split
> > > > > > filtering on that column will be very good, so not only the
> partition
> > > > > > field, but also this column is worthy of being pushed down the
> > > > > > RuntimeFilter.
> > > > > > This information can only be known by source, so I suggest that
> > > source
> > > > > > return which fields are worthy of being pushed down.
> > > > > >
> > > > > > My overall point is:
> > > > > > This FLIP can be extended to support Source Runtime Filter
> push-down
> > > > > > for all fields, not just dynamic partition pruning.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to open a discussion on FLIP-248: Introduce
> dynamic
> > > > > > > partition pruning.
> > > > > > >
> > > > > > >  Currently, Flink supports static partition pruning: the
> > > conditions in
> > > > > > > the WHERE clause are analyzed
> > > > > > > to determine in advance which partitions can be safely skipped
> in
> > > the
> > > > > > > optimization phase.
> > > > > > > Another common scenario: the partitions information is not
> > > available
> > > > > > > in the optimization phase but in the execution phase.
> > > > > > > That's the problem this FLIP is trying to solve: dynamic
> partition
> > > > > > > pruning, which could reduce the partition table source IO.
> > > > > > >
> > > > > > > The query pattern looks like:
> > > > > > > select * from store_returns, date_dim where
> sr_returned_date_sk =
> > > > > > > d_date_sk and d_year = 2000
> > > > > > >
> > > > > > > We will introduce a mechanism for detecting dynamic partition
> > > pruning
> > > > > > > patterns in optimization phase
> > > > > > > and performing partition pruning at runtime by sending the
> > > dimension
> > > > > > > table results to the SplitEnumerator
> > > > > > > of fact table via existing coordinator mechanism.
> > > > > > >
> > > > > > > You can find more details in FLIP-248 document[1].
> > > > > > > Looking forward to your any feedback.
> > > > > > >
> > > > > > > [1]
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Godfrey
> > > > > >
> > > > >SupportsHandleExecutionAttemptSourceEvent>
> >
>

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by godfrey he <go...@gmail.com>.
Thanks for all the inputs, I have updated the document and POC code.


Best,
Godfrey

Yun Gao <yu...@aliyun.com.invalid> 于2022年7月26日周二 11:11写道:
>
> Hi,
>
> Thanks all for all the valuable discussion on this FLIP, +1 for implementing
> dynamic partition pruning / dynamic filtering pushdown since it is a key optimization
> to improve the performance on batch processing.
>
> Also due to introducing the speculative execution for the batch processing, we
> might also need some consideration for the case with speculative execution enabled:
> 1. The operator coordinator of DynamicFilteringDataCollector should ignore the following
> filtering data in consider of the task might executes for multiple attempts.
> 2. The DynamicFileSplitEnumerator should also implements the `SupportsHandleExecutionAttemptSourceEvent`
> interface, otherwise it would throws exception when received the filtering data source event.
>
> Best,
> Yun Gao
>
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
>
>
>
> ------------------------------------------------------------------
> From:Jing Ge <ji...@ververica.com>
> Send Time:2022 Jul. 21 (Thu.) 18:56
> To:dev <de...@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning
>
> Hi,
>
> Thanks for the informative discussion! Looking forward to using dynamic
> filtering provided by Flink.
>
> Best regards,
> Jing
>
> On Tue, Jul 19, 2022 at 3:22 AM godfrey he <go...@gmail.com> wrote:
>
> > Hi, Jingong, Jark, Jing,
> >
> > Thanks for for the important inputs.
> > Lake storage is a very important scenario, and consider more generic
> > and extended case,
> > I also would like to use "dynamic filtering" concept instead of
> > "dynamic partition".
> >
> > >maybe the FLIP should also demonstrate the EXPLAIN result, which
> > is also an API.
> > I will add a section to describe the EXPLAIN result.
> >
> > >Does DPP also support streaming queries?
> > Yes, but for bounded source.
> >
> > >it requires the SplitEnumerator must implements new introduced
> > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > +1
> >
> > I will update the document and the poc code.
> >
> > Best,
> > Godfrey
> >
> > Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
> > >
> > > Hi Godfrey,
> > > Thanks for driving this discussion.
> > > This is an important improvement for batch sql jobs.
> > > I agree with Jingsong to expand the capability to more than just
> > partitions.
> > > Besides, I have two points:
> > > 1. Based on FLIP-248[1],
> > >
> > > > Dynamic partition pruning mechanism can improve performance by avoiding
> > > > reading large amounts of irrelevant data, and it works for both batch
> > and
> > > > streaming queries.
> > >
> > > Does DPP also support streaming queries?
> > > It seems the proposed changes in the FLIP-248 does not work for streaming
> > > queries,
> > > because the dimension table might be an unbounded inputs.
> > > Or does it require all dimension tables to be bounded inputs for
> > streaming
> > > jobs if the job wanna enable DPP?
> > >
> > > 2. I notice there are changes on SplitEnumerator for Hive source and File
> > > source.
> > > And they now depend on SourceEvent to pass PartitionData.
> > > In FLIP-245, if enable speculative execution for sources based on FLIP-27
> > > which use SourceEvent,
> > > it requires the SplitEnumerator must implements new introduced
> > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > otherwise an exception would be thrown out.
> > > Since hive and File sources are commonly used for batch jobs, it's better
> > > to take this point into consideration.
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > [1] FLIP-248:
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > [2] FLIP-245:
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > >
> > >
> > > Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
> > >
> > > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > Pushdown
> > > > that the join key contains partition fields.  Extending this FLIP to
> > > > general filter
> > > > pushdown can benefit more optimizations, and they can share the same
> > > > interface.
> > > >
> > > > For example, Trino Hive Connector leverages dynamic filtering to
> > support:
> > > > - dynamic partition pruning for partitioned tables
> > > > - and dynamic bucket pruning for bucket tables
> > > > - and dynamic filter pushed into the ORC and Parquet readers to perform
> > > > stripe
> > > >   or row-group pruning and save on disk I/O.
> > > >
> > > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > Dynamic
> > > > Filtering),
> > > > just like Trino [1].  The interfaces should also be adapted for that.
> > > >
> > > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> > which
> > > > is also an API.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks Godfrey for driving.
> > > > >
> > > > > I like this FLIP.
> > > > >
> > > > > We can restrict this capability to more than just partitions.
> > > > > Here are some inputs from Lake Storage.
> > > > >
> > > > > The format of the splits generated by Lake Storage is roughly as
> > follows:
> > > > > Split {
> > > > >    Path filePath;
> > > > >    Statistics[] fieldStats;
> > > > > }
> > > > >
> > > > > Stats contain the min and max of each column.
> > > > >
> > > > > If the storage is sorted by a column, this means that the split
> > > > > filtering on that column will be very good, so not only the partition
> > > > > field, but also this column is worthy of being pushed down the
> > > > > RuntimeFilter.
> > > > > This information can only be known by source, so I suggest that
> > source
> > > > > return which fields are worthy of being pushed down.
> > > > >
> > > > > My overall point is:
> > > > > This FLIP can be extended to support Source Runtime Filter push-down
> > > > > for all fields, not just dynamic partition pruning.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Best,
> > > > > Jingsong
> > > > >
> > > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com>
> > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > > > > partition pruning.
> > > > > >
> > > > > >  Currently, Flink supports static partition pruning: the
> > conditions in
> > > > > > the WHERE clause are analyzed
> > > > > > to determine in advance which partitions can be safely skipped in
> > the
> > > > > > optimization phase.
> > > > > > Another common scenario: the partitions information is not
> > available
> > > > > > in the optimization phase but in the execution phase.
> > > > > > That's the problem this FLIP is trying to solve: dynamic partition
> > > > > > pruning, which could reduce the partition table source IO.
> > > > > >
> > > > > > The query pattern looks like:
> > > > > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > > > > d_date_sk and d_year = 2000
> > > > > >
> > > > > > We will introduce a mechanism for detecting dynamic partition
> > pruning
> > > > > > patterns in optimization phase
> > > > > > and performing partition pruning at runtime by sending the
> > dimension
> > > > > > table results to the SplitEnumerator
> > > > > > of fact table via existing coordinator mechanism.
> > > > > >
> > > > > > You can find more details in FLIP-248 document[1].
> > > > > > Looking forward to your any feedback.
> > > > > >
> > > > > > [1]
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Godfrey
> > > > >
> > > >SupportsHandleExecutionAttemptSourceEvent>
>

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi,

Thanks all for all the valuable discussion on this FLIP, +1 for implementing 
dynamic partition pruning / dynamic filtering pushdown since it is a key optimization 
to improve the performance on batch processing.

Also due to introducing the speculative execution for the batch processing, we
might also need some consideration for the case with speculative execution enabled:
1. The operator coordinator of DynamicFilteringDataCollector should ignore the following
filtering data in consider of the task might executes for multiple attempts.
2. The DynamicFileSplitEnumerator should also implements the `SupportsHandleExecutionAttemptSourceEvent`
interface, otherwise it would throws exception when received the filtering data source event. 

Best,
Yun Gao



[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job



------------------------------------------------------------------
From:Jing Ge <ji...@ververica.com>
Send Time:2022 Jul. 21 (Thu.) 18:56
To:dev <de...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Hi,

Thanks for the informative discussion! Looking forward to using dynamic
filtering provided by Flink.

Best regards,
Jing

On Tue, Jul 19, 2022 at 3:22 AM godfrey he <go...@gmail.com> wrote:

> Hi, Jingong, Jark, Jing,
>
> Thanks for for the important inputs.
> Lake storage is a very important scenario, and consider more generic
> and extended case,
> I also would like to use "dynamic filtering" concept instead of
> "dynamic partition".
>
> >maybe the FLIP should also demonstrate the EXPLAIN result, which
> is also an API.
> I will add a section to describe the EXPLAIN result.
>
> >Does DPP also support streaming queries?
> Yes, but for bounded source.
>
> >it requires the SplitEnumerator must implements new introduced
> `SupportsHandleExecutionAttemptSourceEvent` interface,
> +1
>
> I will update the document and the poc code.
>
> Best,
> Godfrey
>
> Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
> >
> > Hi Godfrey,
> > Thanks for driving this discussion.
> > This is an important improvement for batch sql jobs.
> > I agree with Jingsong to expand the capability to more than just
> partitions.
> > Besides, I have two points:
> > 1. Based on FLIP-248[1],
> >
> > > Dynamic partition pruning mechanism can improve performance by avoiding
> > > reading large amounts of irrelevant data, and it works for both batch
> and
> > > streaming queries.
> >
> > Does DPP also support streaming queries?
> > It seems the proposed changes in the FLIP-248 does not work for streaming
> > queries,
> > because the dimension table might be an unbounded inputs.
> > Or does it require all dimension tables to be bounded inputs for
> streaming
> > jobs if the job wanna enable DPP?
> >
> > 2. I notice there are changes on SplitEnumerator for Hive source and File
> > source.
> > And they now depend on SourceEvent to pass PartitionData.
> > In FLIP-245, if enable speculative execution for sources based on FLIP-27
> > which use SourceEvent,
> > it requires the SplitEnumerator must implements new introduced
> > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > otherwise an exception would be thrown out.
> > Since hive and File sources are commonly used for batch jobs, it's better
> > to take this point into consideration.
> >
> > Best,
> > Jing Zhang
> >
> > [1] FLIP-248:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > [2] FLIP-245:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >
> >
> > Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
> >
> > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> Pushdown
> > > that the join key contains partition fields.  Extending this FLIP to
> > > general filter
> > > pushdown can benefit more optimizations, and they can share the same
> > > interface.
> > >
> > > For example, Trino Hive Connector leverages dynamic filtering to
> support:
> > > - dynamic partition pruning for partitioned tables
> > > - and dynamic bucket pruning for bucket tables
> > > - and dynamic filter pushed into the ORC and Parquet readers to perform
> > > stripe
> > >   or row-group pruning and save on disk I/O.
> > >
> > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> Dynamic
> > > Filtering),
> > > just like Trino [1].  The interfaces should also be adapted for that.
> > >
> > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> which
> > > is also an API.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com>
> wrote:
> > >
> > > > Thanks Godfrey for driving.
> > > >
> > > > I like this FLIP.
> > > >
> > > > We can restrict this capability to more than just partitions.
> > > > Here are some inputs from Lake Storage.
> > > >
> > > > The format of the splits generated by Lake Storage is roughly as
> follows:
> > > > Split {
> > > >    Path filePath;
> > > >    Statistics[] fieldStats;
> > > > }
> > > >
> > > > Stats contain the min and max of each column.
> > > >
> > > > If the storage is sorted by a column, this means that the split
> > > > filtering on that column will be very good, so not only the partition
> > > > field, but also this column is worthy of being pushed down the
> > > > RuntimeFilter.
> > > > This information can only be known by source, so I suggest that
> source
> > > > return which fields are worthy of being pushed down.
> > > >
> > > > My overall point is:
> > > > This FLIP can be extended to support Source Runtime Filter push-down
> > > > for all fields, not just dynamic partition pruning.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com>
> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > > > partition pruning.
> > > > >
> > > > >  Currently, Flink supports static partition pruning: the
> conditions in
> > > > > the WHERE clause are analyzed
> > > > > to determine in advance which partitions can be safely skipped in
> the
> > > > > optimization phase.
> > > > > Another common scenario: the partitions information is not
> available
> > > > > in the optimization phase but in the execution phase.
> > > > > That's the problem this FLIP is trying to solve: dynamic partition
> > > > > pruning, which could reduce the partition table source IO.
> > > > >
> > > > > The query pattern looks like:
> > > > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > > > d_date_sk and d_year = 2000
> > > > >
> > > > > We will introduce a mechanism for detecting dynamic partition
> pruning
> > > > > patterns in optimization phase
> > > > > and performing partition pruning at runtime by sending the
> dimension
> > > > > table results to the SplitEnumerator
> > > > > of fact table via existing coordinator mechanism.
> > > > >
> > > > > You can find more details in FLIP-248 document[1].
> > > > > Looking forward to your any feedback.
> > > > >
> > > > > [1]
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > >
> > > > >
> > > > > Best,
> > > > > Godfrey
> > > >
> > >SupportsHandleExecutionAttemptSourceEvent>


Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Jing Ge <ji...@ververica.com>.
Hi,

Thanks for the informative discussion! Looking forward to using dynamic
filtering provided by Flink.

Best regards,
Jing

On Tue, Jul 19, 2022 at 3:22 AM godfrey he <go...@gmail.com> wrote:

> Hi, Jingong, Jark, Jing,
>
> Thanks for for the important inputs.
> Lake storage is a very important scenario, and consider more generic
> and extended case,
> I also would like to use "dynamic filtering" concept instead of
> "dynamic partition".
>
> >maybe the FLIP should also demonstrate the EXPLAIN result, which
> is also an API.
> I will add a section to describe the EXPLAIN result.
>
> >Does DPP also support streaming queries?
> Yes, but for bounded source.
>
> >it requires the SplitEnumerator must implements new introduced
> `SupportsHandleExecutionAttemptSourceEvent` interface,
> +1
>
> I will update the document and the poc code.
>
> Best,
> Godfrey
>
> Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
> >
> > Hi Godfrey,
> > Thanks for driving this discussion.
> > This is an important improvement for batch sql jobs.
> > I agree with Jingsong to expand the capability to more than just
> partitions.
> > Besides, I have two points:
> > 1. Based on FLIP-248[1],
> >
> > > Dynamic partition pruning mechanism can improve performance by avoiding
> > > reading large amounts of irrelevant data, and it works for both batch
> and
> > > streaming queries.
> >
> > Does DPP also support streaming queries?
> > It seems the proposed changes in the FLIP-248 does not work for streaming
> > queries,
> > because the dimension table might be an unbounded inputs.
> > Or does it require all dimension tables to be bounded inputs for
> streaming
> > jobs if the job wanna enable DPP?
> >
> > 2. I notice there are changes on SplitEnumerator for Hive source and File
> > source.
> > And they now depend on SourceEvent to pass PartitionData.
> > In FLIP-245, if enable speculative execution for sources based on FLIP-27
> > which use SourceEvent,
> > it requires the SplitEnumerator must implements new introduced
> > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > otherwise an exception would be thrown out.
> > Since hive and File sources are commonly used for batch jobs, it's better
> > to take this point into consideration.
> >
> > Best,
> > Jing Zhang
> >
> > [1] FLIP-248:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > [2] FLIP-245:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >
> >
> > Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
> >
> > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> Pushdown
> > > that the join key contains partition fields.  Extending this FLIP to
> > > general filter
> > > pushdown can benefit more optimizations, and they can share the same
> > > interface.
> > >
> > > For example, Trino Hive Connector leverages dynamic filtering to
> support:
> > > - dynamic partition pruning for partitioned tables
> > > - and dynamic bucket pruning for bucket tables
> > > - and dynamic filter pushed into the ORC and Parquet readers to perform
> > > stripe
> > >   or row-group pruning and save on disk I/O.
> > >
> > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> Dynamic
> > > Filtering),
> > > just like Trino [1].  The interfaces should also be adapted for that.
> > >
> > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> which
> > > is also an API.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com>
> wrote:
> > >
> > > > Thanks Godfrey for driving.
> > > >
> > > > I like this FLIP.
> > > >
> > > > We can restrict this capability to more than just partitions.
> > > > Here are some inputs from Lake Storage.
> > > >
> > > > The format of the splits generated by Lake Storage is roughly as
> follows:
> > > > Split {
> > > >    Path filePath;
> > > >    Statistics[] fieldStats;
> > > > }
> > > >
> > > > Stats contain the min and max of each column.
> > > >
> > > > If the storage is sorted by a column, this means that the split
> > > > filtering on that column will be very good, so not only the partition
> > > > field, but also this column is worthy of being pushed down the
> > > > RuntimeFilter.
> > > > This information can only be known by source, so I suggest that
> source
> > > > return which fields are worthy of being pushed down.
> > > >
> > > > My overall point is:
> > > > This FLIP can be extended to support Source Runtime Filter push-down
> > > > for all fields, not just dynamic partition pruning.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com>
> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > > > partition pruning.
> > > > >
> > > > >  Currently, Flink supports static partition pruning: the
> conditions in
> > > > > the WHERE clause are analyzed
> > > > > to determine in advance which partitions can be safely skipped in
> the
> > > > > optimization phase.
> > > > > Another common scenario: the partitions information is not
> available
> > > > > in the optimization phase but in the execution phase.
> > > > > That's the problem this FLIP is trying to solve: dynamic partition
> > > > > pruning, which could reduce the partition table source IO.
> > > > >
> > > > > The query pattern looks like:
> > > > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > > > d_date_sk and d_year = 2000
> > > > >
> > > > > We will introduce a mechanism for detecting dynamic partition
> pruning
> > > > > patterns in optimization phase
> > > > > and performing partition pruning at runtime by sending the
> dimension
> > > > > table results to the SplitEnumerator
> > > > > of fact table via existing coordinator mechanism.
> > > > >
> > > > > You can find more details in FLIP-248 document[1].
> > > > > Looking forward to your any feedback.
> > > > >
> > > > > [1]
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > >
> > > > >
> > > > > Best,
> > > > > Godfrey
> > > >
> > >
>

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by godfrey he <go...@gmail.com>.
Hi, Jingong, Jark, Jing,

Thanks for for the important inputs.
Lake storage is a very important scenario, and consider more generic
and extended case,
I also would like to use "dynamic filtering" concept instead of
"dynamic partition".

>maybe the FLIP should also demonstrate the EXPLAIN result, which
is also an API.
I will add a section to describe the EXPLAIN result.

>Does DPP also support streaming queries?
Yes, but for bounded source.

>it requires the SplitEnumerator must implements new introduced
`SupportsHandleExecutionAttemptSourceEvent` interface,
+1

I will update the document and the poc code.

Best,
Godfrey

Jing Zhang <be...@gmail.com> 于2022年7月13日周三 20:22写道:
>
> Hi Godfrey,
> Thanks for driving this discussion.
> This is an important improvement for batch sql jobs.
> I agree with Jingsong to expand the capability to more than just partitions.
> Besides, I have two points:
> 1. Based on FLIP-248[1],
>
> > Dynamic partition pruning mechanism can improve performance by avoiding
> > reading large amounts of irrelevant data, and it works for both batch and
> > streaming queries.
>
> Does DPP also support streaming queries?
> It seems the proposed changes in the FLIP-248 does not work for streaming
> queries,
> because the dimension table might be an unbounded inputs.
> Or does it require all dimension tables to be bounded inputs for streaming
> jobs if the job wanna enable DPP?
>
> 2. I notice there are changes on SplitEnumerator for Hive source and File
> source.
> And they now depend on SourceEvent to pass PartitionData.
> In FLIP-245, if enable speculative execution for sources based on FLIP-27
> which use SourceEvent,
> it requires the SplitEnumerator must implements new introduced
> `SupportsHandleExecutionAttemptSourceEvent` interface,
> otherwise an exception would be thrown out.
> Since hive and File sources are commonly used for batch jobs, it's better
> to take this point into consideration.
>
> Best,
> Jing Zhang
>
> [1] FLIP-248:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> [2] FLIP-245:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>
>
> Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:
>
> > I agree with Jingsong. DPP is a particular case of Dynamic Filter Pushdown
> > that the join key contains partition fields.  Extending this FLIP to
> > general filter
> > pushdown can benefit more optimizations, and they can share the same
> > interface.
> >
> > For example, Trino Hive Connector leverages dynamic filtering to support:
> > - dynamic partition pruning for partitioned tables
> > - and dynamic bucket pruning for bucket tables
> > - and dynamic filter pushed into the ORC and Parquet readers to perform
> > stripe
> >   or row-group pruning and save on disk I/O.
> >
> > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or Dynamic
> > Filtering),
> > just like Trino [1].  The interfaces should also be adapted for that.
> >
> > Besides, maybe the FLIP should also demonstrate the EXPLAIN result, which
> > is also an API.
> >
> > Best,
> > Jark
> >
> > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com> wrote:
> >
> > > Thanks Godfrey for driving.
> > >
> > > I like this FLIP.
> > >
> > > We can restrict this capability to more than just partitions.
> > > Here are some inputs from Lake Storage.
> > >
> > > The format of the splits generated by Lake Storage is roughly as follows:
> > > Split {
> > >    Path filePath;
> > >    Statistics[] fieldStats;
> > > }
> > >
> > > Stats contain the min and max of each column.
> > >
> > > If the storage is sorted by a column, this means that the split
> > > filtering on that column will be very good, so not only the partition
> > > field, but also this column is worthy of being pushed down the
> > > RuntimeFilter.
> > > This information can only be known by source, so I suggest that source
> > > return which fields are worthy of being pushed down.
> > >
> > > My overall point is:
> > > This FLIP can be extended to support Source Runtime Filter push-down
> > > for all fields, not just dynamic partition pruning.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > > partition pruning.
> > > >
> > > >  Currently, Flink supports static partition pruning: the conditions in
> > > > the WHERE clause are analyzed
> > > > to determine in advance which partitions can be safely skipped in the
> > > > optimization phase.
> > > > Another common scenario: the partitions information is not available
> > > > in the optimization phase but in the execution phase.
> > > > That's the problem this FLIP is trying to solve: dynamic partition
> > > > pruning, which could reduce the partition table source IO.
> > > >
> > > > The query pattern looks like:
> > > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > > d_date_sk and d_year = 2000
> > > >
> > > > We will introduce a mechanism for detecting dynamic partition pruning
> > > > patterns in optimization phase
> > > > and performing partition pruning at runtime by sending the dimension
> > > > table results to the SplitEnumerator
> > > > of fact table via existing coordinator mechanism.
> > > >
> > > > You can find more details in FLIP-248 document[1].
> > > > Looking forward to your any feedback.
> > > >
> > > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > >
> > > >
> > > > Best,
> > > > Godfrey
> > >
> >

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Jing Zhang <be...@gmail.com>.
Hi Godfrey,
Thanks for driving this discussion.
This is an important improvement for batch sql jobs.
I agree with Jingsong to expand the capability to more than just partitions.
Besides, I have two points:
1. Based on FLIP-248[1],

> Dynamic partition pruning mechanism can improve performance by avoiding
> reading large amounts of irrelevant data, and it works for both batch and
> streaming queries.

Does DPP also support streaming queries?
It seems the proposed changes in the FLIP-248 does not work for streaming
queries,
because the dimension table might be an unbounded inputs.
Or does it require all dimension tables to be bounded inputs for streaming
jobs if the job wanna enable DPP?

2. I notice there are changes on SplitEnumerator for Hive source and File
source.
And they now depend on SourceEvent to pass PartitionData.
In FLIP-245, if enable speculative execution for sources based on FLIP-27
which use SourceEvent,
it requires the SplitEnumerator must implements new introduced
`SupportsHandleExecutionAttemptSourceEvent` interface,
otherwise an exception would be thrown out.
Since hive and File sources are commonly used for batch jobs, it's better
to take this point into consideration.

Best,
Jing Zhang

[1] FLIP-248:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
[2] FLIP-245:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job


Jark Wu <im...@gmail.com> 于2022年7月12日周二 13:16写道:

> I agree with Jingsong. DPP is a particular case of Dynamic Filter Pushdown
> that the join key contains partition fields.  Extending this FLIP to
> general filter
> pushdown can benefit more optimizations, and they can share the same
> interface.
>
> For example, Trino Hive Connector leverages dynamic filtering to support:
> - dynamic partition pruning for partitioned tables
> - and dynamic bucket pruning for bucket tables
> - and dynamic filter pushed into the ORC and Parquet readers to perform
> stripe
>   or row-group pruning and save on disk I/O.
>
> Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or Dynamic
> Filtering),
> just like Trino [1].  The interfaces should also be adapted for that.
>
> Besides, maybe the FLIP should also demonstrate the EXPLAIN result, which
> is also an API.
>
> Best,
> Jark
>
> [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
>
>
>
>
>
>
>
>
>
>
> On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com> wrote:
>
> > Thanks Godfrey for driving.
> >
> > I like this FLIP.
> >
> > We can restrict this capability to more than just partitions.
> > Here are some inputs from Lake Storage.
> >
> > The format of the splits generated by Lake Storage is roughly as follows:
> > Split {
> >    Path filePath;
> >    Statistics[] fieldStats;
> > }
> >
> > Stats contain the min and max of each column.
> >
> > If the storage is sorted by a column, this means that the split
> > filtering on that column will be very good, so not only the partition
> > field, but also this column is worthy of being pushed down the
> > RuntimeFilter.
> > This information can only be known by source, so I suggest that source
> > return which fields are worthy of being pushed down.
> >
> > My overall point is:
> > This FLIP can be extended to support Source Runtime Filter push-down
> > for all fields, not just dynamic partition pruning.
> >
> > What do you think?
> >
> > Best,
> > Jingsong
> >
> > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > partition pruning.
> > >
> > >  Currently, Flink supports static partition pruning: the conditions in
> > > the WHERE clause are analyzed
> > > to determine in advance which partitions can be safely skipped in the
> > > optimization phase.
> > > Another common scenario: the partitions information is not available
> > > in the optimization phase but in the execution phase.
> > > That's the problem this FLIP is trying to solve: dynamic partition
> > > pruning, which could reduce the partition table source IO.
> > >
> > > The query pattern looks like:
> > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > d_date_sk and d_year = 2000
> > >
> > > We will introduce a mechanism for detecting dynamic partition pruning
> > > patterns in optimization phase
> > > and performing partition pruning at runtime by sending the dimension
> > > table results to the SplitEnumerator
> > > of fact table via existing coordinator mechanism.
> > >
> > > You can find more details in FLIP-248 document[1].
> > > Looking forward to your any feedback.
> > >
> > > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > >
> > >
> > > Best,
> > > Godfrey
> >
>

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Jark Wu <im...@gmail.com>.
I agree with Jingsong. DPP is a particular case of Dynamic Filter Pushdown
that the join key contains partition fields.  Extending this FLIP to
general filter
pushdown can benefit more optimizations, and they can share the same
interface.

For example, Trino Hive Connector leverages dynamic filtering to support:
- dynamic partition pruning for partitioned tables
- and dynamic bucket pruning for bucket tables
- and dynamic filter pushed into the ORC and Parquet readers to perform
stripe
  or row-group pruning and save on disk I/O.

Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or Dynamic
Filtering),
just like Trino [1].  The interfaces should also be adapted for that.

Besides, maybe the FLIP should also demonstrate the EXPLAIN result, which
is also an API.

Best,
Jark

[1]: https://trino.io/docs/current/admin/dynamic-filtering.html










On Tue, 12 Jul 2022 at 09:59, Jingsong Li <ji...@gmail.com> wrote:

> Thanks Godfrey for driving.
>
> I like this FLIP.
>
> We can restrict this capability to more than just partitions.
> Here are some inputs from Lake Storage.
>
> The format of the splits generated by Lake Storage is roughly as follows:
> Split {
>    Path filePath;
>    Statistics[] fieldStats;
> }
>
> Stats contain the min and max of each column.
>
> If the storage is sorted by a column, this means that the split
> filtering on that column will be very good, so not only the partition
> field, but also this column is worthy of being pushed down the
> RuntimeFilter.
> This information can only be known by source, so I suggest that source
> return which fields are worthy of being pushed down.
>
> My overall point is:
> This FLIP can be extended to support Source Runtime Filter push-down
> for all fields, not just dynamic partition pruning.
>
> What do you think?
>
> Best,
> Jingsong
>
> On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com> wrote:
> >
> > Hi all,
> >
> > I would like to open a discussion on FLIP-248: Introduce dynamic
> > partition pruning.
> >
> >  Currently, Flink supports static partition pruning: the conditions in
> > the WHERE clause are analyzed
> > to determine in advance which partitions can be safely skipped in the
> > optimization phase.
> > Another common scenario: the partitions information is not available
> > in the optimization phase but in the execution phase.
> > That's the problem this FLIP is trying to solve: dynamic partition
> > pruning, which could reduce the partition table source IO.
> >
> > The query pattern looks like:
> > select * from store_returns, date_dim where sr_returned_date_sk =
> > d_date_sk and d_year = 2000
> >
> > We will introduce a mechanism for detecting dynamic partition pruning
> > patterns in optimization phase
> > and performing partition pruning at runtime by sending the dimension
> > table results to the SplitEnumerator
> > of fact table via existing coordinator mechanism.
> >
> > You can find more details in FLIP-248 document[1].
> > Looking forward to your any feedback.
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> >
> >
> > Best,
> > Godfrey
>

Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Godfrey for driving.

I like this FLIP.

We can restrict this capability to more than just partitions.
Here are some inputs from Lake Storage.

The format of the splits generated by Lake Storage is roughly as follows:
Split {
   Path filePath;
   Statistics[] fieldStats;
}

Stats contain the min and max of each column.

If the storage is sorted by a column, this means that the split
filtering on that column will be very good, so not only the partition
field, but also this column is worthy of being pushed down the
RuntimeFilter.
This information can only be known by source, so I suggest that source
return which fields are worthy of being pushed down.

My overall point is:
This FLIP can be extended to support Source Runtime Filter push-down
for all fields, not just dynamic partition pruning.

What do you think?

Best,
Jingsong

On Fri, Jul 8, 2022 at 10:12 PM godfrey he <go...@gmail.com> wrote:
>
> Hi all,
>
> I would like to open a discussion on FLIP-248: Introduce dynamic
> partition pruning.
>
>  Currently, Flink supports static partition pruning: the conditions in
> the WHERE clause are analyzed
> to determine in advance which partitions can be safely skipped in the
> optimization phase.
> Another common scenario: the partitions information is not available
> in the optimization phase but in the execution phase.
> That's the problem this FLIP is trying to solve: dynamic partition
> pruning, which could reduce the partition table source IO.
>
> The query pattern looks like:
> select * from store_returns, date_dim where sr_returned_date_sk =
> d_date_sk and d_year = 2000
>
> We will introduce a mechanism for detecting dynamic partition pruning
> patterns in optimization phase
> and performing partition pruning at runtime by sending the dimension
> table results to the SplitEnumerator
> of fact table via existing coordinator mechanism.
>
> You can find more details in FLIP-248 document[1].
> Looking forward to your any feedback.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
>
>
> Best,
> Godfrey