Posted to user@flink.apache.org by 徐涛 <ha...@gmail.com> on 2018/08/21 10:19:49 UTC
Semantic when table joins table from window
Hi All,
// Hopping window: hops every 1 second, each window spans 3 minutes, grouped per article_id
val praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
tableEnv.registerTable("praiseAggr", praiseAggr)
val finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a JOIN praiseAggr p ON a.article_id = p.article_id")
tableEnv.registerTable("finalTable", finalTable)
I know that praiseAggr, if written to a sink, is in append mode. So if a table joins praiseAggr, what does that table “see”: a table that contains only the latest values, or a table that grows larger and larger? If it is the latter, will it introduce performance problems?
Thanks a lot.
Best,
Henry
Re: Semantic when table joins table from window
Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,
You can increase the retention time to make sure that none of the data you
want will expire.
As for the incremental updates, I think you can sink the results into a
key-value store, say HBase. The HBase table then contains the complete and
latest data set you want, so you don't need to flush again. Would that
satisfy your scenario?
Best, Hequn
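A minimal sketch of the key-value idea, written in plain Scala rather than against any Flink or HBase API: assuming the job emits (article_id, score) updates, a store keyed by article_id keeps only the latest score per article, so readers always see the current result. `Score` and `applyUpdates` are hypothetical names, and an immutable `Map` stands in for the HBase table.

```scala
// Toy model of an upsert sink: an immutable Map stands in for a key-value
// store such as an HBase table keyed by article_id (an assumption, not HBase API).
object UpsertSinkSketch {
  // Hypothetical update record emitted by the streaming job.
  final case class Score(articleId: Long, score: Long)

  // Applies updates in arrival order; a later update for the same key
  // overwrites the earlier one, so the store holds the latest value per key.
  def applyUpdates(updates: Seq[Score]): Map[Long, Long] =
    updates.foldLeft(Map.empty[Long, Long]) { (store, u) =>
      store.updated(u.articleId, u.score)
    }

  def main(args: Array[String]): Unit = {
    val updates = Seq(Score(100L, 1L), Score(101L, 1L), Score(100L, 2L), Score(100L, 3L))
    // Article 100 ends with score 3, article 101 with score 1.
    println(applyUpdates(updates))
  }
}
```

The design choice here is that the store, not the streaming job, holds the final state, so the job only has to emit rows whose score actually changed.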
On Wed, Aug 22, 2018 at 2:51 PM 徐涛 <ha...@gmail.com> wrote:
> Hi Hequn,
> Thanks a lot for your response! This helps me understand the mechanism
> more clearly.
>
> I have another question:
> *How do I use Flink to accomplish time attenuation?*
> If I use the join plus retention time solution, I can only get the
> incremental data. But some other data may need to be recomputed because of
> the time attenuation. Then how do I flush them?
>
> Best,
> Henry
>
> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <ch...@gmail.com> wrote:
>
> Hi Henry,
>
> As for choice 1:
>
> - The state size of a join depends on the size of its input tables, not
> the result table, so the state size of the join in choice 1 depends on the
> number of article ids, praise ids and response ids.
> - Also, a non-window join merges identical rows in its state, i.e. it
> stores <Row, RowCnt>, so the state size won't grow if you keep pouring in
> the same article id. I think the problem here is that you need a DISTINCT
> before the join, so that a praise id won't join multiple identical article
> ids; otherwise the duplicates will affect the correctness of the result.
> - I think you need to aggregate before the join to make sure the result is
> correct, because there are duplicated article ids after article joins
> praise, and they will inflate the value of count(r.response_id).
> - You can't use a window or other bounded operators after a non-window
> join. The time attribute fields cannot be passed through because of a
> semantic conflict.
> - Hop windows with a large fixed duration and a small hop interval should
> be avoided, because data will be duplicated across many windows. For
> example, a hopping window of 15 minutes size and a 5-minute hop interval
> assigns each row to 3 different windows of 15 minutes size.
>
> As for choice 2:
>
> - I think you need another field (for example, HOP_START) when joining the
> three tables, so that only records in the same window are joined.
>
> To solve your problem, I think we can do a non-window GROUP BY first and
> then join the three result tables. Furthermore, a state retention time can
> be set to keep the state from growing larger.
>
> Best, Hequn
>
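Hequn's point about aggregating before the join can be checked with a small plain-Scala model using hypothetical data (not Flink code): for one article with 2 praises and 3 responses, joining first yields 2 * 3 = 6 rows, so both counts come out as 6, while counting each table first and then joining gives the expected 2 and 3. The names `praises`, `responses`, `joinThenCount` and `countThenJoin` are illustrative assumptions.

```scala
// Plain-Scala model with hypothetical rows: (article_id, praise_id) and
// (article_id, response_id). Shows why counting after a join inflates counts.
object JoinThenCountSketch {
  val praises: Seq[(Long, Long)]   = Seq((1L, 10L), (1L, 11L))
  val responses: Seq[(Long, Long)] = Seq((1L, 20L), (1L, 21L), (1L, 22L))

  // Join first, then count: every praise row pairs with every response row of
  // the same article, so both counts equal (#praises * #responses).
  def joinThenCount(articleId: Long): (Int, Int) = {
    val joined = for {
      (pa, p) <- praises if pa == articleId
      (ra, r) <- responses if ra == articleId
    } yield (p, r)
    (joined.size, joined.size)
  }

  // Aggregate each table per article first; joining the (article_id, count)
  // results afterwards cannot duplicate anything.
  def countThenJoin(articleId: Long): (Int, Int) =
    (praises.count(_._1 == articleId), responses.count(_._1 == articleId))

  def main(args: Array[String]): Unit = {
    println(joinThenCount(1L)) // (6,6)
    println(countThenJoin(1L)) // (2,3)
  }
}
```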
> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <ha...@gmail.com> wrote:
>
>> Hi Fabian,
>> So maybe I cannot join a table that is generated from a window, because
>> the table keeps getting larger as time goes on, and maybe the system will
>> crash one day.
>>
>> I am working on a system that calculates the “score” of an article, which
>> consists of the count of article praises, the count of article responses,
>> etc. Because I cannot use Flink to keep all the articles, I decided to
>> update only the scores of articles created within the last 3 days.
>>
>> I have two choices:
>> 1. Join the article table with the praise and response tables, then window:
>> select a.article_id, count(p.praise_id) as pCount, count(r.response_id)
>> as rCount
>> from
>> article a
>> left join
>> praise p on a.article_id = p.article_id
>> left join
>> response r on a.article_id = r.article_id
>> group by hop(updated_time, interval '1' minute, interval '3' day),
>> a.article_id
>> 2. Window the article, praise and response tables separately, then join
>> them together:
>> select aAggr.article_id, pAggr.pCount, rAggr.rCount
>> from
>> (select article_id from article group by hop(updated_time, interval '1'
>> minute, interval '3' day), article_id) aAggr
>> left join
>> (select article_id, count(praise_id) as pCount from praise group by hop(
>> updated_time, interval '1' minute, interval '3' day), article_id) pAggr
>> on aAggr.article_id = pAggr.article_id
>> left join
>> (select article_id, count(response_id) as rCount from response group by
>> hop(updated_time, interval '1' minute, interval '3' day), article_id)
>> rAggr on aAggr.article_id = rAggr.article_id
>>
>> Maybe I should choose 1 (join, then window) rather than 2 (window, then
>> join). Please correct me if I am wrong.
>>
>> I have some worries about choice 1:
>> I do not know how Flink works internally; it seems that in the SQL the
>> article, praise and response tables keep growing as time goes by. Will
>> that introduce performance issues?
>>
>> Best,
>> Henry
>>
>> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <ch...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> praiseAggr is an append table, so it contains
>> "100,101,102,100,101,103,100".
>> 1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY
>> article_id", the answer is "100,101,102,103".
>> 2. If you change your SQL to s"SELECT last_value(article_id) FROM
>> praise", the answer is "100".
>>
>> Best, Hequn
>>
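The three readings in Henry's question can be compared with a plain-Scala model of the window results he lists (an illustration, not how Flink stores the table; it assumes the raw praise table contains exactly the same article ids). An append table keeps every emitted row, a non-window GROUP BY keeps one row per distinct article_id, and a last-value query keeps only the most recent row.

```scala
// Plain-Scala model of the praiseAggr example; the values come from the thread.
object AppendTableSketch {
  // Rows emitted by three consecutive windows, in emission order.
  val windows: Seq[Seq[Int]] = Seq(Seq(100, 101, 102), Seq(100, 101, 103), Seq(100))

  // Append semantics: every row ever emitted stays in the table.
  def appendTable: Seq[Int] = windows.flatten

  // Non-window GROUP BY article_id: one row per distinct id (first-seen order).
  def distinctIds: Seq[Int] = appendTable.distinct

  // last_value-style query: only the most recently emitted value.
  def lastValue: Int = appendTable.last

  def main(args: Array[String]): Unit = {
    println(appendTable.mkString(",")) // 100,101,102,100,101,103,100
    println(distinctIds.mkString(",")) // 100,101,102,103
    println(lastValue)                 // 100
  }
}
```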
>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <ha...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>> Thanks for your response. This question has puzzled me for quite a long
>>> time. Suppose praiseAggr has the following values:
>>> window-1 100,101,102
>>> window-2 100,101,103
>>> window-3 100
>>>
>>> The last time the article table joins praiseAggr, which of the following
>>> values does the praiseAggr table contain?
>>> 1. 100,101,102,100,101,103,100 (all the elements of all the windows)
>>> 2. 100 (the elements of the latest window)
>>> 3. 100,101,102,103 (the distinct values of all the windows)
>>>
>>>
>>> Best,
>>> Henry
>>>
>>>
>>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> The semantics of a query do not depend on the way that it is used.
>>> praiseAggr is a table that grows by one row per second for each article_id.
>>> If you use that table in a join, the join will fully materialize the table.
>>> This is a special case because the same row is added multiple times, so
>>> the state won't grow that quickly, but the performance will decrease
>>> because each row from article will join with multiple (a growing
>>> number of) rows from praiseAggr.
>>>
>>> Best, Fabian
>>>
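The effect Fabian describes, together with Hequn's remark that a non-window join stores `<Row, RowCnt>`, can be sketched in plain Scala. This is a simplified model, not Flink's actual state layout: one input side's state maps each row to its count, so repeated rows add no new keys, but an incoming article row is joined once per stored copy, so the number of join results keeps growing. `addRow`, `buildState` and `matches` are hypothetical helpers.

```scala
// Simplified model of one input side of a non-window join: each distinct
// row is stored once together with how often it has arrived (<Row, RowCnt>).
object JoinStateSketch {
  def addRow(state: Map[Long, Int], row: Long): Map[Long, Int] =
    state.updated(row, state.getOrElse(row, 0) + 1)

  def buildState(rows: Seq[Long]): Map[Long, Int] =
    rows.foldLeft(Map.empty[Long, Int])(addRow)

  // An incoming article row joins once per stored copy of the matching row.
  def matches(state: Map[Long, Int], articleId: Long): Int =
    state.getOrElse(articleId, 0)

  def main(args: Array[String]): Unit = {
    // praiseAggr re-emits article 100 in every window.
    val state = buildState(Seq(100L, 101L, 102L, 100L, 101L, 103L, 100L))
    println(s"distinct rows in state: ${state.size}")             // 4
    println(s"results for article 100: ${matches(state, 100L)}") // 3
  }
}
```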
Re: Semantic when table joins table from window
Posted by 徐涛 <ha...@gmail.com>.
Hi Hequn,
Thanks a lot for your response! This helps me understand the mechanism more clearly.
I have another question:
How do I use Flink to accomplish time attenuation?
If I use the join plus retention time solution, I can only get the incremental data. But some other data may need to be recomputed because of the time attenuation. Then how do I flush them?
Best,
Henry
Re: Semantic when table joins table from window
Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,
Fabian is right. You can try to use a window join if you want a bounded join.
According to your description, I think what you want is (correct me if I'm
wrong):
- Only join data within 3 days
- Calculate the score in a bounded way
- Retract previous scores that are older than 3 days
So I think a window join plus a bounded OVER aggregation may solve your
problem. Do the window join with `article.time between praise.time - 3days and praise.time + 3days`.
You don't have to add a sliding window before the window join. After the
window join, you can perform a bounded OVER aggregation with a 3-day
interval to get the scores.
There are documents about window joins[1] and OVER aggregations[2].
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#aggregations
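What each of Hequn's two steps computes can be modeled in plain Scala (a toy model, not Flink's operators; `Article`, `Praise` and millisecond timestamps are assumptions). In Flink SQL the first step corresponds to a time-windowed join with a condition such as `a.rowtime BETWEEN p.rowtime - INTERVAL '3' DAY AND p.rowtime + INTERVAL '3' DAY`, and the second to an OVER aggregation with `RANGE BETWEEN INTERVAL '3' DAY PRECEDING AND CURRENT ROW`; the column name `rowtime` is hypothetical.

```scala
// Toy model of "window join, then bounded OVER"; times are epoch milliseconds.
object IntervalJoinOverSketch {
  val Day: Long = 24L * 60 * 60 * 1000

  // Hypothetical rows for the two input tables.
  final case class Article(id: Long, time: Long)
  final case class Praise(articleId: Long, time: Long)

  // Window-join predicate: same article, and the article time lies within
  // 3 days of the praise time (both bounds inclusive, like SQL BETWEEN).
  def joins(a: Article, p: Praise): Boolean =
    a.id == p.articleId && a.time >= p.time - 3 * Day && a.time <= p.time + 3 * Day

  // Bounded OVER model: for each row, count the rows of the same article whose
  // time falls in [current time - 3 days, current time], i.e. a RANGE frame.
  def rollingCounts(rows: Seq[Praise]): Seq[(Praise, Int)] = {
    val sorted = rows.sortBy(_.time)
    sorted.map { cur =>
      val n = sorted.count { p =>
        p.articleId == cur.articleId && p.time >= cur.time - 3 * Day && p.time <= cur.time
      }
      (cur, n)
    }
  }

  def main(args: Array[String]): Unit = {
    val article = Article(1L, 0L)
    val praises = Seq(Praise(1L, 1 * Day), Praise(1L, 2 * Day), Praise(1L, 5 * Day))
    // The praise at day 5 is more than 3 days after the article, so it is not joined.
    val joined = praises.filter(p => joins(article, p))
    // Prints "day 1: 1" and then "day 2: 2".
    rollingCounts(joined).foreach { case (p, n) => println(s"day ${p.time / Day}: $n") }
  }
}
```

Because both steps are bounded by 3 days, old rows can be dropped from state without a separate retention setting, which is the property Hequn is after.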
On Tue, Aug 28, 2018 at 9:59 PM 徐涛 <ha...@gmail.com> wrote:
> Hi Fabian,
> I am working on an application that computes the “score” of an article from
> the number of praises and reduces the score over time. I am weighing two
> choices:
> 1. Use a global (non-windowed) join of article and article praise, with 3
> days of state retention. But I cannot get the current time (the time is
> fixed when the program starts), so I cannot compute the reduced score. I
> would have to sink the data and then write some crontab jobs to update the
> score.
> 2. Use a sliding window join with a window length of 3 days, sliding by one
> minute. This way I can get the window end time, but so much data is
> duplicated across windows that there are performance issues.
> Neither choice is good enough; I am wondering if there are other
> solutions. Thanks a lot.
>
> Best
> Henry
>
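The duplication that makes the sliding-window option expensive follows from simple arithmetic: when the window size is a multiple of the hop (slide), every row falls into size / slide windows. In `HOP(time, slide, size)` the first interval is the hop and the second the window size, so `hop(updated_time, interval '1' minute, interval '3' day)` puts each row into 3 * 24 * 60 = 4320 windows. The plain-Scala sketch below enumerates the windows containing one timestamp; it assumes windows aligned to epoch 0 with no offset and is a model of hopping-window assignment, not Flink's code.

```scala
// Model of hopping (sliding) window assignment; times in milliseconds.
// Assumes windows start at multiples of `slide` (no offset).
object HopWindowSketch {
  // Start times of all windows [start, start + size) that contain `ts`.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide) // latest window start <= ts
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  def main(args: Array[String]): Unit = {
    val minute = 60L * 1000
    val day = 24 * 60 * minute
    println(windowStarts(7 * minute, 15 * minute, 5 * minute).size) // 3
    println(windowStarts(7 * minute, 3 * day, minute).size)          // 4320
  }
}
```

With a size equal to the slide, the same function degenerates to tumbling windows, one window per row.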
> On Aug 28, 2018, at 8:05 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi,
>
> Currently, Flink's window operators require increasing timestamp
> attributes. This limitation exists so that the state of a window operator
> can be cleaned up. A join operator does not preserve the order of
> timestamps. Hence, timestamp attributes lose their monotonicity property
> and a window operator cannot be applied.
>
> Have you tried using a window join? Window joins preserve the timestamp order.
>
> Fabian
>
> 徐涛 <ha...@gmail.com> wrote on Tue, Aug 28, 2018, 11:42:
>
>> Hi Hequn,
>> You can't use window or other bounded operators after non-window join.
>> The time attribute fields can not be passed through because of semantic
>> conflict.
>> Why does Flink have this limitation?
>> I have a temporary view:
>> val finalTable = tableEnv.sqlQuery("""select * from
>> A join B on xxxx
>> join C on xxxx""")
>> tableEnv.registerTable("finalTable", finalTable)
>>
>> I want to window this table because I want it to output 1 minute of data
>> every second, but obviously I cannot do this now. How can I make the
>> “final table” output 1 minute of data every second? And if a table is a
>> retract stream, will an item that was added to the window be retracted as
>> well?
>>
>> Thanks a lot.
>>
>> Best
>> Henry
Re: Semantic when table joins table from window
Posted by 徐涛 <ha...@gmail.com>.
Hi Fabian,
I am working on an application that computes the “score” of an article from the number of praises and reduces the score over time. I am weighing two choices:
1. Use a global (non-windowed) join of article and article praise, with 3 days of state retention. But I cannot get the current time (the time is fixed when the program starts), so I cannot compute the reduced score. I would have to sink the data and then write some crontab jobs to update the score.
2. Use a sliding window join with a window length of 3 days, sliding by one minute. This way I can get the window end time, but so much data is duplicated across windows that there are performance issues.
Neither choice is good enough; I am wondering if there are other solutions. Thanks a lot.
Best
Henry
Re: Semantic when table joins table from window
Posted by Fabian Hueske <fh...@gmail.com>.
Hi,
Currently, Flink's window operators require increasing timestamp
attributes. This limitation exists so that the state of a window operator
can be cleaned up. A join operator does not preserve the order of
timestamps. Hence, timestamp attributes lose their monotonicity property
and a window operator cannot be applied.
Have you tried using a window join? Window joins preserve the timestamp order.
Fabian
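Why out-of-order timestamps break window cleanup can be seen in a toy tumbling-window model in plain Scala. As a simplification it assumes a window is purged as soon as any row reaches its end time (Flink itself decides this with watermarks), but the consequence is similar: a row whose window has already been cleaned up can no longer be assigned. `droppedRows` is a hypothetical helper.

```scala
// Toy tumbling-window operator that relies on non-decreasing timestamps:
// a window [start, start + size) is purged once a row at or after its end
// has been seen. Returns the rows that arrive after their window was purged.
object WindowCleanupSketch {
  def droppedRows(timestamps: Seq[Long], size: Long): Vector[Long] =
    timestamps.foldLeft((Long.MinValue, Vector.empty[Long])) { case ((maxSeen, dropped), ts) =>
      val windowEnd = ts - Math.floorMod(ts, size) + size
      val late = windowEnd <= maxSeen // this row's window was already purged
      (math.max(maxSeen, ts), if (late) dropped :+ ts else dropped)
    }._2

  def main(args: Array[String]): Unit = {
    // Ordered input, as a source with increasing timestamps would produce.
    println(droppedRows(Seq(1L, 4L, 6L, 9L, 12L), 5L)) // Vector()
    // The same rows out of order, as a non-window join may emit them.
    println(droppedRows(Seq(1L, 6L, 12L, 4L, 9L), 5L)) // Vector(4, 9)
  }
}
```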
Re: Semantic when table joins table from window
Posted by 徐涛 <ha...@gmail.com>.
Hi Hequn,
> You can't use window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict.
Why does Flink have this limitation?
I have a temporary view:
var finalTable = tableEnv.sqlQuery(s"""select * from
  A join B on xxxx
  join C on xxxx""")
tableEnv.registerTable("finalTable", finalTable)
I want to apply a window to this table because I want it to output a 1-minute window every second. Obviously I cannot do this now, so how can I make "finalTable" output a 1-minute window every second? And if a table is a retract stream, will an item that was added to a window be retracted as well?
Thanks a lot.
Best
Henry
Re: Semantic when table joins table from window
Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,
As for choice 1:
- The state size of a join depends on the size of its input tables, not on the result table, so the state size of the join in choice 1 depends on how many article ids, praise ids and response ids there are.
- Also, a non-window join merges identical rows in its state, i.e., it stores <Row, RowCnt>, so the state size won't grow if you keep pouring in the same article id. I think the problem here is that you need a distinct before the join, so that a praise id won't join multiple identical article ids; otherwise the correctness of the result is affected.
- I think you need to aggregate before the join to make sure the result is correct, because there are duplicated article ids after article joins praise, and this will influence the value of count(r.response_id).
- You can't use window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict.
- A hop window with a large fixed size and a small hop interval should be avoided, because data will be duplicated across many windows. For example, a hopping window of 15 minutes size and a 5 minute hop interval assigns each row to 3 different windows of 15 minutes size.
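The redundancy of hop windows can be counted directly: each row belongs to size / slide windows. Below is a simplified Scala model of sliding-window assignment (plain Scala, not Flink's code; it assumes epoch-aligned windows and a size that is a multiple of the slide; the object and method names are hypothetical). It reproduces the 3-window example above and applies the same arithmetic to the two HOP clauses used earlier in this thread.

```scala
// Simplified model of hop (sliding) window assignment; not Flink's code.
// Assumes windows aligned to the epoch (offset 0) and a size that is a
// multiple of the slide, as in the queries discussed in this thread.
object HopWindows {
  val Second: Long = 1000L
  val Minute: Long = 60 * Second
  val Day: Long    = 24 * 60 * Minute

  /** Start times of every window [start, start + size) that contains ts. */
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide) // latest window start <= ts
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  def main(args: Array[String]): Unit = {
    // 15 minute size, 5 minute hop: 3 windows per row
    println(windowStarts(17 * Minute, 15 * Minute, 5 * Minute).size)
    // HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE): 180 windows per row
    println(windowStarts(123456789L, 3 * Minute, Second).size)
    // HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY): 4320 windows per row
    println(windowStarts(123456789L, 3 * Day, Minute).size)
  }
}
```

With a 3-day window sliding every minute, every incoming row is copied into 4320 windows, which is why such a configuration is expensive.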
As for choice 2:
- I think you need another field (for example, HOP_START) when joining the three tables, so that only records in the same window are joined.
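A toy Scala model of that suggestion (not Flink code; the names and data are hypothetical): each windowed subquery is represented as a map keyed by (article_id, window start), as if it also selected HOP_START(...). Joining on both fields pairs each article window only with the praise count of the same window, while joining on article_id alone mixes windows.

```scala
// Toy model (not Flink code): per-window results keyed by (article_id, window start),
// as if each windowed subquery also selected HOP_START(...).
object SameWindowJoin {
  type Key = (Int, Long) // (article_id, hop start in ms)

  /** Joins on article_id AND window start: one result per article per window. */
  def joinSameWindow(articles: Set[Key], praiseCounts: Map[Key, Long]): Map[Key, Long] =
    articles.map(k => k -> praiseCounts.getOrElse(k, 0L)).toMap

  /** Joins on article_id only: every article window meets every praise window. */
  def joinOnIdOnly(articles: Set[Key], praiseCounts: Map[Key, Long]): Seq[(Key, Long)] =
    for {
      a        <- articles.toSeq
      (p, cnt) <- praiseCounts.toSeq
      if a._1 == p._1
    } yield (a, cnt)

  def main(args: Array[String]): Unit = {
    val articles     = Set[Key]((100, 0L), (100, 60000L))
    val praiseCounts = Map[Key, Long]((100, 0L) -> 5L, (100, 60000L) -> 2L)
    println(joinSameWindow(articles, praiseCounts))    // one row per window
    println(joinOnIdOnly(articles, praiseCounts).size) // 4: windows are mixed up
  }
}
```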
To solve your problem, I think we can do a non-window GROUP BY first and then
join the three result tables. Furthermore, a state retention time can be set
to keep the state from growing larger.
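The duplication problem with choice 1 can be reproduced on a few rows. This is a plain Scala sketch of the SQL semantics, not Flink code; the case classes and sample data are hypothetical. Joining article, praise and response first pairs every praise row with every response row of the same article, so both COUNTs are inflated; counting per article_id first and then joining the counts gives the intended numbers.

```scala
// Plain-Scala model of the SQL semantics (not Flink code).
object AggregateBeforeJoin {
  final case class Praise(praiseId: Int, articleId: Int)
  final case class Response(responseId: Int, articleId: Int)

  // LEFT JOIN side: no match yields a single NULL (None) row.
  private def leftJoinSide[A](xs: Seq[A]): Seq[Option[A]] =
    if (xs.isEmpty) Seq(None) else xs.map(Some(_))

  /** Choice 1: article LEFT JOIN praise LEFT JOIN response, then COUNT per article. */
  def joinThenCount(articles: Seq[Int], praises: Seq[Praise],
                    responses: Seq[Response]): Map[Int, (Int, Int)] =
    articles.map { a =>
      val rows = for {
        p <- leftJoinSide(praises.filter(_.articleId == a))
        r <- leftJoinSide(responses.filter(_.articleId == a))
      } yield (p, r)
      // COUNT(col) skips NULLs
      (a, (rows.count(_._1.isDefined), rows.count(_._2.isDefined)))
    }.toMap

  /** GROUP BY article_id on each input first, then join the counts. */
  def countThenJoin(articles: Seq[Int], praises: Seq[Praise],
                    responses: Seq[Response]): Map[Int, (Int, Int)] = {
    val pCount = praises.groupBy(_.articleId).map { case (k, v) => k -> v.size }
    val rCount = responses.groupBy(_.articleId).map { case (k, v) => k -> v.size }
    articles.map(a => (a, (pCount.getOrElse(a, 0), rCount.getOrElse(a, 0)))).toMap
  }

  def main(args: Array[String]): Unit = {
    val articles  = Seq(1, 2)
    val praises   = Seq(Praise(10, 1), Praise(11, 1))
    val responses = Seq(Response(20, 1), Response(21, 1), Response(22, 1), Response(23, 2))
    println(joinThenCount(articles, praises, responses)) // article 1 -> (6,6): inflated
    println(countThenJoin(articles, praises, responses)) // article 1 -> (2,3)
  }
}
```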
Best, Hequn
Re: Semantic when table joins table from window
Posted by 徐涛 <ha...@gmail.com>.
Hi Fabian,
So maybe I cannot join a table that is generated from a window, because the table gets larger and larger as time goes on, and maybe the system will crash one day.
I am working on a system that calculates the "score" of each article, which consists of the number of praises, the number of responses, etc.
Because I cannot use Flink to keep all the articles, I decided to update only the scores of articles created within the last 3 days.
I have two choices:
1. Join the article table with the praise and response tables, then apply the window:
select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
from
article a
left join
praise p on a.article_id = p.article_id
left join
response r on a.article_id = r.article_id
group by hop(updated_time, interval '1' minute,interval '3' day) , article_id
2. Window the article table, the praise table and the response table, then join them together:
select aAggr.article_id, pAggr.pCount, rAggr.rCount
from
(select article_id from article group by hop(updated_time, interval '1' minute,interval '3' day) , article_id) aAggr
left join
(select article_id,count(praise_id) as pCount from praise group by hop(updated_time, interval '1' minute,interval '3' day) , article_id) pAggr on aAggr.article_id=pAggr.article_id
left join
(select article_id,count(response_id) as rCount from response group by hop(updated_time, interval '1' minute,interval '3' day) , article_id) rAggr on aAggr.article_id=rAggr.article_id
Maybe I should choose 1 (join, then window) rather than 2 (window, then join).
Please correct me if I am wrong.
I have some worries about choice 1.
I do not know how Flink works internally, but it seems that in this SQL the article, praise and response tables keep growing as time goes by. Will that introduce a performance issue?
Best,
Henry
Re: Semantic when table joins table from window
Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,
praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY
article_id", the answer is "100,101,102,103".
2. If you change your SQL to s"SELECT last_value(article_id) FROM praise",
the answer is "100".
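The three readings of Henry's example can be written down as three functions over the same emitted rows. This plain Scala sketch (not Flink code; the object and function names are hypothetical) assumes the praise rows carry exactly the article ids listed per window. An append-only table keeps every row, GROUP BY article_id keeps one row per distinct id, and a last-value aggregate keeps only the most recent id.

```scala
// Plain-Scala sketch of three views over the rows emitted by praiseAggr.
object AppendTableViews {
  // Rows emitted window by window (Henry's example)
  val windows: Seq[Seq[Int]] = Seq(Seq(100, 101, 102), Seq(100, 101, 103), Seq(100))

  /** An append-only table keeps every row ever emitted. */
  def appendTable(ws: Seq[Seq[Int]]): Seq[Int] = ws.flatten

  /** GROUP BY article_id: one row per distinct id. */
  def distinctIds(ws: Seq[Seq[Int]]): Seq[Int] = ws.flatten.distinct.sorted

  /** A last-value aggregate: only the most recently appended id. */
  def lastValue(ws: Seq[Seq[Int]]): Option[Int] = ws.flatten.lastOption

  def main(args: Array[String]): Unit = {
    println(appendTable(windows)) // 100,101,102,100,101,103,100
    println(distinctIds(windows)) // 100,101,102,103
    println(lastValue(windows))   // Some(100)
  }
}
```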
Best, Hequn
Re: Semantic when table joins table from window
Posted by 徐涛 <ha...@gmail.com>.
Hi Fabian,
Thanks for your response. This question has puzzled me for quite a long time.
Suppose praiseAggr has the following values:
window-1 100,101,102
window-2 100,101,103
window-3 100
The last time the article table joins praiseAggr, which of the following values does the praiseAggr table contain?
1— 100,101,102,100,101,103,100: all the elements of all the windows
2— 100: the elements of the latest window
3— 100,101,102,103: the distinct values across all the windows
Best,
Henry
Re: Semantic when table joins table from window
Posted by Fabian Hueske <fh...@gmail.com>.
Hi,
The semantics of a query do not depend on the way that it is used.
praiseAggr is a table that grows by one row per second for each article_id
in the current window. If you use that table in a join, the join will fully
materialize the table.
This is a special case because the same row is added multiple times, so the
state won't grow that quickly, but performance will decrease because each
row from article will join with multiple (a growing number of) rows from
praiseAggr.
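A toy model of the praiseAggr side of that join state (plain Scala, not Flink's implementation; the class name and the "10 minutes of praise for article 100" scenario are hypothetical). Because praiseAggr only selects article_id, repeated rows are identical and can be stored as <Row, RowCnt>, as Hequn describes elsewhere in this thread. The state stays at one entry, but the number of results an incoming article row produces equals the count, which keeps growing.

```scala
// Toy model of one side of a non-window join (NOT Flink's implementation):
// identical rows share a single state entry with a counter (<Row, RowCnt>).
import scala.collection.mutable

object MaterializedJoinState {
  final class JoinSideState {
    private val counts = mutable.Map.empty[Int, Long] // article_id -> RowCnt

    def add(articleId: Int): Unit =
      counts(articleId) = counts.getOrElse(articleId, 0L) + 1

    /** Number of state entries (distinct rows). */
    def distinctRows: Int = counts.size

    /** Join results produced by a new article row with this id. */
    def matchesFor(articleId: Int): Long = counts.getOrElse(articleId, 0L)
  }

  def main(args: Array[String]): Unit = {
    val praiseAggrState = new JoinSideState
    // Hypothetical: praiseAggr emits article 100 once per second for 10 minutes
    (1 to 600).foreach(_ => praiseAggrState.add(100))
    println(praiseAggrState.distinctRows)    // 1: state stays small
    println(praiseAggrState.matchesFor(100)) // 600: join output keeps growing
  }
}
```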
Best, Fabian