Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2019/07/08 15:13:28 UTC

Add Logical filter on a query plan from Flink Table API

Hi,

I am new to Apache Calcite and I am trying to use it with Apache Flink.
To start, I am trying to create a HelloWorld that just adds a logical filter
to my query.
1 - I have my Flink app using the Table API [1].
2 - I have created my Calcite filter rule [2], which is applied to my Flink
query if I use CalciteConfig cc = new
CalciteConfigBuilder().addLogicalOptRuleSet(RuleSets.ofList(MyFilterRule.INSTANCE)).build();
3 - When debugging, execution only reaches my rule if there is a filter on
my query.
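
The behaviour in point 3 can be illustrated with a toy model. This is only a sketch under assumptions: `Node`, `Scan`, `Filter`, `ToyRule` and `ToyPlanner` are hypothetical stand-ins written for this example, not Calcite or Flink classes. It assumes that a rule declares the node type it matches and that the planner invokes the rule only on nodes of that type, so a rule declared on a filter is never called for a plan that contains no filter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for plan nodes; these are NOT Calcite classes.
abstract class Node {
    final Node input; // null for leaf nodes
    Node(Node input) { this.input = input; }
}

class Scan extends Node {
    final String table;
    Scan(String table) { super(null); this.table = table; }
}

class Filter extends Node {
    final String condition;
    Filter(String condition, Node input) { super(input); this.condition = condition; }
}

// A toy rule: it declares the node class it wants to match (its "operand")
// and records every node it was invoked on.
class ToyRule {
    final Class<? extends Node> operand;
    final List<Node> matched = new ArrayList<>();
    ToyRule(Class<? extends Node> operand) { this.operand = operand; }
    void onMatch(Node node) { matched.add(node); }
}

class ToyPlanner {
    // Walk the plan from the root down and invoke the rule on every node
    // whose class matches the rule's operand.
    static void apply(ToyRule rule, Node root) {
        for (Node n = root; n != null; n = n.input) {
            if (rule.operand.isInstance(n)) {
                rule.onMatch(n);
            }
        }
    }
}

class OperandMatchingDemo {
    public static void main(String[] args) {
        Node withFilter = new Filter(">=($6, 50)", new Scan("TicketsStation01Plat01"));
        Node withoutFilter = new Scan("TicketsStation01Plat01");

        ToyRule onFilterPlan = new ToyRule(Filter.class);
        ToyPlanner.apply(onFilterPlan, withFilter);
        System.out.println("plan with filter, matches: " + onFilterPlan.matched.size());

        ToyRule onBarePlan = new ToyRule(Filter.class);
        ToyPlanner.apply(onBarePlan, withoutFilter);
        System.out.println("plan without filter, matches: " + onBarePlan.matched.size());
    }
}
```

In this model, a rule that should act on filter-less plans would have to declare a different operand, for example `Scan.class`, because there is no filter node for it to match.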

I would like to create a logical filter if there is no filter set on the
logical query. How should I implement it?
I see my LogicalFilter being created when I call the "tableEnv.explain()"
method. I suppose that I can add some logical filters to the plan.

== Abstract Syntax Tree ==
LogicalFilter(condition=[>=($6, 50)])
  LogicalTableScan(table=[[TicketsStation01Plat01]])

== Optimized Logical Plan ==
DataStreamCalc(select=[sensorId, sensorType, platformId, platformType, stationId, timestamp, value, trip, eventTime], where=[>=(value, 50)])
  StreamTableSourceScan(table=[[TicketsStation01Plat01]], fields=[sensorId, sensorType, platformId, platformType, stationId, timestamp, value, trip, eventTime], source=[SensorTuples])

== Physical Execution Plan ==
....

Thanks,
Felipe

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWorldCalcitePlanTableAPI.java#L62
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/calcite/rules/MyFilterRule.java#L14
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Add Logical filter on a query plan from Flink Table API

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Felipe,

Yes, we are short of such tutorials. You could take a look at the code of
FLINK-9713 [1] (checking the changelog in an IDE is more convenient).
The code shows how to create a logical node, how to use a rule to convert
it into a FlinkLogicalRel, and then how to convert that into a DataStream
RelNode.
Hope this helps.

[1] https://github.com/apache/flink/pull/6299/files


Re: Add Logical filter on a query plan from Flink Table API

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi Hequn,

It has been very hard to find even a very small tutorial on how to create
my own rule in Calcite+Flink. What I did was copy a Calcite rule into my
project and try to understand it. I am working with FilterJoinRule [1],
which is one of the rules that Flink modifies. In the end I want to create a
rule for Join operators that allows me to choose between different
implementations of join algorithms (nested-loop, sort-merge, hash).

If you have any step-by-step guide to understanding the "RelOptRuleCall"
parameter, that would be very nice =). But I guess I have to keep digging
into the code...

Thanks anyway!
[1]
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/FilterJoinRule.java

Re: Add Logical filter on a query plan from Flink Table API

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Felipe,

> what is the relation of RelFactories [1] when I use it to create the
> INSTANCE of my rule?
`RelFactories.LOGICAL_BUILDER` can be used during the rule transformation:
it is a `RelBuilderFactory` whose `create` method can be used to create a
`RelBuilder`. The `RelBuilder` [1] is used to create relational
expressions.
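
The factory/builder relationship described above can be sketched in miniature. Everything in this block is a hypothetical stand-in written for illustration (`ExprBuilderFactory`, `ExprBuilder`, `ToyFilterRule`, `LOGICAL`); none of it is Calcite's API, and plans are represented as plain strings. The assumption is only that a rule is handed a factory at construction time and asks it for a fresh builder whenever it needs to create new expressions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical miniature of a builder factory; NOT Calcite's API.
interface ExprBuilderFactory {
    ExprBuilder create();
}

// Stack-based builder: push a leaf, wrap the top of the stack, pop the result.
class ExprBuilder {
    private final Deque<String> stack = new ArrayDeque<>();

    ExprBuilder scan(String table) {
        stack.push("Scan(" + table + ")");
        return this;
    }

    ExprBuilder filter(String condition) {
        String input = stack.pop(); // the expression being filtered
        stack.push("Filter(" + condition + ", " + input + ")");
        return this;
    }

    String build() {
        return stack.pop();
    }
}

// The rule keeps the factory it was constructed with and requests a new
// builder each time it has to produce an expression.
class ToyFilterRule {
    private final ExprBuilderFactory factory;

    ToyFilterRule(ExprBuilderFactory factory) { this.factory = factory; }

    String rewrite(String table, String condition) {
        return factory.create().scan(table).filter(condition).build();
    }
}

class BuilderFactoryDemo {
    // Plays the role that a shared "logical builder" constant would play.
    static final ExprBuilderFactory LOGICAL = ExprBuilder::new;

    public static void main(String[] args) {
        ToyFilterRule rule = new ToyFilterRule(LOGICAL);
        System.out.println(rule.rewrite("TicketsStation01Plat01", ">=($6, 50)"));
    }
}
```

Passing the factory into the rule, rather than hard-coding a builder, is what would let the same rule produce different kinds of nodes depending on which factory it is given.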

Maybe we can also post the question to the Calcite mailing list. They may
give more details. :-)

Best,
Hequn

[1]
https://github.com/hequn8128/calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java



Re: Add Logical filter on a query plan from Flink Table API

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi Hequn,

What is the relation of RelFactories [1] when I use it to create the
INSTANCE of my rule? For example:

public static final MyFilterRule INSTANCE =
    new MyFilterRule(Filter.class, RelFactories.LOGICAL_BUILDER);

Then I create a CalciteConfigBuilder using "new
CalciteConfigBuilder().addLogicalOptRuleSet(), .addNormRuleSet(),
.addPhysicalOptRuleSet()".

[1]
https://calcite.apache.org/apidocs/org/apache/calcite/rel/core/RelFactories.html#LOGICAL_BUILDER

Re: Add Logical filter on a query plan from Flink Table API

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Felipe,

> I would like to create a logical filter if there is no filter set on the
> logical query. How should I implement it?
Do you mean you want to add a LogicalFilter node even if the query doesn't
contain a filter? Logically, this can be done through a rule. However, it
sounds a little hacky, and you have to pay attention to semantic problems.
One thing to be aware of is that you can't change the RowType when your
rule fires, i.e., for NodeA -> rule -> NodeB, NodeB should have the same
row type as NodeA.
There are a lot of rules in Flink that I think are good examples for you.
You can find them in the `FlinkRuleSets` class.
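
A toy sketch of the NodeA -> rule -> NodeB constraint follows. All classes here (`PlanNode`, `TableScanNode`, `FilterNode`, `AddFilterRule`) are hypothetical and written only for this example; they are not Calcite or Flink classes, and the row type is modelled as a plain list of field names. The rule wraps a bare scan in a filter and checks that the row type is unchanged. Returning an already-filtered plan untouched is an assumption added here to keep the toy rule from firing repeatedly; it is not something stated in the thread.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for plan nodes; NOT Calcite or Flink classes.
abstract class PlanNode {
    abstract List<String> rowType(); // field names produced by this node
}

class TableScanNode extends PlanNode {
    final String table;
    final List<String> fields;
    TableScanNode(String table, List<String> fields) {
        this.table = table;
        this.fields = fields;
    }
    List<String> rowType() { return fields; }
}

class FilterNode extends PlanNode {
    final String condition;
    final PlanNode input;
    FilterNode(String condition, PlanNode input) {
        this.condition = condition;
        this.input = input;
    }
    // A filter drops rows, never columns, so its row type equals its input's.
    List<String> rowType() { return input.rowType(); }
}

class AddFilterRule {
    // NodeA -> rule -> NodeB: rewrite a bare scan into Filter(scan).
    // Any other root, including an existing filter, is returned unchanged.
    static PlanNode apply(PlanNode root, String condition) {
        if (!(root instanceof TableScanNode)) {
            return root;
        }
        PlanNode rewritten = new FilterNode(condition, root);
        if (!rewritten.rowType().equals(root.rowType())) {
            throw new IllegalStateException("rule changed the row type");
        }
        return rewritten;
    }
}

class AddFilterDemo {
    public static void main(String[] args) {
        PlanNode scan = new TableScanNode(
                "TicketsStation01Plat01", Arrays.asList("sensorId", "value"));
        PlanNode once = AddFilterRule.apply(scan, "value >= 50");
        PlanNode twice = AddFilterRule.apply(once, "value >= 50");
        System.out.println("filter added: " + (once instanceof FilterNode));
        System.out.println("second application is a no-op: " + (twice == once));
    }
}
```

The row type survives the rewrite, but the result set does not: the rewritten plan returns fewer rows than the original, which is one reading of the semantic problems mentioned above.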

> I see my LogicalFilter being created when I call "tableEnv.explain()"
> method. I suppose that I can add some logical filters on the plan.
The `LogicalFilter` and `DataStreamCalc` are not created by your filter
rule. If you remove your filter rule, nothing changes in the plan.

Best, Hequn
