Posted to user@flink.apache.org by 徐涛 <ha...@gmail.com> on 2018/08/21 10:19:49 UTC

Semantic when table joins table from window

Hi all,

    val praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
    tableEnv.registerTable("praiseAggr", praiseAggr)
    val finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a JOIN praiseAggr p ON a.article_id = p.article_id")
    tableEnv.registerTable("finalTable", finalTable)

I know that praiseAggr, if written to a sink, is in append mode. So if a table joins praiseAggr, what does it "see": a table that contains only the latest values, or a table that grows larger and larger? If it is the latter, will that introduce a performance problem?
Thanks a lot.


Best, 
Henry
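To make the question concrete, here is a minimal sketch in plain Scala (no Flink APIs) of which rows an append-only `GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id` result accumulates. It is a simplified model under stated assumptions: timestamps are whole seconds, windows start at multiples of the hop interval, and it ignores when each window actually fires. The names `HopAppendSketch`, `Praise`, `windowStarts`, `appendOutput`, and `joinFanOut` are hypothetical.

```scala
object HopAppendSketch {
  // Hypothetical plain-Scala model, not a Flink API. Times are whole seconds.
  final case class Praise(articleId: Int, time: Long)

  val slide: Long = 1L  // hop interval: INTERVAL '1' SECOND
  val size: Long = 180L // window size: INTERVAL '3' MINUTE

  // Start times of every hop window [start, start + size) that contains t,
  // assuming windows start at multiples of the slide.
  def windowStarts(t: Long): Seq[Long] = {
    val lastStart = t - Math.floorMod(t, slide)
    lastStart to (t - size + 1) by -slide
  }

  // Rows appended by GROUP BY HOP(...), article_id: one (windowEnd, articleId) row
  // per window that contains at least one praise for that article.
  def appendOutput(events: Seq[Praise]): Seq[(Long, Int)] =
    events
      .flatMap(p => windowStarts(p.time).map(start => (start + size, p.articleId)))
      .distinct
      .sorted

  // A regular (non-window) join on article_id matches an article row
  // with every appended row for that article.
  def joinFanOut(articleId: Int, praiseAggr: Seq[(Long, Int)]): Int =
    praiseAggr.count(_._2 == articleId)
}
```

Under these assumptions a single praise event lands in 180 windows, so the appended table eventually holds 180 rows for that article, and a regular join matches each article row against all of them.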

Re: Semantic when table joins table from window

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,

You can increase the retention time to make sure that none of the data you
need expires.
As for incremental updates, I think we can sink the results into a key-value
store, say HBase. The HBase table then contains the complete, latest data set
you want, so you don't need to flush again. Would that satisfy your scenario?

Best, Hequn
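The difference between an append-only result and the key-value sink suggested above can be sketched with an in-memory map standing in for the HBase table. This is a hypothetical model (`UpsertStore` is not an HBase or Flink API): each new result for an article_id overwrites the previous one, so the store holds one latest row per key instead of a growing list of rows.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an external key-value table (e.g. an HBase table keyed by article_id).
final class UpsertStore[K, V] {
  private val table = mutable.Map.empty[K, V]

  // Insert or overwrite: the newest value for a key replaces the old one.
  def upsert(key: K, value: V): Unit = table.update(key, value)

  def get(key: K): Option[V] = table.get(key)

  // Number of keys, i.e. one row per article, no matter how many updates arrived.
  def size: Int = table.size
}
```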


On Wed, Aug 22, 2018 at 2:51 PM 徐涛 <ha...@gmail.com> wrote:

> Hi Hequn,
> Thanks a lot for your response! This helps me understand the mechanism
> more clearly.
>
> I have another question:
> *How do I use Flink to accomplish time attenuation?*
> If I use the join plus retention time solution, I only get the incremental
> data. But other data may need to be recomputed because of the time
> attenuation. How do I flush them?
>
> Best,
> Henry
>
> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <ch...@gmail.com> wrote:
>
> Hi Henry,
>
> As for choice 1:
>
>    - The state size of the join depends on the sizes of its input tables,
>    not of the result table, so the state size of the join in choice 1
>    depends on the number of article ids, praise ids, and response ids.
>    - Also, a non-window join merges identical rows in its state, i.e., it
>    stores <Row, RowCnt>, so the state size won't grow if you keep pouring
>    in the same article id. I think the problem here is that you need a
>    DISTINCT before the join; otherwise a praise id joins multiple identical
>    article ids, which affects the correctness of the result.
>    - I think you need to aggregate before the join to make sure the result
>    is correct, because there are duplicated article ids after article joins
>    praise, and this affects the value of count(r.response_id).
>    - You can't use a window or other bounded operators after a non-window
>    join. The time attribute fields cannot be passed through because of a
>    semantic conflict.
>    - A hop window with a large size and a small hop interval should be
>    avoided, because data becomes redundant across many windows. For
>    example, a hopping window with a size of 15 minutes and a hop interval
>    of 5 minutes assigns each row to 3 different 15-minute windows.
>
> As for choice 2:
>
>    - I think you need another field (for example, HOP_START) when joining
>    the three tables, so that only records in the same window are joined.
>
> To solve your problem, I think we can do a non-window GROUP BY first and
> then join the three result tables. Furthermore, a state retention time can
> be set to keep the state from growing larger.
>
> Best, Hequn
>
> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <ha...@gmail.com> wrote:
>
>> Hi Fabian,
>> So maybe I cannot join a table that is generated from a window, because
>> the table gets larger and larger as time goes on, and maybe the system
>> will crash one day.
>>
>> I am working on a system that calculates the "score" of an article, which
>> consists of the count of praises of the article, the count of responses to
>> the article, etc.
>> Because I cannot store all the articles in Flink, I decided to update only
>> the scores of articles created within the last 3 days.
>>
>> I have two choices:
>> 1. Join the article, praise, and response tables, then window:
>> select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
>> from article a
>> left join praise p on a.article_id = p.article_id
>> left join response r on a.article_id = r.article_id
>> group by hop(updated_time, interval '1' minute, interval '3' day), article_id
>> 2. Window the article table, the praise table, and the response table,
>> then join them together:
>> select aAggr.article_id, pAggr.pCount, rAggr.rCount
>> from
>> (select article_id from article group by hop(updated_time, interval '1' minute, interval '3' day), article_id) aAggr
>> left join
>> (select article_id, count(praise_id) as pCount from praise group by hop(updated_time, interval '1' minute, interval '3' day), article_id) pAggr
>> on aAggr.article_id = pAggr.article_id
>> left join
>> (select article_id, count(response_id) as rCount from response group by hop(updated_time, interval '1' minute, interval '3' day), article_id) rAggr
>> on aAggr.article_id = rAggr.article_id
>>
>> Maybe I should choose 1 (join, then window) rather than 2 (window, then join).
>> Please correct me if I am wrong.
>>
>> I have some worries about choice 1.
>> I do not know how Flink works internally. It seems that in the SQL, the
>> article, praise, and response tables keep growing as time goes by.
>> Will that introduce performance issues?
>>
>> Best,
>> Henry
>>
>> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <ch...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> praiseAggr is an append table, so it contains
>> "100,101,102,100,101,103,100".
>> 1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY
>> article_id", the answer is "100,101,102,103".
>> 2. If you change your SQL to s"SELECT last_value(article_id) FROM
>> praise", the answer is "100".
>>
>> Best, Hequn
>>
>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <ha...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>> Thanks for your response. This question has puzzled me for quite a long time.
>>> Suppose praiseAggr has the following values:
>>> window-1     100,101,102
>>> window-2     100,101,103
>>> window-3     100
>>>
>>> The last time the article table joins praiseAggr, which of the following
>>> values does the praiseAggr table contain?
>>> 1. 100,101,102,100,101,103,100    all the elements of all the windows
>>> 2. 100                            the elements of the latest window
>>> 3. 100,101,102,103                the distinct values of all the windows
>>>
>>>
>>> Best,
>>> Henry
>>>
>>>
>>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> The semantics of a query do not depend on the way that it is used.
>>> praiseAggr is a table that grows by one row per second and per article_id.
>>> If you use that table in a join, the join will fully materialize the table.
>>> This is a special case because the same row is added multiple times, so
>>> the state won't grow that quickly, but the performance will decrease
>>> because each row from article will join with multiple (a growing
>>> number of) rows from praiseAggr.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>>
>>
>>
>
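The three candidate answers in this exchange can be checked against the sample window values with plain Scala collections. This is a hypothetical model of the results only (`PraiseAggrSemantics` is not a Flink API): the append table keeps all seven emitted rows, a non-window GROUP BY keeps one row per distinct id, and last_value keeps only the most recent row.

```scala
object PraiseAggrSemantics {
  // Sample rows from the thread, in the order the three windows emit them.
  val window1: Seq[Int] = Seq(100, 101, 102)
  val window2: Seq[Int] = Seq(100, 101, 103)
  val window3: Seq[Int] = Seq(100)

  // Append-only result: every emitted row is kept.
  val appendTable: Seq[Int] = window1 ++ window2 ++ window3

  // Non-window GROUP BY article_id: one row per distinct id.
  val distinctIds: Seq[Int] = appendTable.distinct.sorted

  // last_value(article_id): the most recently received value.
  val lastValue: Int = appendTable.last
}
```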

Re: Semantic when table joins table from window

Posted by 徐涛 <ha...@gmail.com>.
Hi Hequn,
	Thanks a lot for your response! This helps me understand the mechanism more clearly.

	I have another question:
		How do I use Flink to accomplish time attenuation?
		If I use the join plus retention time solution, I only get the incremental data. But other data may need to be recomputed because of the time attenuation. How do I flush them?

Best,
Henry, 
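Hequn's point about choice 1 (counting after joining article, praise, and response inflates the counts) can be checked with a small plain-Scala model. The sample data and the names `JoinThenCountSketch`, `countsAfterJoin`, and `countsBeforeJoin` are hypothetical, and only matched rows are modeled (left-join NULL rows are ignored): with 2 praises and 3 responses for one article, the join produces 2 × 3 = 6 rows, so both COUNTs return 6, while aggregating before combining gives 2 and 3.

```scala
object JoinThenCountSketch {
  final case class Praise(praiseId: Int, articleId: Int)
  final case class Response(responseId: Int, articleId: Int)

  // Hypothetical sample data: article 1 has 2 praises and 3 responses.
  val articles: Seq[Int] = Seq(1)
  val praises: Seq[Praise] = Seq(Praise(10, 1), Praise(11, 1))
  val responses: Seq[Response] = Seq(Response(20, 1), Response(21, 1), Response(22, 1))

  // Join first (choice 1): every praise row is paired with every response row of the article.
  val joined: Seq[(Int, Int, Int)] = for {
    a <- articles
    p <- praises if p.articleId == a
    r <- responses if r.articleId == a
  } yield (a, p.praiseId, r.responseId)

  // count(p.praise_id), count(r.response_id) over the joined rows. Every joined row here
  // carries a non-null praise_id and response_id, so both counts equal the row count.
  def countsAfterJoin(articleId: Int): (Int, Int) = {
    val rows = joined.filter(_._1 == articleId)
    (rows.map(_._2).size, rows.map(_._3).size)
  }

  // Aggregate first, then combine per article: the counts are no longer multiplied.
  def countsBeforeJoin(articleId: Int): (Int, Int) =
    (praises.count(_.articleId == articleId), responses.count(_.articleId == articleId))
}
```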



Re: Semantic when table joins table from window

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,

Fabian is right. You can try to use a window join if you want a bounded join.

According to your description, I think what you want is (correct me if I'm
wrong):
- Only join data within 3 days
- Calculate the score in a bounded way
- Retract previous scores that are older than 3 days

So I think a window join + bounded OVER may solve your problem. Do the window
join with `article.time BETWEEN praise.time - INTERVAL '3' DAY AND praise.time + INTERVAL '3' DAY`.
You don't have to add a sliding window before the window join. After the
window join, you can perform a bounded OVER aggregation with a 3-day interval
to get the scores. There is documentation about window joins[1] and OVER
aggregations[2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#aggregations
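The suggested window join followed by a bounded OVER can be modeled in plain Scala to show which rows survive each step. This is a sketch under stated assumptions, not Flink's implementation: times are whole hours (so 3 days = 72), both interval bounds are inclusive, the OVER range is read as `RANGE BETWEEN INTERVAL '3' DAY PRECEDING AND CURRENT ROW` per article, and `IntervalJoinOverSketch`, `intervalJoin`, and `runningScores` are hypothetical names.

```scala
object IntervalJoinOverSketch {
  // Times are whole hours (an assumption for this sketch); 3 days = 72 hours.
  final case class Article(articleId: Int, time: Long)
  final case class Praise(praiseId: Int, articleId: Int, time: Long)

  val threeDays: Long = 72L

  // Interval join: a.article_id = p.article_id AND
  // a.time BETWEEN p.time - 3 days AND p.time + 3 days (both bounds inclusive).
  def intervalJoin(articles: Seq[Article], praises: Seq[Praise]): Seq[(Article, Praise)] =
    for {
      p <- praises
      a <- articles
      if a.articleId == p.articleId &&
        a.time >= p.time - threeDays && a.time <= p.time + threeDays
    } yield (a, p)

  // Bounded OVER on the joined rows, partitioned by article_id and ordered by praise time:
  // score = number of joined rows of the same article in [current time - 3 days, current time].
  def runningScores(joined: Seq[(Article, Praise)]): Seq[(Int, Long, Int)] = {
    val rows = joined.sortBy(_._2.time)
    rows.map { case (a, p) =>
      val score = rows.count { case (a2, p2) =>
        a2.articleId == a.articleId && p2.time >= p.time - threeDays && p2.time <= p.time
      }
      (a.articleId, p.time, score)
    }
  }
}
```

For an article created at hour 0 with praises at hours 1, 50, and 100, the praise at hour 100 falls outside the 3-day interval and is never joined, so the running scores are 1 and then 2.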

On Tue, Aug 28, 2018 at 9:59 PM 徐涛 <ha...@gmail.com> wrote:

> Hi Fabian,
> I am working on an application that computes the "score" of an article from
> the number of praises and reduces the score over time. I am weighing two
> choices:
> 1. Use a global window join of article and article praise, with a 3-day
> state retention. But I cannot get the current time; the time is fixed when
> the program starts, so I cannot compute the reduced score. I have to sink
> the data and then write some crontab jobs to update the score.
> 2. Use a sliding window join with a window length of 3 days, sliding by one
> minute. This way I can get the window end time, but so much data is
> duplicated across windows that there are performance issues.
> Neither choice is good enough. I am wondering whether there are other
> solutions. Thanks a lot.
>
> Best
> Henry
>
> On Aug 28, 2018, at 8:05 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi,
>
> Currently, Flink's window operators require increasing timestamp
> attributes. This limitation exists to be able to clean up the state of a
> window operator. A join operator does not preserve the order of timestamps.
> Hence, timestamp attributes lose their monotonicity property and a window
> operator cannot be applied.
>
> Have you tried to use a window join? These preserve the timestamp order.
>
> Fabian
>
> 徐涛 <ha...@gmail.com> wrote on Tue, Aug 28, 2018, 11:42:
>
>> Hi Hequn,
>> You can't use a window or other bounded operators after a non-window join.
>> The time attribute fields cannot be passed through because of a semantic
>> conflict.
>>        Why does Flink have this limitation?
>>        I have a temporary view:
>>         val finalTable = tableEnv.sqlQuery("select * from A join B on xxxx join C on xxxx")
>>         tableEnv.registerTable("finalTable", finalTable)
>>
>>       I want to window this table because I want it to output a 1-minute
>> window every second, but obviously I cannot do this now. How can I make
>> the "final table" output a 1-minute window every second? And if a table
>> is a retract stream, will the items added to the window be retracted as
>> well?
>>
>>       Thanks a lot.
>>
>> Best
>> Henry
>>
>>
>>
>

Re: Semantic when table joins table from window

Posted by 徐涛 <ha...@gmail.com>.
Hi Fabian,
	I am working on an application that computes the "score" of an article from the number of praises and reduces the score over time. I am weighing two choices:
	1. Use a global window join of article and article praise, with a 3-day state retention. But I cannot get the current time; the time is fixed when the program starts, so I cannot compute the reduced score. I have to sink the data and then write some crontab jobs to update the score.
	2. Use a sliding window join with a window length of 3 days, sliding by one minute. This way I can get the window end time, but so much data is duplicated across windows that there are performance issues.
	Neither choice is good enough. I am wondering whether there are other solutions. Thanks a lot.

Best
Henry
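The duplication in the sliding-window choice can be quantified with the rule from Hequn's example, where a 15-minute window hopping every 5 minutes assigns each row to 3 windows. The sketch below assumes the size is a whole multiple of the slide; `HopRedundancySketch`, `windowsPerRow`, and `windowCopies` are hypothetical names. With a 3-day window sliding by one minute, each row belongs to 3 × 24 × 60 = 4320 windows.

```scala
object HopRedundancySketch {
  // Each row of a HOP window is assigned to size / slide windows
  // (assuming size is a whole multiple of slide; both in the same unit, e.g. minutes).
  def windowsPerRow(size: Long, slide: Long): Long = {
    require(slide > 0 && size % slide == 0, "size must be a positive multiple of slide")
    size / slide
  }

  // Total (row, window) assignments produced for `rows` input rows.
  def windowCopies(rows: Long, size: Long, slide: Long): Long =
    rows * windowsPerRow(size, slide)
}
```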



Re: Semantic when table joins table from window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Currently, Flink's window operators require time attributes with
increasing timestamps. This limitation exists so that the state of a
window operator can be cleaned up. A regular (non-window) join operator
does not preserve the order of timestamps. Hence, timestamp attributes
lose their monotonicity property and a window operator cannot be applied.

Have you tried using a window join? Window joins preserve the timestamp order.
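The cleanup argument can be illustrated with a small plain-Scala model. This is a simplification, not Flink's window operator: the hypothetical `TumblingCounter` below treats the largest timestamp seen so far as its watermark (no allowed lateness, no watermark delay) and drops a window's state as soon as the watermark passes the window end. With increasing timestamps every record is counted; with shuffled timestamps, such as a regular join may emit, records arrive for windows whose state is already gone.

```scala
import scala.collection.mutable

// Simplified model (not Flink's implementation): a tumbling-window counter
// whose watermark is the largest timestamp seen so far.
final class TumblingCounter(sizeMs: Long) {
  private val open = mutable.TreeMap.empty[Long, Int] // window start -> count
  private var watermark = Long.MinValue
  val emitted = mutable.ArrayBuffer.empty[(Long, Int)] // (window start, final count)
  var dropped = 0 // records that arrived after their window was purged

  def add(ts: Long): Unit = {
    val start = ts - Math.floorMod(ts, sizeMs)
    if (start + sizeMs <= watermark) {
      // The window's state was already cleaned up, so this record is lost.
      dropped += 1
    } else {
      open(start) = open.getOrElse(start, 0) + 1
      watermark = math.max(watermark, ts)
      // Windows ending at or before the watermark are emitted and purged;
      // this is only safe if later records never carry smaller timestamps.
      val closed = open.keys.takeWhile(_ + sizeMs <= watermark).toList
      closed.foreach { s =>
        emitted += (s -> open(s))
        open.remove(s)
      }
    }
  }
}
```

Feeding it the timestamps 0, 12, 5, 25, 3 with a 10 ms window, for example, emits window 0 with a count of 1 instead of 3, because the records at 5 and 3 arrive after that window was purged.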

Fabian

On Tue, Aug 28, 2018, 11:42, 徐涛 <ha...@gmail.com> wrote:

> Hi Hequn,
>
> > You can't use window or other bounded operators after a non-window
> > join. The time attribute fields cannot be passed through because of a
> > semantic conflict.
>
> Why does Flink have this limitation? I have a temporary view:
>
>     val finalTable = tableEnv.sqlQuery(
>       "SELECT * FROM A JOIN B ON xxxx JOIN C ON xxxx")
>     tableEnv.registerTable("finalTable", finalTable)
>
> I want to window this table because I want it to output a 1-minute window
> every second, but obviously I cannot do this now. How can I make
> "finalTable" output a 1-minute window every second? And if a table is a
> retract stream, will the items added to the window be retracted as well?
>
> Thanks a lot.
>
> Best,
> Henry

Re: Semantic when table joins table from window

Posted by 徐涛 <ha...@gmail.com>.
Hi Hequn,

> You can't use window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict.

Why does Flink have this limitation? I have a temporary view:

	val finalTable = tableEnv.sqlQuery(
	  "SELECT * FROM A JOIN B ON xxxx JOIN C ON xxxx")
	tableEnv.registerTable("finalTable", finalTable)

I want to window this table because I want it to output a 1-minute window every second, but obviously I cannot do this now. How can I make "finalTable" output a 1-minute window every second? And if a table is a retract stream, will the items added to the window be retracted as well?

Thanks a lot.

Best,
Henry



> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <ch...@gmail.com> wrote:
> 
> Hi Henry,
> 
> As for choice 1:
> - The state size of the join depends on the sizes of its input tables, not on the result table, so the state size of the join in choice 1 depends on how many article IDs, praise IDs and response IDs there are.
> - Also, a non-window join merges identical rows in its state, i.e., <Row, RowCnt>, so the state size won't grow if you keep pouring in the same article ID. I think the problem here is that you need a DISTINCT before the join; otherwise a praise ID will join multiple identical article IDs, which affects the correctness of the result.
> - I think you need to aggregate before the join to make sure the result is correct, because there are duplicated article IDs after article joins praise, and this will influence the value of count(r.response_id).
> - You can't use window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict.
> - Hop windows with a large fixed size and a small hop interval should be avoided, because data will be duplicated across many windows. For example, a hopping window of 15 minutes size and 5 minutes hop interval assigns each row to 3 different 15-minute windows.
> 
> As for choice 2:
> - I think you need another field (for example, HOP_START) when joining the three tables, so that only records in the same window are joined.
> 
> To solve your problem, I think we can do a non-window GROUP BY first and then join the three result tables. Furthermore, a state retention time can be set to keep the state from growing larger.
> 
> Best, Hequn


Re: Semantic when table joins table from window

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,

As for choice 1:

   - The state size of the join depends on the sizes of its input tables,
   not on the result table, so the state size of the join in choice 1
   depends on how many article IDs, praise IDs and response IDs there are.
   - Also, a non-window join merges identical rows in its state, i.e.,
   <Row, RowCnt>, so the state size won't grow if you keep pouring in the
   same article ID. I think the problem here is that you need a DISTINCT
   before the join; otherwise a praise ID will join multiple identical
   article IDs, which affects the correctness of the result.
   - I think you need to aggregate before the join to make sure the result
   is correct, because there are duplicated article IDs after article joins
   praise, and this will influence the value of count(r.response_id).
   - You can't use window or other bounded operators after a non-window
   join. The time attribute fields cannot be passed through because of a
   semantic conflict.
   - Hop windows with a large fixed size and a small hop interval should be
   avoided, because data will be duplicated across many windows. For
   example, a hopping window of 15 minutes size and 5 minutes hop interval
   assigns each row to 3 different 15-minute windows.
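The duplication problem can be seen with a tiny hypothetical example in plain Scala (not Flink code): one article with two praises and three responses. Joining first produces 2 × 3 = 6 rows, so both counts come out as 6; aggregating each table per article_id first and then joining gives the expected 2 and 3. The joins are modelled as inner joins for simplicity, and all names and data are illustrative.

```scala
// Hypothetical rows, not Flink code: (article_id, praise_id) and
// (article_id, response_id).
object ScoreCounts {
  type Row = (Int, String)

  // Choice 1 shape: join article -> praise -> response (inner joins), then count.
  def joinThenCount(articles: List[Int], praises: List[Row],
                    responses: List[Row]): Map[Int, (Int, Int)] = {
    val joined = for {
      a <- articles
      p <- praises if p._1 == a
      r <- responses if r._1 == a
    } yield (a, p._2, r._2)
    // COUNT(p.praise_id) and COUNT(r.response_id) each count every joined
    // row, so both become (#praises * #responses).
    joined.groupBy(_._1).map { case (a, rows) => a -> (rows.size, rows.size) }
  }

  // Aggregate each table by article_id first, then (left-)join the counts.
  def countThenJoin(articles: List[Int], praises: List[Row],
                    responses: List[Row]): Map[Int, (Int, Int)] = {
    val pCount = praises.groupBy(_._1).map { case (a, ps) => a -> ps.size }
    val rCount = responses.groupBy(_._1).map { case (a, rs) => a -> rs.size }
    articles.map(a => a -> (pCount.getOrElse(a, 0), rCount.getOrElse(a, 0))).toMap
  }
}
```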

As for choice 2:

   - I think you need another field (for example, HOP_START) when joining
   the three tables, so that only records in the same window are joined.
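The suggestion for choice 2 can be sketched in plain Scala, assuming each windowed aggregate is keyed by (article_id, hop_start in milliseconds). The object, names and data are hypothetical; the point is only which rows pair up with and without the window start in the join key.

```scala
// Hypothetical windowed aggregates keyed by (article_id, hop_start in ms).
object WindowAlignedJoin {
  type Key = (Int, Long)

  // Joining on article_id only pairs every praise window with every
  // response window of the same article.
  def joinOnArticleOnly(p: Map[Key, Int], r: Map[Key, Int]): List[(Int, Int, Int)] =
    for {
      (pk, pc) <- p.toList
      (rk, rc) <- r.toList
      if pk._1 == rk._1
    } yield (pk._1, pc, rc)

  // Adding hop_start to the join key keeps only counts from the same window.
  def joinOnArticleAndWindow(p: Map[Key, Int], r: Map[Key, Int]): List[(Key, Int, Int)] =
    p.toList.flatMap { case (k, pc) => r.get(k).map(rc => (k, pc, rc)) }
}
```

With two windows per side for the same article, the first function returns four mixed pairs, while the second returns one row per window.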

To solve your problem, I think we can do a non-window GROUP BY first and
then join the three result tables. Furthermore, a state retention time can
be set to keep the state from growing larger.
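How a retention time bounds the state of a non-window GROUP BY can be sketched with a simplified plain-Scala model. This is an assumption about the general idea, not Flink's implementation; in Flink it is configured as an idle state retention time (a minimum and maximum idle time on the query configuration in the versions discussed here; check the documentation for your version). The class name `RetainedCounts` is hypothetical.

```scala
import scala.collection.mutable

// Simplified model (not Flink's implementation) of a non-window GROUP BY
// with idle state retention: a key whose state has not been updated for
// longer than retentionMs is removed on the next cleanup.
final class RetainedCounts(retentionMs: Long) {
  private val state = mutable.Map.empty[Int, (Long, Long)] // key -> (count, lastUpdateMs)

  def add(key: Int, nowMs: Long): Long = {
    val next = state.get(key).map(_._1).getOrElse(0L) + 1
    state(key) = (next, nowMs)
    next
  }

  def cleanup(nowMs: Long): Unit = {
    val expired = state.iterator.collect {
      case (k, (_, last)) if nowMs - last > retentionMs => k
    }.toList
    expired.foreach(k => state.remove(k))
  }

  def get(key: Int): Option[Long] = state.get(key).map(_._1)
  def size: Int = state.size
}
```

The model also shows the trade-off: a key that reappears after its state expired starts counting again from zero, so the retention time needs to be longer than the period in which updates for a key can still arrive (for example, longer than the 3-day period Henry mentions).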

Best, Hequn

On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <ha...@gmail.com> wrote:

> Hi Fabian,
> So maybe I cannot join a table that is generated from a window, because
> the table gets larger and larger as time goes by; maybe the system will
> crash one day.
>
> I am working on a system that calculates the "score" of articles, which
> consists of the count of article praises, the count of article
> responses, etc. Because I cannot use Flink to save all the articles, I
> decided to update the scores of articles created within the last 3 days.
>
> I have two choices:
> 1. Join the article table with the praise and response tables, then
> window:
>     select a.article_id, count(p.praise_id) as pCount,
>            count(r.response_id) as rCount
>     from article a
>     left join praise p on a.article_id = p.article_id
>     left join response r on a.article_id = r.article_id
>     group by hop(updated_time, interval '1' minute, interval '3' day),
>              article_id
> 2. Window the article, praise and response tables, then join them
> together:
>     select aAggr.article_id, pAggr.pCount, rAggr.rCount
>     from
>       (select article_id from article
>        group by hop(updated_time, interval '1' minute, interval '3' day),
>                 article_id) aAggr
>     left join
>       (select article_id, count(praise_id) as pCount from praise
>        group by hop(updated_time, interval '1' minute, interval '3' day),
>                 article_id) pAggr
>       on aAggr.article_id = pAggr.article_id
>     left join
>       (select article_id, count(response_id) as rCount from response
>        group by hop(updated_time, interval '1' minute, interval '3' day),
>                 article_id) rAggr
>       on aAggr.article_id = rAggr.article_id
>
> Maybe I should choose 1 (join, then window) rather than 2 (window, then
> join). Please correct me if I am wrong.
>
> I have some worries about choice 1. I do not know how Flink works
> internally; it seems that in the SQL the article, praise and response
> tables keep growing as time goes by. Will that introduce a performance
> issue?
>
> Best,
> Henry

Re: Semantic when table joins table from window

Posted by 徐涛 <ha...@gmail.com>.
Hi Fabian,
	So maybe I cannot join a table that is generated from a window, because the table gets larger and larger as time goes by; maybe the system will crash one day.

	I am working on a system that calculates the "score" of articles, which consists of the count of article praises, the count of article responses, etc. Because I cannot use Flink to save all the articles, I decided to update the scores of articles created within the last 3 days.

	I have two choices:
	1. Join the article table with the praise and response tables, then window:
		select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
		from
			article a
		left join
			praise p on a.article_id = p.article_id
		left join
			response r on a.article_id = r.article_id
		group by hop(updated_time, interval '1' minute, interval '3' day), article_id
	2. Window the article, praise and response tables, then join them together:
		select aAggr.article_id, pAggr.pCount, rAggr.rCount
		from
		(select article_id from article group by hop(updated_time, interval '1' minute, interval '3' day), article_id) aAggr
		left join
		(select article_id, count(praise_id) as pCount from praise group by hop(updated_time, interval '1' minute, interval '3' day), article_id) pAggr on aAggr.article_id = pAggr.article_id
		left join
		(select article_id, count(response_id) as rCount from response group by hop(updated_time, interval '1' minute, interval '3' day), article_id) rAggr on aAggr.article_id = rAggr.article_id
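Both choices use hop(updated_time, interval '1' minute, interval '3' day), and every row is assigned to each hopping window that contains it. The plain-Scala sketch below counts those windows; it assumes epoch-aligned windows whose starts are multiples of the slide, which is an assumption about alignment rather than something stated in this thread.

```scala
// Sketch assuming epoch-aligned hopping windows [start, start + size) whose
// starts are multiples of `slide` (an assumption about alignment).
object HopWindows {
  // Start times of every hopping window that contains timestamp ts.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > ts - size)
      .toList
      .reverse
  }

  // When size is a multiple of slide, each row lands in size / slide windows.
  def windowsPerRow(size: Long, slide: Long): Long = size / slide
}
```

With a 3-day size and a 1-minute slide, `windowsPerRow(3 * 24 * 60 * 60 * 1000L, 60 * 1000L)` is 4320, so each praise or response row is duplicated into 4320 windows; a 15-minute size with a 5-minute slide gives 3.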

	Maybe I should choose 1 (join, then window) rather than 2 (window, then join).
	Please correct me if I am wrong.

	I have some worries about choice 1. I do not know how Flink works internally; it seems that in the SQL the article, praise and response tables keep growing as time goes by. Will that introduce a performance issue?

Best,
Henry

> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <ch...@gmail.com> wrote:
> 
> Hi Henry,
> 
> praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
> 1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY article_id", the answer is "100,101,102,103".
> 2. If you change your SQL to s"SELECT last_value(article_id) FROM praise", the answer is "100".
> 
> Best, Hequn


Re: Semantic when table joins table from window

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Henry,

praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY
article_id", the answer is "100,101,102,103".
2. If you change your SQL to s"SELECT last_value(article_id) FROM praise",
the answer is "100".
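The three readings can be modelled in a few lines of plain Scala using Henry's example rows. This only mirrors the semantics described above; it is not how Flink stores the table. Note that a join against the append table sees all seven rows, including the repeated 100s.

```scala
// Plain-Scala model of the three readings, using Henry's example rows
// (window-1: 100,101,102; window-2: 100,101,103; window-3: 100).
object PraiseAggrViews {
  val emitted: List[Int] = List(100, 101, 102, 100, 101, 103, 100)

  // Append table: every row the windowed query ever emitted stays in the table.
  def appendTable(rows: List[Int]): List[Int] = rows

  // Non-window GROUP BY article_id: one row per distinct article_id.
  def groupByArticle(rows: List[Int]): List[Int] = rows.distinct.sorted

  // Latest-value reading: only the most recently emitted row.
  def latest(rows: List[Int]): Option[Int] = rows.lastOption
}
```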

Best, Hequn

On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <ha...@gmail.com> wrote:

> Hi Fabian,
> Thanks for your response. This question has puzzled me for quite a long
> time. If praiseAggr has the following values:
>
>     window-1    100,101,102
>     window-2    100,101,103
>     window-3    100
>
> then the last time the article table joins praiseAggr, which of the
> following values does the praiseAggr table contain?
>     1. 100,101,102,100,101,103,100    all the elements of all the windows
>     2. 100                            the elements of the latest window
>     3. 100,101,102,103                the distinct values of all the windows
>
> Best,
> Henry

Re: Semantic when table joins table from window

Posted by 徐涛 <ha...@gmail.com>.
Hi Fabian,
	Thanks for your response. This question has puzzled me for quite a long time.
	If praiseAggr has the following values:
	window-1    100,101,102
	window-2    100,101,103
	window-3    100

	then the last time the article table joins praiseAggr, which of the following values does the praiseAggr table contain?
	1. 100,101,102,100,101,103,100    all the elements of all the windows
	2. 100                            the elements of the latest window
	3. 100,101,102,103                the distinct values of all the windows


Best,
Henry

> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi,
> 
> The semantics of a query do not depend on the way that it is used.
> praiseAggr is a table that grows by one row per second per article_id. If you use that table in a join, the join will fully materialize the table.
> This is a special case because the same row is added multiple times, so the state won't grow that quickly, but performance will decrease because each row from article will join with multiple (a growing number of) rows from praiseAggr.
> 
> Best, Fabian


Re: Semantic when table joins table from window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

The semantics of a query do not depend on the way that it is used.
praiseAggr is a table that grows by one row per second per article_id. If
you use that table in a join, the join will fully materialize the table.
This is a special case because the same row is added multiple times, so the
state won't grow that quickly, but performance will decrease because each
row from article will join with multiple (a growing number of) rows from
praiseAggr.
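The effect can be sketched with a simplified plain-Scala model (an assumption about the mechanics, not Flink's code) that stores each join input as a multiset of rows, the <Row, RowCnt> layout Hequn mentions in this thread. With HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), an article_id that received one praise appears in 3 min / 1 s = 180 consecutive windows, and because the query selects only article_id, those 180 rows are identical. The state keeps one distinct row, but a single arriving article row produces 180 join results.

```scala
import scala.collection.mutable

// Simplified model (not Flink's code) of a non-window inner join on article_id.
// Each side is kept as a multiset: row -> number of copies received.
final class NonWindowJoin {
  private val articles = mutable.Map.empty[Int, Int]
  private val praiseAggr = mutable.Map.empty[Int, Int]

  // Each arrival is stored, then joined with every stored copy on the other
  // side. The return value is the number of joined rows emitted for it.
  def addArticle(id: Int): Int = {
    articles(id) = articles.getOrElse(id, 0) + 1
    praiseAggr.getOrElse(id, 0)
  }

  def addPraiseAggr(id: Int): Int = {
    praiseAggr(id) = praiseAggr.getOrElse(id, 0) + 1
    articles.getOrElse(id, 0)
  }

  def distinctPraiseAggrRows: Int = praiseAggr.size
}
```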

Best, Fabian

2018-08-21 12:19 GMT+02:00 徐涛 <ha...@gmail.com>:

> Hi All,
>
>     val praiseAggr = tableEnv.sqlQuery(
>       "SELECT article_id FROM praise " +
>       "GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
>     tableEnv.registerTable("praiseAggr", praiseAggr)
>
>     val finalTable = tableEnv.sqlQuery(
>       "SELECT 1 FROM article a JOIN praiseAggr p ON a.article_id = p.article_id")
>     tableEnv.registerTable("finalTable", finalTable)
>
> I know that praiseAggr, if written to a sink, is in append mode. So if a
> table joins praiseAggr, what does it "see": a table that contains only
> the latest values, or a table that grows larger and larger? If it is the
> latter, will it introduce a performance problem?
> Thanks a lot.
>
>
> Best,
> Henry
>