Posted to user@flink.apache.org by 徐涛 <ha...@gmail.com> on 2019/03/21 08:39:01 UTC

Best practice to handle update messages in stream

Hi Experts,
	Assuming there is a stream whose content is like this:

	Seq   ID    MONEY
	1     100   100
	2     100   200
	3     101   300

	The record at Seq#2 updates the record at Seq#1, changing the money from 100 to 200.
	If I register the stream as table T and want to sum the money grouped by ID, the query "select sum(MONEY) from T" returns 600, which is incorrect (it should be 500, since only the latest value per ID should count).

	I can write a UDAF, for example "latest", to compute the latest value for each ID; the SQL then becomes:
	select sum(MONEY) from
	(
		select ID, latest(MONEY) from T group by ID
	)
	But this means keeping each ID and its latest value in state, and I am worried that the state grows too large. Currently I use this approach and set the state retention to several days to keep the state bounded. I wonder if there are better ways to do this.
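	For illustration, the semantics of the subquery above (latest value per ID, then sum) can be sketched outside Flink. This is plain Python with names of my own choosing, not Flink code:

	```python
	# Sketch of "latest value per key, then sum": later records for the
	# same ID overwrite earlier ones before the sum is taken.
	def sum_latest(records):
	    """records: iterable of (seq, id, money) tuples in arrival order."""
	    latest = {}                  # per-ID state: ID -> latest MONEY
	    for _seq, key, money in records:
	        latest[key] = money      # an update replaces the previous value
	    return sum(latest.values())

	stream = [(1, 100, 100), (2, 100, 200), (3, 101, 300)]
	print(sum_latest(stream))  # 500, not the naive 600
	```

	The per-ID dictionary here plays the role of the keyed state the question worries about: it necessarily holds one entry per distinct ID.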

	So what is the best practice for this scenario? Does anyone have a suggestion? Thanks a lot.


Best
Henry
	

Re: Best practice to handle update messages in stream

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

There is ongoing work [1] to natively support streams like the one you described (we call them upsert streams/changelogs). But it boils down to exactly the same thing you have done: aggregating the records per key with a "latest" aggregation function. Until we support this natively, you can use the query that you have written.

Regarding the state size: in most cases there is no workaround for this issue. A record overwriting a previous value can arrive at an arbitrary point in time, and for most operations (like the SUM aggregation in your case, or filtering) we need to keep the previous value for the key in state. Sometimes it might be possible to optimise the query and skip the "latest value aggregation", if the following SQL operator either does not need to know the previous value (like a sink or projection) or already knows the previous value anyway (like a join).
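The reason the previous value must be kept in state can be sketched as an incremental sum: when an update arrives, the old value for that key is retracted (subtracted) and the new one added, so the running sum stays correct without rescanning the stream. A minimal Python sketch under that assumption (my own names, not Flink's API):

```python
class UpsertSum:
    """Keeps the previous MONEY per ID so an update can retract the old value."""

    def __init__(self):
        self.prev = {}   # per-key state: ID -> last seen MONEY
        self.total = 0   # running SUM over the latest value of each key

    def update(self, key, money):
        # Retract the previous value for this key (0 if none), add the new one.
        self.total += money - self.prev.get(key, 0)
        self.prev[key] = money
        return self.total

agg = UpsertSum()
for key, money in [(100, 100), (100, 200), (101, 300)]:
    current = agg.update(key, money)
print(current)  # 500
```

Note that `self.prev` cannot be dropped: without the old value there is nothing to subtract when an update arrives, which is exactly why the state grows with the number of distinct keys.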

Piotr Nowojski

[1] https://issues.apache.org/jira/browse/FLINK-8545

> On 21 Mar 2019, at 09:39, 徐涛 <ha...@gmail.com> wrote:
> [quoted original message trimmed]