Posted to user@flink.apache.org by kant kodali <ka...@gmail.com> on 2020/01/21 10:01:19 UTC

where does flink store the intermediate results of a join and what is the key?

Hi All,

If I run a query like this

StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
table1.col1 = table2.col1")

1) Where will Flink store the intermediate result? Imagine flink-conf.yaml
says state.backend = 'rocksdb'

2) If the intermediate results are stored in RocksDB, then what are the key
and value in this case (given the query above)?

3) What is the best way to query these intermediate results from an
external application, both while the job is running and while it is not
running?

Thanks!

Re: where does flink store the intermediate results of a join and what is the key?

Posted by Arvid Heise <ar...@ververica.com>.
Yes, the default is to write to an external system. Especially if you want
SQL, there is currently no way around it.

The drawbacks of writing to external systems are: additional maintenance of
another system and higher latency.

On Tue, Jan 28, 2020 at 11:49 AM kant kodali <ka...@gmail.com> wrote:

> Hi Arvid,
>
> I am trying to understand your statement. I am new to Flink, so excuse me
> if I don't know something I should have known. A ProcessFunction just
> processes the records, right? If so, how is it better than writing to an
> external system? At the end of the day I want to be able to query it (it
> doesn't have to be through Queryable State, and I probably don't want to
> use Queryable State because of its limitations). Ideally, I want to be
> able to query the intermediate state using SQL, and hopefully the store
> that maintains the intermediate state has some sort of index support so
> that read queries are faster than a full scan.
>
> Also, I hear that querying intermediate state just like one would in a
> database is a widely requested feature, so it's a bit surprising that this
> is not solved yet, but I am hopeful!
>
> Thanks!
>
>
>
> On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Kant,
>>
>> just wanted to mention the obvious. If you add a ProcessFunction right
>> after the join, you could maintain a user state with the same result. That
>> will of course blow up the data volume by a factor of 2, but may still be
>> better than writing to an external system.
>>
>> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
>> benoit.paris@centraliens-lille.org> wrote:
>>
>>> Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines
>>> changed.
>>> Thanks for the details, Jark!
>>>
>>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <im...@gmail.com> wrote:
>>>
>>>> Hi Kant,
>>>> Having a custom state backend is very difficult and is not recommended.
>>>>
>>>> Hi Benoît,
>>>> Yes, the "Query on the intermediate state is on the roadmap" I
>>>> mentioned refers to integrating Table API & SQL with Queryable State.
>>>> We also have an early issue, FLINK-6968, to track this.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>
>>>>> Hi all!
>>>>>
>>>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>>>>> on the intermediate state is on the roadmap"?
>>>>> Are you referring to working on
>>>>> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
>>>>> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>>>
>>>>> Cheers
>>>>> Ben
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>>>
>>>>>
>>>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is it a common practice to have a custom state backend? If so, what
>>>>>> would be a popular custom backend?
>>>>>>
>>>>>> Can I use Elasticsearch as a state backend?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <im...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kant,
>>>>>>>
>>>>>>> 1) A list of rows would also be sufficient in this case. A MapState
>>>>>>> is used so that a row can be retracted faster and to reduce the
>>>>>>> storage size.
>>>>>>>
>>>>>>> 2) The State Processor API is usually used to process savepoints. I'm
>>>>>>> afraid its performance is not good enough for querying.
>>>>>>>     Also, AFAIK, the State Processor API requires the uid of the
>>>>>>> operator. However, operator uids are not set in Table API & SQL,
>>>>>>>     so I'm not sure whether it works or not.
>>>>>>>
>>>>>>> 3) You can have a custom state backend by implementing the
>>>>>>> org.apache.flink.runtime.state.StateBackend interface, and use it
>>>>>>> via `env.setStateBackend(…)`.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jark,
>>>>>>>>
>>>>>>>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have
>>>>>>>> the same join key, right?
>>>>>>>>
>>>>>>>> 2) Can I use the State Processor API
>>>>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>>>>> from an external application to query the intermediate results in near
>>>>>>>> real-time? I thought querying RocksDB state is a widely requested feature.
>>>>>>>> It would be really great to consider this feature for 1.11.
>>>>>>>>
>>>>>>>> 3) Is there any interface where I can implement my own state
>>>>>>>> backend?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <im...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kant,
>>>>>>>>>
>>>>>>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>>>>>>> 2) In the old planner, the left and right states have the same
>>>>>>>>> structure: `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>>>>>>>     It is a two-level map structure, where `col1`, the join key,
>>>>>>>>> is the first-level key of the state. The key of the MapState is the
>>>>>>>>> input row,
>>>>>>>>>     `count` is the number of occurrences of this row, and
>>>>>>>>> `expiredTime` indicates when to clean up this row (to avoid unbounded
>>>>>>>>> state size). You can find the source code here [1].
>>>>>>>>>     In the Blink planner, the state structure is more complex and
>>>>>>>>> is determined by the meta-information of the upstream. You can see the
>>>>>>>>> source code of the Blink planner here [2].
>>>>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>>>>> Usually, users should write the query result to an external system (like
>>>>>>>>> MySQL) and query the external system.
>>>>>>>>>     Query on the intermediate state is on the roadmap, but I guess
>>>>>>>>> it is not in the 1.11 plan.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>>>>> [2]:
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Jan 21, 2020, at 18:01, kant kodali <ka...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> If I run a query like this
>>>>>>>>>
>>>>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2
>>>>>>>>> on table1.col1 = table2.col1")
>>>>>>>>>
>>>>>>>>> 1) Where will Flink store the intermediate result? Imagine
>>>>>>>>> flink-conf.yaml says state.backend = 'rocksdb'
>>>>>>>>>
>>>>>>>>> 2) If the intermediate results are stored in RocksDB, then what are
>>>>>>>>> the key and value in this case (given the query above)?
>>>>>>>>>
>>>>>>>>> 3) What is the best way to query these intermediate results from
>>>>>>>>> an external application, both while the job is running and while it is
>>>>>>>>> not running?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>> --
>>>>> Benoît Paris
>>>>> Ingénieur Machine Learning Explicable
>>>>> Tél : +33 6 60 74 23 00
>>>>> http://benoit.paris
>>>>> http://explicable.ml
>>>>>
>>>>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>>
>>
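
The two-level layout that Jark describes in the quoted reply above,
`<col1, MapState<Row, Tuple2<count, expiredTime>>>`, can be pictured with
plain Java collections. The sketch below is only an illustration under stated
assumptions: it uses `HashMap` in place of Flink keyed state, models a row as
a `List<Object>`, and the names `JoinSideStateSketch`, `CountAndExpiry`,
`add`, `retract`, `count`, and `cleanup` are hypothetical, not Flink APIs. It
is not the planner's actual implementation; the linked sources are the
reference for that.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the state of ONE join input; hypothetical, not Flink API. */
final class JoinSideStateSketch {

    /** Value stored per distinct row: number of copies and cleanup time. */
    static final class CountAndExpiry {
        long count;
        long expiredTime;
    }

    // First level: join key (col1). Second level: input row -> (count, expiredTime).
    private final Map<String, Map<List<Object>, CountAndExpiry>> state = new HashMap<>();

    /** Accumulate one copy of a row under its join key and refresh its expiry time. */
    void add(String joinKey, List<Object> row, long expiredTime) {
        CountAndExpiry entry = state
                .computeIfAbsent(joinKey, k -> new HashMap<>())
                .computeIfAbsent(row, r -> new CountAndExpiry());
        entry.count++;
        entry.expiredTime = expiredTime;
    }

    /** Retract one copy of a row: a map lookup rather than a scan over a list. */
    void retract(String joinKey, List<Object> row) {
        Map<List<Object>, CountAndExpiry> rows = state.get(joinKey);
        if (rows == null) {
            return;
        }
        CountAndExpiry entry = rows.get(row);
        if (entry == null) {
            return;
        }
        entry.count--;
        if (entry.count <= 0) {
            rows.remove(row);
            if (rows.isEmpty()) {
                state.remove(joinKey);
            }
        }
    }

    /** Number of copies of the given row currently stored under the join key. */
    long count(String joinKey, List<Object> row) {
        Map<List<Object>, CountAndExpiry> rows = state.get(joinKey);
        if (rows == null) {
            return 0;
        }
        CountAndExpiry entry = rows.get(row);
        return entry == null ? 0 : entry.count;
    }

    /** Drop every row whose expiry time is at or before 'now', then drop empty keys. */
    void cleanup(long now) {
        state.values().forEach(rows -> rows.values().removeIf(e -> e.expiredTime <= now));
        state.values().removeIf(Map::isEmpty);
    }
}
```

Because the row itself is the second-level key, retracting one copy is a
lookup plus a counter decrement, and identical rows are stored once with a
count. That appears to be the "retract faster, save storage" trade-off
mentioned in the thread when comparing MapState with a list of rows.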

Re: where does flink store the intermediate results of a join and what is the key?

Posted by kant kodali <ka...@gmail.com>.
Hi Arvid,

I am trying to understand your statement. I am new to Flink, so excuse me if
I don't know something I should have known. A ProcessFunction just processes
the records, right? If so, how is it better than writing to an external
system? At the end of the day I want to be able to query it (it doesn't have
to be through Queryable State, and I probably don't want to use Queryable
State because of its limitations). Ideally, I want to be able to query the
intermediate state using SQL, and hopefully the store that maintains the
intermediate state has some sort of index support so that read queries are
faster than a full scan.

Also, I hear that querying intermediate state just like one would in a
database is a widely requested feature, so it's a bit surprising that this is
not solved yet, but I am hopeful!

Thanks!




Re: where does flink store the intermediate results of a join and what is the key?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Kant,

just wanted to mention the obvious. If you add a ProcessFunction right
after the join, you could maintain a user state with the same result. That
will of course blow up the data volume by a factor of 2, but may still be
better than writing to an external system.
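
The idea above can be sketched without any Flink dependency. The class below
is a minimal illustration, assuming the join output arrives as a stream of
accumulate/retract messages (a flag plus the joined row); it keeps a second,
user-owned copy of the current join result that could then be looked up by
key. `JoinResultViewSketch`, `onChange`, and `lookup` are hypothetical names,
and the `HashMap` merely stands in for the keyed user state that a real
ProcessFunction would hold.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for user state kept by a function placed after the join. */
final class JoinResultViewSketch {

    // join key -> (joined row -> number of copies currently in the result)
    private final Map<String, Map<List<Object>, Integer>> view = new HashMap<>();

    /** Apply one change message: accumulate == true adds a row, false retracts it. */
    void onChange(boolean accumulate, String joinKey, List<Object> joinedRow) {
        Map<List<Object>, Integer> rows = view.computeIfAbsent(joinKey, k -> new HashMap<>());
        if (accumulate) {
            rows.merge(joinedRow, 1, Integer::sum);
        } else {
            // Returning null from the remapping function removes the entry.
            rows.computeIfPresent(joinedRow, (r, copies) -> copies > 1 ? copies - 1 : null);
        }
        if (rows.isEmpty()) {
            view.remove(joinKey);
        }
    }

    /** Current join result for one key, with duplicate rows repeated. */
    List<List<Object>> lookup(String joinKey) {
        List<List<Object>> result = new ArrayList<>();
        Map<List<Object>, Integer> rows = view.get(joinKey);
        if (rows != null) {
            rows.forEach((row, copies) -> {
                for (int i = 0; i < copies; i++) {
                    result.add(row);
                }
            });
        }
        return result;
    }
}
```

The view duplicates data that the join operator already keeps in its own
state, which is the extra data volume mentioned above; what it buys is a
state layout that the user controls and can therefore expose.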


Re: where does flink store the intermediate results of a join and what is the key?

Posted by Benoît Paris <be...@centraliens-lille.org>.
Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines changed.
Thanks for the details, Jark!


-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml

Re: where does flink store the intermediate results of a join and what is the key?

Posted by Jark Wu <im...@gmail.com>.
Hi Kant,
Having a custom state backend is very difficult and is not recommended.

Hi Benoît,
Yes, the "Query on the intermediate state is on the roadmap" I mentioned
refers to integrating Table API & SQL with Queryable State.
We also have an early issue, FLINK-6968, to track this.

Best,
Jark



Re: where does flink store the intermediate results of a join and what is the key?

Posted by Benoît Paris <be...@centraliens-lille.org>.
Hi all!

@Jark, out of curiosity, would you be so kind as to expand a bit on "Query
on the intermediate state is on the roadmap"?
Are you referring to working on QueryableStateStream/QueryableStateClient
[1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
there a FLIP?)?

Cheers
Ben

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table


On Thu, Jan 23, 2020 at 6:40 AM kant kodali <ka...@gmail.com> wrote:

> Is it common practice to have a custom state backend? If so, what would
> be a popular custom backend?
>
> Can I use Elasticsearch as a state backend?
>
> Thanks!

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml

Re: where does flink store the intermediate results of a join and what is the key?

Posted by kant kodali <ka...@gmail.com>.
Is it common practice to have a custom state backend? If so, what would
be a popular custom backend?

Can I use Elasticsearch as a state backend?

Thanks!

On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <im...@gmail.com> wrote:

> Hi Kant,
>
> 1) A list of rows would also be sufficient in this case. A MapState is used
> so that a row can be retracted faster and to reduce the storage size.
>
> 2) The State Processor API is usually used to process savepoints. I’m afraid
> its performance is not good enough for querying.
>     Also, AFAIK, the State Processor API requires the uid of the
> operator. However, operator uids are not set in the Table API & SQL,
>     so I’m not sure whether it would work.
>
> 3) You can write a custom state backend by implementing the
> org.apache.flink.runtime.state.StateBackend interface and use it
> via `env.setStateBackend(…)`.
>
> Best,
> Jark

Re: where does flink store the intermediate results of a join and what is the key?

Posted by Jark Wu <im...@gmail.com>.
Hi Kant,

1) A list of rows would also be sufficient in this case. A MapState is used
so that a row can be retracted faster and to reduce the storage size.

2) The State Processor API is usually used to process savepoints. I’m afraid
its performance is not good enough for querying.
    Also, AFAIK, the State Processor API requires the uid of the
operator. However, operator uids are not set in the Table API & SQL,
    so I’m not sure whether it would work.

3) You can write a custom state backend by implementing the
org.apache.flink.runtime.state.StateBackend interface and use it
via `env.setStateBackend(…)`.
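
To illustrate point 1, here is a minimal sketch in plain Java collections
(not Flink's MapState or its state API) of why a row-to-count map retracts
faster and stores less than a list of rows. The class and method names are
hypothetical, and rows are modelled as strings purely for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: compares two layouts for the rows held
// under one join key. Neither method is part of Flink.
final class RetractionSketch {

    // List layout: every copy of a row is stored, and a retraction has to
    // scan the list to find one copy to remove.
    static boolean retractFromList(List<String> rows, String row) {
        return rows.remove(row); // removes the first equal element, if any
    }

    // Map layout: one entry per distinct row, with a count of copies.
    static void addToMap(Map<String, Long> counts, String row) {
        counts.merge(row, 1L, Long::sum);
    }

    // A retraction is a single lookup: decrement the count, and drop the
    // entry once no copies are left.
    static boolean retractFromMap(Map<String, Long> counts, String row) {
        Long count = counts.get(row);
        if (count == null) {
            return false; // nothing to retract
        }
        if (count == 1L) {
            counts.remove(row);
        } else {
            counts.put(row, count - 1);
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        Map<String, Long> map = new HashMap<>();
        for (int i = 0; i < 3; i++) {
            list.add("row-a");
            addToMap(map, "row-a");
        }
        retractFromList(list, "row-a");
        retractFromMap(map, "row-a");
        System.out.println("list entries: " + list.size());
        System.out.println("map entries: " + map.size() + ", count: " + map.get("row-a"));
    }
}
```

With many duplicate rows under the same join key, the map keeps a single
entry plus a counter, whereas the list grows by one element per copy.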

Best,
Jark

On Wed, 22 Jan 2020 at 14:16, kant kodali <ka...@gmail.com> wrote:

> Hi Jark,
>
> 1) Shouldn't it be a mapping from col1 to a list of rows? Multiple rows can
> have the same join key, right?
>
> 2) Can I use the State Processor API
> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
> from an external application to query the intermediate results in near
> real time? I thought querying RocksDB state was a widely requested feature.
> It would be really great to consider this feature for 1.11.
>
> 3) Is there any interface I can implement to provide my own state backend?
>
> Thanks!

Re: where does flink store the intermediate results of a join and what is the key?

Posted by kant kodali <ka...@gmail.com>.
Hi Jark,

1) Shouldn't it be a mapping from col1 to a list of rows? Multiple rows can
have the same join key, right?

2) Can I use the State Processor API
<https://flink.apache.org/feature/2019/09/13/state-processor-api.html> from
an external application to query the intermediate results in near
real time? I thought querying RocksDB state was a widely requested feature.
It would be really great to consider this feature for 1.11.

3) Is there any interface I can implement to provide my own state backend?

Thanks!


On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <im...@gmail.com> wrote:

> Hi Kant,
>
> 1) Yes, it will be stored in the RocksDB state backend.
> 2) In the old planner, the left state and the right state have the same
> structure: both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>     It is a two-level map structure. `col1`, the join key, is the
> first-level key of the state. The key of the MapState is the input row,
>     `count` is the number of times this row has been seen, and `expiredTime`
> indicates when to clean up this row (to avoid unbounded state size). You can
> find the source code here [1].
>     In the blink planner, the state structure is more complex and is
> determined by meta-information from upstream. You can see the source code
> of the blink planner here [2].
> 3) Currently, the intermediate state is not exposed to users. Usually,
> users write the query result to an external system (such as MySQL) and
> query that system.
>     Querying the intermediate state is on the roadmap, but I guess it is
> not in the 1.11 plan.
>
> Best,
> Jark
>
> [1]:
> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
> [2]:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45

Re: where does flink store the intermediate results of a join and what is the key?

Posted by Jark Wu <im...@gmail.com>.
Hi Kant,

1) Yes, it will be stored in the RocksDB state backend.
2) In the old planner, the left state and the right state have the same structure: both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
    It is a two-level map structure. `col1`, the join key, is the first-level key of the state. The key of the MapState is the input row,
    `count` is the number of times this row has been seen, and `expiredTime` indicates when to clean up this row (to avoid unbounded state size). You can find the source code here [1].
    In the blink planner, the state structure is more complex and is determined by meta-information from upstream. You can see the source code of the blink planner here [2].
3) Currently, the intermediate state is not exposed to users. Usually, users write the query result to an external system (such as MySQL) and query that system.
    Querying the intermediate state is on the roadmap, but I guess it is not in the 1.11 plan.
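
As a rough illustration of the two-level structure in point 2, the sketch
below models one side of the join with plain Java maps. It is not the
planner's code and does not use Flink's state API: the class name, the
`retentionMillis` parameter, and the rule that each update pushes the expiry
time forward are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of <col1, MapState<Row, Tuple2<count, expiredTime>>>.
final class JoinSideStateSketch {

    // Second-level value: number of copies of a row and when to drop it.
    static final class CountAndExpiry {
        long count;
        long expiredTime;

        CountAndExpiry(long count, long expiredTime) {
            this.count = count;
            this.expiredTime = expiredTime;
        }
    }

    // First level: join key (col1). Second level: row -> (count, expiredTime).
    private final Map<String, Map<List<Object>, CountAndExpiry>> state = new HashMap<>();
    private final long retentionMillis; // assumed retention setting

    JoinSideStateSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Records one more copy of `row` under `joinKey` and refreshes its expiry.
    void add(String joinKey, List<Object> row, long now) {
        Map<List<Object>, CountAndExpiry> rows =
                state.computeIfAbsent(joinKey, k -> new HashMap<>());
        CountAndExpiry entry = rows.get(row);
        if (entry == null) {
            rows.put(row, new CountAndExpiry(1, now + retentionMillis));
        } else {
            entry.count++;
            entry.expiredTime = now + retentionMillis;
        }
    }

    // Number of distinct rows currently stored for a join key.
    int distinctRows(String joinKey) {
        return state.getOrDefault(joinKey, Collections.emptyMap()).size();
    }

    // Total copies stored for a join key, i.e. how many rows a probe from
    // the other side would match in this sketch.
    long matchCount(String joinKey) {
        long total = 0;
        for (CountAndExpiry e : state.getOrDefault(joinKey, Collections.emptyMap()).values()) {
            total += e.count;
        }
        return total;
    }

    // Drops rows whose expiry time has passed, then drops empty join keys.
    void cleanup(long now) {
        state.values().forEach(rows -> rows.values().removeIf(e -> e.expiredTime <= now));
        state.values().removeIf(Map::isEmpty);
    }

    public static void main(String[] args) {
        JoinSideStateSketch left = new JoinSideStateSketch(1000L);
        left.add("k1", Arrays.<Object>asList("k1", "a"), 0L);
        left.add("k1", Arrays.<Object>asList("k1", "a"), 100L);
        left.add("k1", Arrays.<Object>asList("k1", "b"), 200L);
        System.out.println("distinct rows: " + left.distinctRows("k1"));
        System.out.println("matches: " + left.matchCount("k1"));
    }
}
```

In a real job each side of the join would hold its own copy of such a
structure inside the configured state backend, keyed by the join key.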

Best,
Jark

[1]: http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
[2]: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45


> On Jan 21, 2020, at 18:01, kant kodali <ka...@gmail.com> wrote:
> 
> Hi All,
> 
> If I run a query like this
> 
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on table1.col1 = table2.col1")
> 
> 1) Where will Flink store the intermediate result? Imagine flink-conf.yaml says state.backend = 'rocksdb'
> 
> 2) If the intermediate results are stored in RocksDB, then what are the key and value in this case (given the query above)?
> 
> 3) What is the best way to query these intermediate results from an external application, both while the job is running and while it is not?
> 
> Thanks!