Posted to user@flink.apache.org by Saiph Kappa <sa...@gmail.com> on 2016/09/09 16:29:22 UTC
Why tuples are not ignored after watermark?
Hi,
I have a streaming (event-time) application in which every event is
assigned the same timestamp. I receive 10000 events in total within a
5-minute window, but I emit a watermark once 9000 elements have been
received. This watermark is 6 minutes after the assigned timestamp. My
question is: why does the function associated with the window read
10000 elements and not 9000? The 1000 elements that arrive after the
watermark have a timestamp lower than the watermark, so they should be
ignored, but that is not happening.
Here is part of the code:
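«
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val rawStream = env.socketTextStream("localhost", 4321)

val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
  // Every element is assigned this same timestamp.
  val timestamp = System.currentTimeMillis()

  override def extractTimestamp(element: (String, Int, Long),
                                previousElementTimestamp: Long): Long =
    timestamp

  // Emit a watermark 6 minutes past the assigned timestamp when the element
  // with _3 == 9000 is seen; otherwise emit none.
  override def checkAndGetNextWatermark(lastElement: (String, Int, Long),
                                        extractedTimestamp: Long): Watermark = {
    if (lastElement._3 == 9000) {
      val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
      new Watermark(ts)
    } else null
  }
}

val stream = rawStream
  .map { line =>
    val Array(p1, p2, p3) = line.split(" ")
    (p1, p2.toInt, p3.toLong)
  }
  .assignTimestampsAndWatermarks(punctuatedAssigner)

// `function` is the window function; it is not shown in this excerpt.
stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)
»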
Thanks!
Re: Why tuples are not ignored after watermark?
Posted by Fabian Hueske <fh...@gmail.com>.
No, this is not possible unless you use an external service such as a
database.
The assigners might run on different machines, and Flink does not provide
utilities for reading and writing shared state.
Best, Fabian
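One possible way around the lack of shared state is to make the trigger
condition something each parallel instance can evaluate from its own
elements. The sketch below is plain Scala with no Flink dependency.
LocalAssigner, run and the round-robin distribution are hypothetical
stand-ins, and it assumes the third tuple field is a running counter, which
the thread does not confirm. Each instance emits its watermark the first
time it sees a counter >= 9000 instead of == 9000, so every instance
eventually emits one.

```scala
object LocalTriggerSketch {
  type Event = (String, Int, Long)

  /** Hypothetical stand-in for one parallel assigner instance; it keeps only local state. */
  final class LocalAssigner(fixedTimestamp: Long, marker: Long, delayMillis: Long) {
    private var emitted = false

    /** Returns Some(watermark) the first time this instance sees a counter >= marker. */
    def onElement(e: Event): Option[Long] =
      if (!emitted && e._3 >= marker) {
        emitted = true
        Some(fixedTimestamp + delayMillis)
      } else None
  }

  /** Deals events round-robin to the instances and returns the watermark each one emitted. */
  def run(parallelism: Int, total: Int, marker: Long): Seq[Option[Long]] = {
    // Assumed values: timestamp 0 and a 6-minute delay, mirroring the question.
    val instances = Vector.fill(parallelism)(new LocalAssigner(0L, marker, 6 * 60 * 1000L))
    val emitted = Array.fill[Option[Long]](parallelism)(None)
    for (counter <- 1 to total) {
      val i = counter % parallelism
      instances(i).onElement(("key", 1, counter.toLong)).foreach(w => emitted(i) = Some(w))
    }
    emitted.toSeq
  }

  def main(args: Array[String]): Unit = {
    val result = run(parallelism = 4, total = 10000, marker = 9000L)
    println(s"all instances emitted: ${result.forall(_.isDefined)}")
    println(s"minimum emitted watermark: ${result.flatten.min}")
  }
}
```

The cut-off is then only approximate: each instance emits at its own first
element past the marker, so a few elements with counters slightly above
9000 may still be counted.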
2016-09-15 20:17 GMT+02:00 Saiph Kappa <sa...@gmail.com>:
> And is it possible to share state across parallel instances with
> AssignerWithPunctuatedWatermarks?
>
> Thanks!
Re: Why tuples are not ignored after watermark?
Posted by Saiph Kappa <sa...@gmail.com>.
And is it possible to share state across parallel instances with
AssignerWithPunctuatedWatermarks?
Thanks!
On Wed, Sep 14, 2016 at 9:52 AM, Aljoscha Krettek <al...@apache.org>
wrote:
> Hi,
> the problem might be that your timestamp/watermark assigner is run in
> parallel and that only one parallel instance of those operators emits the
> watermark because only one of those parallel instances sees the element
> with _3 == 9000. For the watermark to advance at an operator it needs to
> advance in all upstream operations.
>
> Cheers,
> Aljoscha
Re: Why tuples are not ignored after watermark?
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
the problem might be that your timestamp/watermark assigner is run in
parallel and that only one parallel instance of those operators emits the
watermark because only one of those parallel instances sees the element
with _3 == 9000. For the watermark to advance at an operator it needs to
advance in all upstream operations.
Cheers,
Aljoscha
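To illustrate the point above: a downstream operator's event-time clock is
the minimum of the watermarks received from all of its inputs. The sketch
below is a plain-Scala model with no Flink dependency. The names
combinedWatermark and simulate, the round-robin distribution and the sample
timestamp are assumptions made for illustration. With 4 parallel assigners
only one sees the element with counter 9000, so the combined watermark does
not advance; with a single assigner it does.

```scala
object WatermarkMinSketch {
  // Stand-in for "no watermark seen yet" on an input channel.
  val NoWatermark: Long = Long.MinValue

  /** Event-time clock of a downstream operator: the minimum over all input channels. */
  def combinedWatermark(channelWatermarks: Seq[Long]): Long = channelWatermarks.min

  /** Deals counters 1..total round-robin to `parallelism` assigner instances.
    * An instance emits `wm` only if it sees the counter equal to `marker`. */
  def simulate(parallelism: Int, total: Int, marker: Int, wm: Long): Seq[Long] = {
    val perInstance = Array.fill(parallelism)(NoWatermark)
    for (counter <- 1 to total) {
      val instance = counter % parallelism
      if (counter == marker) perInstance(instance) = wm
    }
    perInstance.toSeq
  }

  def main(args: Array[String]): Unit = {
    val wm = 1000L + 6 * 60 * 1000L // hypothetical assigned timestamp + 6 minutes
    val parallel = combinedWatermark(simulate(4, 10000, 9000, wm))
    val single = combinedWatermark(simulate(1, 10000, 9000, wm))
    println(s"parallelism 4 advanced: ${parallel == wm}")
    println(s"parallelism 1 advanced: ${single == wm}")
  }
}
```

In the job from the question, this suggests that the window operator never
sees the watermark pass the end of the window while elements are still
arriving, so none of the 10000 elements is treated as late.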