Posted to dev@flink.apache.org by Albert Jonathan <al...@cs.umn.edu> on 2018/07/13 15:39:42 UTC

Flink Query Optimizer

Hello,

I am just wondering, does Flink use Apache Calcite's query optimizer to
generate an optimal logical plan, or does it have its own query optimizer?
From what I have observed so far, Flink's query optimizer only groups
operators together without changing the order of aggregation operators
(e.g., join). Did I miss anything?

I am thinking of extending Flink to apply query optimization as in an
RDBMS by either integrating it with Calcite or implementing it as a new
module.
Any feedback or guidelines will be highly appreciated.

Thank you,
Albert

Re: Flink Query Optimizer

Posted by Piotr Nowojski <pi...@data-artisans.com>.
I’m not sure. As far as I know, the APIs for statistics support are already in place, and the only missing part is an actual statistics provider. In this respect, batch and streaming might be almost completely independent of one another, so it shouldn’t be a matter of “first this, then that” but of whatever is higher on someone’s priority list and/or whatever is easier.

A low-effort, crude but generic solution would probably be to let users statically configure table sizes via the environment file, the Table API, or SET SESSION VALUE in the SQL client. Better solutions would require custom logic for each connector.
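
As a rough illustration of the static-configuration idea, here is a minimal Python sketch. It is purely hypothetical: `StaticStatisticProvider`, `row_count`, `estimated_rows`, and the default of one million rows are made-up names and values, not Flink or Calcite APIs. It only shows a provider that serves user-declared table sizes and falls back to a default guess when nothing was configured.

```python
from typing import Dict, Optional


class StaticStatisticProvider:
    """Hypothetical provider that returns user-configured row counts."""

    def __init__(self, configured_row_counts: Dict[str, int]) -> None:
        for name, rows in configured_row_counts.items():
            if rows < 0:
                raise ValueError(f"row count for {name!r} must be non-negative")
        # Copy so that later changes to the caller's dict do not leak in.
        self._row_counts = dict(configured_row_counts)

    def row_count(self, table_name: str) -> Optional[int]:
        """Return the configured row count, or None if the table is unknown."""
        return self._row_counts.get(table_name)


def estimated_rows(provider: StaticStatisticProvider,
                   table_name: str,
                   default: int = 1_000_000) -> int:
    """Use the configured size when available, otherwise a default guess."""
    rows = provider.row_count(table_name)
    return default if rows is None else rows


# Example configuration, e.g. as it might be parsed from an environment file.
provider = StaticStatisticProvider({"orders": 5_000_000, "customers": 20_000})
```

A per-connector solution would replace the static dictionary with logic that asks the external system for its actual sizes.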

Piotrek  

> On 15 Jul 2018, at 17:28, Rong Rong <wa...@gmail.com> wrote:
> 
> +1. Having table statistics is one of the main blockers for more advanced
> optimization rules. I would love to contribute to this effort!
> 
> However, I think @Albert's case is more on the DataSet side. Was there any
> plan to integrate table statistics for DataSet first and then extend to the
> DataStream domain?
> 
> --
> Rong

Re: Flink Query Optimizer

Posted by Rong Rong <wa...@gmail.com>.
+1. Having table statistics is one of the main blockers for more advanced
optimization rules. I would love to contribute to this effort!

However, I think @Albert's case is more on the DataSet side. Was there any
plan to integrate table statistics for DataSet first and then extend to the
DataStream domain?

--
Rong

On Sun, Jul 15, 2018 at 7:21 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Currently the biggest limitation preventing better query optimisation
> is the lack of table statistics (which are not trivial to provide in
> streaming); as a result, join/aggregation reordering doesn’t work. We have
> some ideas on how to tackle this issue, and we will definitely improve this
> at some point.
>
> Piotrek

Re: Flink Query Optimizer

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Currently the biggest limitation preventing better query optimisation is the lack of table statistics (which are not trivial to provide in streaming); as a result, join/aggregation reordering doesn’t work. We have some ideas on how to tackle this issue, and we will definitely improve this at some point.
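
To make the dependency on statistics concrete, here is a minimal Python sketch of cost-based join ordering. It is not Flink or Calcite code. The table names, row counts, selectivities, and the cost model (the sum of intermediate result sizes over left-deep plans) are all assumptions chosen for illustration.

```python
from itertools import permutations
from typing import Dict, FrozenSet, Sequence, Tuple


def plan_cost(order: Sequence[str],
              row_counts: Dict[str, int],
              selectivity: Dict[FrozenSet[str], float]) -> float:
    """Cost of a left-deep join plan: the sum of all intermediate result sizes."""
    joined = [order[0]]
    size = float(row_counts[order[0]])
    cost = 0.0
    for table in order[1:]:
        size *= row_counts[table]
        # Apply every join predicate between the new table and the tables
        # joined so far; a missing predicate means a cross product (1.0).
        for other in joined:
            size *= selectivity.get(frozenset((table, other)), 1.0)
        cost += size
        joined.append(table)
    return cost


def best_join_order(row_counts: Dict[str, int],
                    selectivity: Dict[FrozenSet[str], float]) -> Tuple[str, ...]:
    """Exhaustively pick the cheapest left-deep order (fine for a few tables)."""
    return min(permutations(sorted(row_counts)),
               key=lambda order: plan_cost(order, row_counts, selectivity))


# Assumed statistics; these are exactly the inputs an optimizer lacks
# when no table statistics are available.
ROW_COUNTS = {"orders": 1_000_000, "customers": 1_000, "regions": 10}
SELECTIVITY = {
    frozenset(("orders", "customers")): 0.001,
    frozenset(("customers", "regions")): 0.1,
}

best = best_join_order(ROW_COUNTS, SELECTIVITY)
```

With these numbers the cheapest plans join the two small tables first and bring in `orders` last. If the row counts are unknown, a cost model like this has nothing to compare, which is why reordering depends on statistics.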

Piotrek

> On 14 Jul 2018, at 06:48, Xingcan Cui <xi...@gmail.com> wrote:
> 
> Hi Albert,
> 
> Calcite provides a rule-based optimizer (as a framework), which means users can customize it by adding rules. That’s exactly what Flink did. From the logical plan to the physical plan, the translations are triggered by different sets of rules, according to which the relational expressions are replaced, reordered or optimized.
> 
> However, IMO, the current optimization rules in the Flink Table API are quite primitive. Some SQL statements (e.g., multiple joins) are just translated to feasible execution plans rather than optimized ones, since it’s much more difficult to conduct query optimization on large datasets or dynamic streams. You could start from the Calcite query optimizer and then try to write your own rules.
> 
> Best,
> Xingcan

Re: Flink Query Optimizer

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Albert,

Calcite provides a rule-based optimizer (as a framework), which means users can customize it by adding rules. That’s exactly what Flink did. From the logical plan to the physical plan, the translations are triggered by different sets of rules, according to which the relational expressions are replaced, reordered or optimized.

However, IMO, the current optimization rules in the Flink Table API are quite primitive. Some SQL statements (e.g., multiple joins) are just translated to feasible execution plans rather than optimized ones, since it’s much more difficult to conduct query optimization on large datasets or dynamic streams. You could start from the Calcite query optimizer and then try to write your own rules.
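
As a minimal sketch of what "adding rules" to a rule-based optimizer means, here is a toy rewriter in Python. It is not Calcite's actual API (Calcite's real building blocks are Java classes such as RelOptRule and RelNode). The plan node types, the two rules, and the `rewrite` driver are all hypothetical and only illustrate the idea of replacing relational expressions until no rule matches.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional, Sequence, Tuple, Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Filter:
    predicate: str
    child: "Plan"


@dataclass(frozen=True)
class Project:
    fields: Tuple[str, ...]
    child: "Plan"


Plan = Union[Scan, Filter, Project]
# A rule returns a rewritten node, or None when it does not match.
Rule = Callable[[Plan], Optional[Plan]]


def drop_true_filter(node: Plan) -> Optional[Plan]:
    """Filter(TRUE, x) -> x"""
    if isinstance(node, Filter) and node.predicate == "TRUE":
        return node.child
    return None


def merge_filters(node: Plan) -> Optional[Plan]:
    """Filter(p, Filter(q, x)) -> Filter((p) AND (q), x)"""
    if isinstance(node, Filter) and isinstance(node.child, Filter):
        merged = f"({node.predicate}) AND ({node.child.predicate})"
        return Filter(merged, node.child.child)
    return None


def rewrite(node: Plan, rules: Sequence[Rule]) -> Plan:
    """Rewrite children first, then apply rules at this node until none fires."""
    if isinstance(node, (Filter, Project)):
        node = replace(node, child=rewrite(node.child, rules))
    changed = True
    while changed:
        changed = False
        for rule in rules:
            result = rule(node)
            if result is not None:
                node = result
                changed = True
                break
    return node


plan = Project(("id",),
               Filter("TRUE", Filter("a > 1", Filter("b < 2", Scan("t")))))
optimized = rewrite(plan, [drop_true_filter, merge_filters])
```

Both rules shrink the plan, so the loop terminates. A new optimization is added by appending another function to the rule list, which is the extension point that a rule-based framework provides.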

Best,
Xingcan

> On Jul 14, 2018, at 11:55 AM, vino yang <ya...@gmail.com> wrote:
> 
> Hi Albert,
> 
> First, I guess the query optimizer you mentioned is the one for Flink Table & SQL
> (for the batch API there is another optimizer, which is implemented by Flink itself).
> 
> Yes, for Table & SQL, Flink currently uses Apache Calcite's query optimizer:
> a query is translated into a Calcite plan,
> which is then optimized according to Calcite's optimization rules.
> 
> The following rules are applied so far:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
> 
> Since Flink depends on Calcite to do the optimization, I think
> enhancing Flink and Calcite would be the right direction.
> 
> I hope you can provide more ideas and details. The Flink community welcomes
> your ideas and contributions.
> 
> Thanks.
> Vino.

Re: Flink Query Optimizer

Posted by vino yang <ya...@gmail.com>.
Hi Albert,

First, I guess the query optimizer you mentioned is the one for Flink Table & SQL
(for the batch API there is another optimizer, which is implemented by Flink itself).

Yes, for Table & SQL, Flink currently uses Apache Calcite's query optimizer:
a query is translated into a Calcite plan,
which is then optimized according to Calcite's optimization rules.

The following rules are applied so far:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala

Since Flink depends on Calcite to do the optimization, I think
enhancing Flink and Calcite would be the right direction.

I hope you can provide more ideas and details. The Flink community welcomes
your ideas and contributions.

Thanks.
Vino.


2018-07-13 23:39 GMT+08:00 Albert Jonathan <al...@cs.umn.edu>:

> Hello,
>
> I am just wondering, does Flink use Apache Calcite's query optimizer to
> generate an optimal logical plan, or does it have its own query optimizer?
> From what I observed so far, the Flink's query optimizer only groups
> operator together without changing the order of aggregation operators
> (e.g., join). Did I miss anything?
>
> I am thinking of extending Flink to apply query optimization as in the
> RDBMS by either integrating it with Calcite or implementing it as a new
> module.
> Any feedback or guidelines will be highly appreciated.
>
> Thank you,
> Albert
>