Posted to user@flink.apache.org by Saiph Kappa <sa...@gmail.com> on 2016/09/09 16:29:22 UTC

Why are tuples not ignored after the watermark?

Hi,

I have a streaming (event-time) application in which all events are
assigned the same timestamp. I receive 10000 events in total within a
5-minute window, but I emit a watermark once 9000 elements have been
received. This watermark is 6 minutes after the assigned timestamp. My
question is: why does the function associated with the window read
10000 elements and not 9000? The elements that arrive after the watermark
(the remaining 1000) have a timestamp lower than it and should be ignored,
but this is not happening.

Here is part of the code:
«
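import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val rawStream = env.socketTextStream("localhost", 4321)

// Every element gets the same timestamp, fixed when the assigner is created.
val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
  val timestamp = System.currentTimeMillis()

  override def extractTimestamp(element: (String, Int, Long),
                                previousElementTimestamp: Long): Long =
    timestamp

  // Emit a watermark 6 minutes past the timestamp when element 9000 arrives;
  // returning null means "no watermark for this element".
  override def checkAndGetNextWatermark(lastElement: (String, Int, Long),
                                        extractedTimestamp: Long): Watermark =
    if (lastElement._3 == 9000L) {
      new Watermark(extractedTimestamp + TimeUnit.MINUTES.toMillis(6))
    } else null
}

// Each input line is "<String> <Int> <Long>", separated by single spaces.
val stream = rawStream
  .map { line =>
    val Array(p1, p2, p3) = line.split(" ")
    (p1, p2.toInt, p3.toLong)
  }
  .assignTimestampsAndWatermarks(punctuatedAssigner)

// `function` is the window function (not shown).
stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)

env.execute()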
»
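The behaviour the question expects can be sketched as a small pure-Scala model (not Flink code). It assumes a single input channel, a 5-minute tumbling window aligned to the epoch, zero allowed lateness, a window that fires once the watermark reaches its last timestamp, and elements that are dropped once the watermark has passed that point. The names `SingleChannelWindowModel` and `run` are hypothetical.

```scala
// Hypothetical model of one tumbling event-time window fed by a single channel.
// Not Flink code: it only mimics the firing and late-element rules assumed above.
object SingleChannelWindowModel {
  val WindowSize: Long = 5 * 60 * 1000L     // 5-minute tumbling window, in ms
  val WatermarkDelay: Long = 6 * 60 * 1000L // watermark = timestamp + 6 minutes

  /** Returns (elements passed to the window function, elements dropped as late). */
  def run(total: Int, watermarkAfter: Int, ts: Long): (Int, Int) = {
    val windowStart = ts - (ts % WindowSize)        // epoch-aligned window start
    val maxTimestamp = windowStart + WindowSize - 1 // last timestamp inside the window
    var watermark = Long.MinValue
    var buffered = 0
    var dropped = 0
    var firedWith = 0
    for (i <- 1 to total) {
      // An element is late once the watermark has passed the end of its window.
      if (maxTimestamp <= watermark) dropped += 1 else buffered += 1
      if (i == watermarkAfter) {
        watermark = ts + WatermarkDelay
        // The window fires as soon as the watermark reaches its last timestamp.
        if (watermark >= maxTimestamp) firedWith = buffered
      }
    }
    (firedWith, dropped)
  }
}
```

With a single channel, `SingleChannelWindowModel.run(10000, 9000, 1473438562000L)` returns `(9000, 1000)`, which is the outcome the question expected.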

Thanks!

Re: Why are tuples not ignored after the watermark?

Posted by Fabian Hueske <fh...@gmail.com>.
No, this is not possible unless you use an external service such as a
database. The parallel assigner instances might run on different machines,
and Flink does not provide utilities for reading and writing shared state.
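Because the parallel assigner instances cannot share state, one possible workaround (an editorial suggestion, not something proposed in the thread) is to make the watermark decision depend only on what each instance sees, for example emitting on `_3 >= 9000` instead of `_3 == 9000`. The pure-Scala sketch below assumes the elements are spread round-robin over the parallel instances and reports the element after which every instance has emitted a watermark. `LocalWatermarkModel` and `allEmittedAfter` are hypothetical names.

```scala
// Hypothetical model: each parallel assigner instance decides locally,
// without shared state, whether to emit a watermark for the element it sees.
object LocalWatermarkModel {
  /** Returns the index of the first element after which every instance has
    * emitted a watermark, or None if some instance never emits one. */
  def allEmittedAfter(parallelism: Int, total: Int, emits: Long => Boolean): Option[Int] = {
    val emitted = Array.fill(parallelism)(false)
    (1 to total).find { i =>
      val instance = (i - 1) % parallelism // assumed round-robin distribution
      if (emits(i.toLong)) emitted(instance) = true
      emitted.forall(e => e)
    }
  }
}
```

With four instances, `allEmittedAfter(4, 10000, _ == 9000L)` returns `None`, while `allEmittedAfter(4, 10000, _ >= 9000L)` returns `Some(9003)`. In that case the downstream watermark could advance shortly after element 9000, although a few elements arriving in between may still be counted.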

Best, Fabian

2016-09-15 20:17 GMT+02:00 Saiph Kappa <sa...@gmail.com>:

> And is it possible to share state across the parallel instances of an
> AssignerWithPunctuatedWatermarks?
>
> Thanks!

Re: Why are tuples not ignored after the watermark?

Posted by Saiph Kappa <sa...@gmail.com>.
And is it possible to share state across the parallel instances of an
AssignerWithPunctuatedWatermarks?

Thanks!

On Wed, Sep 14, 2016 at 9:52 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> the problem might be that your timestamp/watermark assigner runs in
> parallel and that only one parallel instance of that operator emits the
> watermark, because only one of those instances sees the element with
> _3 == 9000. For the watermark to advance at an operator, it needs to
> advance in all upstream operator instances.
>
> Cheers,
> Aljoscha

Re: Why are tuples not ignored after the watermark?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
the problem might be that your timestamp/watermark assigner runs in
parallel and that only one parallel instance of that operator emits the
watermark, because only one of those instances sees the element with
_3 == 9000. For the watermark to advance at an operator, it needs to
advance in all upstream operator instances.
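The rule that the watermark must advance in all upstream instances can be modelled as follows: an operator with several input channels tracks the highest watermark seen on each channel and uses the minimum across channels as its own event-time clock. This is a pure-Scala sketch with a hypothetical `MinWatermarkTracker` class, not Flink's implementation; it assumes four parallel assigner instances, of which only one ever sees the element with `_3 == 9000`.

```scala
// Hypothetical model of how a downstream operator combines watermarks
// from several parallel upstream instances (one input channel each).
final class MinWatermarkTracker(numChannels: Int) {
  // Highest watermark received so far on each channel.
  private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)

  /** Records a watermark from `channel` and returns the operator's combined watermark. */
  def onWatermark(channel: Int, ts: Long): Long = {
    // Watermarks never move backwards on a channel.
    channelWatermarks(channel) = math.max(channelWatermarks(channel), ts)
    current
  }

  /** The operator's event time: the minimum over all input channels. */
  def current: Long = channelWatermarks.min
}
```

With `new MinWatermarkTracker(4)`, calling `onWatermark(3, ts + 360000L)` still returns `Long.MinValue`, so the window never fires at that point. The combined watermark only moves once all four channels have reported one, for example a final `Long.MaxValue` that we assume is emitted on every channel when the input ends; that would be consistent with the window function eventually seeing all 10000 elements.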

Cheers,
Aljoscha
