Posted to user@flink.apache.org by Radu Tudoran <ra...@huawei.com> on 2017/03/31 07:34:37 UTC

concurrency?

Hi,

I would like to use a ProcessFunction to accumulate elements. In processElement() I will add each element to a state, but I would like to emit the output only 1 ms later. To do that, I would register a timer that fires 1 ms later, reads the state, and emits it.
However, I am curious what happens if another event arrives in the next millisecond. In principle, both processElement() and onTimer() would be triggered at the same time. My question is: is there a fixed order in which they are executed? If they behave like normal threads, then concurrency issues could occur when accessing the state.


Re: concurrency?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Just a bit of clarification. In Fabian's example:

time = 1, processElement(A) -> put A in state keyed to t=1, registerProcTimer(2)
time = 1, processElement(B) -> put B in state keyed to t=1, registerProcTimer(2) // deduplicated
time = 2, onTimer(2) -> access state with key t=2-1, get A, B
time = 2, processElement(C) -> put C in state keyed to t=2, registerProcTimer(3)

The order of the last two calls is not deterministic. If you have a timer set for proc-time == 2 and an element also arrives exactly at proc-time == 2, then the order of those onTimer() and processElement() calls is arbitrary. I think the only way to ensure deterministic behaviour here is to put everything that happens at proc-time == 2 into a buffer, wait for proc-time == 3, and then process the buffered invocations in a deterministic order. This is not something that we plan to do, I think.
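
The buffering idea can be illustrated with a small plain-Java model. This is only a sketch and not Flink code: the class TickBuffer, its methods, and the rule "timers first, then elements in arrival order" are assumptions chosen for illustration. Any fixed rule would make the replay deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: collect every call that belongs to one
// processing-time tick and replay the calls in a fixed order afterwards.
class TickBuffer {
    // One buffered call: a timer firing or an arriving element.
    private static final class Call {
        final boolean isTimer;
        final long seq;      // arrival order within the tick
        final String label;

        Call(boolean isTimer, long seq, String label) {
            this.isTimer = isTimer;
            this.seq = seq;
            this.label = label;
        }
    }

    private final List<Call> buffer = new ArrayList<>();
    private long nextSeq = 0;

    void bufferTimer(long timestamp) {
        buffer.add(new Call(true, nextSeq++, "onTimer(" + timestamp + ")"));
    }

    void bufferElement(String element) {
        buffer.add(new Call(false, nextSeq++, "processElement(" + element + ")"));
    }

    // Invoked once the next tick has started. Assumed rule: timers first,
    // then elements in arrival order. Returns the calls in replay order.
    List<String> flush() {
        List<Call> ordered = new ArrayList<>(buffer);
        ordered.sort((a, b) -> {
            if (a.isTimer != b.isTimer) {
                return a.isTimer ? -1 : 1;
            }
            return Long.compare(a.seq, b.seq);
        });
        buffer.clear();
        List<String> labels = new ArrayList<>();
        for (Call c : ordered) {
            labels.add(c.label);
        }
        return labels;
    }
}
```

Whether the element or the timer is buffered first, flush() returns the same sequence, which is the property the buffering approach is meant to provide. The cost is that every result is delayed by one tick.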

Best,
Aljoscha


RE: concurrency?

Posted by Radu Tudoran <ra...@huawei.com>.
Hi,

Yes it does – thanks a lot

Knowing that this is the order
time = 2, onTimer(2) -> access state with key t=2-1, get A, B
time = 2, processElement(C) -> put C in state keyed to t=2, registerProcTimer(3)
is useful!

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München


Re: concurrency?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Radu,

Timers are fired in order of their timestamps.
Multiple timers for the same time are deduplicated.

If you have the following logic:

time = 1, processElement(A) -> put A in state keyed to t=1, registerProcTimer(2)
time = 1, processElement(B) -> put B in state keyed to t=1, registerProcTimer(2) // deduplicated
time = 2, onTimer(2) -> access state with key t=2-1, get A, B
time = 2, processElement(C) -> put C in state keyed to t=2, registerProcTimer(3)
...

You get all calls in the right order.

Does that answer your question?
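
The two properties mentioned in this message (firing in timestamp order and deduplication of timers registered for the same time) can be modelled in a few lines of plain Java. This is a hypothetical model for illustration only; TimerQueueSketch and its methods are made-up names and say nothing about how Flink implements its timer service internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model, not Flink's implementation: timers live in a sorted set,
// so duplicate registrations collapse and firing follows timestamp order.
class TimerQueueSketch {
    private final TreeSet<Long> timers = new TreeSet<>();

    // Returns false when a timer for this timestamp is already registered.
    boolean register(long timestamp) {
        return timers.add(timestamp);
    }

    // Removes and returns every timer with timestamp <= now, in ascending order.
    List<Long> advanceTo(long now) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= now) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

Replaying the timeline above against this model: register(2) succeeds for A, the second register(2) for B is a no-op, and advanceTo(2) fires a single timer for timestamp 2.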


RE: concurrency?

Posted by Radu Tudoran <ra...@huawei.com>.
Hi,

Thanks Fabian. But is there also a fixed order imposed on their execution?

I am asking because it is not enough to have them executed atomically. If processElement() is called before onTimer() at one point in time, and the other way round at the next, you would need an additional mechanism to synchronize your logic. Right?
For example, if in processElement() you do state.update(newValue)
and in onTimer() you do out.collect(state.value())

then if ev1, ev2 and ev3 arrive at consecutive times, and the functions are executed once as processElement() then onTimer() and once in the reverse order, your output would be:

time1: (processElement)                    ev1 arrives, state=ev1
time2: (processElement, executed first)    ev2 arrives, state=ev2    onTimer (executed second): out = ev2
time3: (processElement, executed second)   ev3 arrives, state=ev3    onTimer (executed first):  out = ev2
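
The scenario in the table can be reproduced with a small plain-Java model. It is a sketch under stated assumptions: LastValueEmitter is a made-up class, its state field stands in for a Flink ValueState, and its out list stands in for the Collector. No Flink classes are used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the scenario: processElement() overwrites the state,
// onTimer() emits whatever the state holds at that moment.
class LastValueEmitter {
    private String state;                        // stands in for a ValueState
    final List<String> out = new ArrayList<>();  // stands in for the Collector

    void processElement(String value) {
        state = value;
    }

    void onTimer() {
        out.add(state);
    }

    // One tick in which an element arrives and a timer fires;
    // elementFirst selects which of the two calls runs first.
    void tick(String element, boolean elementFirst) {
        if (elementFirst) {
            processElement(element);
            onTimer();
        } else {
            onTimer();
            processElement(element);
        }
    }
}
```

Calling processElement("ev1"), then tick("ev2", true), then tick("ev3", false) leaves out holding ev2 twice, matching the table. With a consistent order the result would be ev2, ev3 (element first) or ev1, ev2 (timer first).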

Best regards,

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division


Re: concurrency?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Radu,

The processElement() and onTimer() calls are synchronized by a lock, i.e., they won't be called at the same time.
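
What such a guarantee means for user code can be sketched in plain Java. The model below is hypothetical (LockedCallbacks, its lock object, and the run() helper are illustrative names, not Flink internals): two threads invoke the callbacks, and because both callbacks take the same lock, the non-thread-safe state is never touched concurrently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink internals: both callbacks take the same lock,
// so the plain ArrayList used as state is never accessed concurrently.
class LockedCallbacks {
    private final Object lock = new Object();
    private final List<Integer> state = new ArrayList<>();
    private int emitted = 0;

    void processElement(int value) {
        synchronized (lock) {
            state.add(value);
        }
    }

    void onTimer() {
        synchronized (lock) {
            emitted += state.size();   // "emit" everything accumulated so far
            state.clear();
        }
    }

    int emitted() {
        synchronized (lock) {
            return emitted;
        }
    }

    // Runs an element thread and a timer thread side by side and returns
    // how many elements were emitted in total.
    static int run(int elements) {
        LockedCallbacks f = new LockedCallbacks();
        Thread elementThread = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                f.processElement(i);
            }
        });
        Thread timerThread = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                f.onTimer();
            }
        });
        elementThread.start();
        timerThread.start();
        try {
            elementThread.join();
            timerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        f.onTimer();   // final flush of anything still in state
        return f.emitted();
    }
}
```

The lock rules out data races, so no element is lost or counted twice. It does not fix the order in which an element and a timer due at the same instant are handled.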

Best, Fabian
