Posted to dev@beam.apache.org by Kenneth Knowles <ke...@apache.org> on 2019/05/13 02:47:57 UTC

Re: PubSubIO watermark not advancing for low volumes

Nice analysis & details!

Thanks to your info, I think it is the configuration of MovingFunction [1]
that is the likely culprit, but I don't totally understand why. It is
configured like so:

 - store 60 seconds of data
 - update data every 5 seconds
 - require at least 10 messages to be 'significant'
 - require messages from at least 2 distinct 5-second update periods to be
'significant'

I would expect a rate of 1 message per second to satisfy this. I may have
read something wrong.
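
As a rough check of that expectation, the four thresholds above can be
modelled in a few lines of Python. This is only a sketch of the rule as
listed, not Beam's MovingFunction: the function names, the one-second polling
loop, and the exact boundary handling are assumptions made for illustration.

```python
# Parameters as listed above; the model around them is a simplification.
WINDOW_S = 60      # store 60 seconds of data
BUCKET_S = 5       # update data every 5 seconds
MIN_SAMPLES = 10   # at least 10 messages
MIN_BUCKETS = 2    # from at least 2 distinct 5-second update periods


def is_significant(arrivals, now):
    """True if arrivals in the last WINDOW_S seconds meet both thresholds."""
    recent = [t for t in arrivals if now - WINDOW_S < t <= now]
    buckets = {int(t // BUCKET_S) for t in recent}
    return len(recent) >= MIN_SAMPLES and len(buckets) >= MIN_BUCKETS


def first_significant_time(arrivals, horizon):
    """First whole second in 0..horizon at which the sample is significant."""
    for now in range(horizon + 1):
        if is_significant(arrivals, now):
            return now
    return None


one_per_second = list(range(300))             # messages at t = 0, 1, 2, ...
one_per_ten_seconds = list(range(0, 300, 10))  # messages at t = 0, 10, 20, ...

print(first_significant_time(one_per_second, 300))       # 9
print(first_significant_time(one_per_ten_seconds, 300))  # None
```

Under this simplified model a steady 1 message per second meets both
thresholds after about ten seconds, while one message every ten seconds never
does. So the thresholds alone would not explain a stalled watermark at 1 per
second.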

Have you filed an issue in Jira [2]?

Kenn

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
[2] https://issues.apache.org/jira/projects/BEAM/issues

*From: *Tim Sell <ts...@google.com>
*Date: *Fri, May 10, 2019 at 4:09 AM
*To: * <us...@beam.apache.org>

> Hello,
>
> I have identified an issue where the watermark does not advance when using
> the beam PubSubIO when volumes are very low.
>
> The behaviour is easily replicated if you apply a fixed window triggering
> after the watermark passes the end of the window.
>
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>     .apply(ParDo.of(new ParseScoreEventFn()))
>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>     .apply(MapElements.into(kvs(strings(), integers()))
>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>     .apply(Count.perKey())
>     .apply(ParDo.of(Log.of("counted per key")));
>
> With this triggering, using both the Flink local runner and the direct
> runner, *no panes will ever be emitted* if the volume of messages in pubsub
> is very low, e.g. 1 per second.
>
> If I change the triggering to have early firings I get exactly the emitted
> panes that you would expect.
>
> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .alignedTo(Duration.standardSeconds(60))))
>     .withAllowedLateness(Duration.standardSeconds(60))
>     .discardingFiredPanes())
>
> I can use any variation of early firing triggers and they work as expected.
>
> We believe that the watermark is not advancing at all when the volume is
> too low because of the sampling that PubSubIO does to determine its
> watermark. It just never has a large enough sample.
> This problem occurs in the direct runner and Flink runner, but not in the
> Dataflow runner (Dataflow uses its own PubSubIO, which has access to
> internal details of pubsub and so doesn't need to do any sampling).
>
>
> I have also verified that for high volumes of messages, the PubSubIO
> *does* successfully advance the watermark. Here's a python script I wrote
> to mass produce random messages:
>
> import json
> import random
> from google.cloud import pubsub_v1
>
>
> def publish_loop(n, project_id, topic_name):
>     publisher = pubsub_v1.PublisherClient()
>     topic_path = publisher.topic_path(project_id, topic_name)
>     rand = random.Random()
>     players = ["eufy"]
>
>     for i in range(n):
>         score = rand.randint(1, 10)
>         player = rand.choice(players)
>         message = json.dumps({
>           "player": player,
>           "score": score,
>         })
>         print("'%s'" % message)
>         publisher.publish(topic_path, data=message.encode("utf-8"))
>
> Running my code without early firings on Dataflow, I verified it does
> count them as you'd expect.
>
> [image: Screen Shot 2019-05-08 at 16.41.02.png]
>
> Doing the same using the direct runner, it struggles to process messages
> at the rate they are being produced... but it does eventually close some
> windows. Here are screenshots of logs with early firings turned on and then
> off.
>
> [image: Screen Shot 2019-05-08 at 17.02.14.png]
>
> [image: Screen Shot 2019-05-08 at 17.21.46.png]
>
> The key here is that you can see that it is logging the ON_TIME panes. This
> *never* happened for me if the message rate was as low as 1 per second.
>
> Has anyone else seen this behaviour, where no ON_TIME panes are emitted
> when there are low volumes from a PubSubIO (when not using Dataflow)?
> I believe the details that cause this are within the getWatermark function
> in PubsubUnboundedSource, but it looks too delicate for me to approach.
>
> It's a problem because we ideally want it to behave well at low volumes
> too, but also because this is often one of the first streaming examples
> people try. We discovered this while trying to train people on streaming
> and it was a bit awkward :)
>
> Tim Sell

Re: PubSubIO watermark not advancing for low volumes

Posted by Tim Sell <ts...@google.com>.
Thanks!

I filed a Jira issue:
https://issues.apache.org/jira/browse/BEAM-7322

And dumped my sample code here:
https://github.com/tims/beam/tree/master/pubsub-watermark

*From: *Alexey Romanenko <ar...@gmail.com>
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: * <us...@beam.apache.org>

> Not sure this will be very helpful, but I recall a similar issue with
> KinesisIO [1] [2]; it was a bug in MovingFunction, which was fixed.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178

Re: PubSubIO watermark not advancing for low volumes

Posted by Alexey Romanenko <ar...@gmail.com>.
Not sure this will be very helpful, but I recall a similar issue with KinesisIO [1] [2]; it was a bug in MovingFunction, which was fixed.

[1] https://issues.apache.org/jira/browse/BEAM-5063
[2] https://github.com/apache/beam/pull/6178

> On 13 May 2019, at 20:52, Kenneth Knowles <ke...@apache.org> wrote:
> 
> You should definitely not feel foolish. That was a great report. I expect many users face the same situation. If they are lurking on this list, then you will have helped them already.
> 
> Reza - I expect you should weigh in on the Jira, too, since the "one message test" use case seems like it wouldn't work at all with those MovingFunction params. But I may not understand all the subtleties of the connector.
> 
> Kenn


Re: PubSubIO watermark not advancing for low volumes

Posted by Kenneth Knowles <ke...@apache.org>.
You should definitely not feel foolish. That was a great report. I expect
many users face the same situation. If they are lurking on this list, then
you will have helped them already.

Reza - I expect you should weigh in on the Jira, too, since the "one
message test" use case seems like it wouldn't work at all with those
MovingFunction params. But I may not understand all the subtleties of the
connector.

Kenn

*From: *Tim Sell <ts...@google.com>
*Date: *Mon, May 13, 2019 at 8:06 AM
*To: * <us...@beam.apache.org>

> Thanks for the feedback. I did some more investigating after you said a
> 1-second frequency should be enough to sample on. And it is; I feel foolish.
> I think I just wasn't waiting long enough, as it takes minutes to close the
> windows. We waited much longer when we were just sending messages manually
> and never had a window close.
>
> I'm generating some stats of lag times to window closing for different
> frequencies, with code so people can reproduce it, then I'll add this to a
> jira ticket.

Re: PubSubIO watermark not advancing for low volumes

Posted by Tim Sell <ts...@google.com>.
Thanks for the feedback. I did some more investigating after you said a
1-second frequency should be enough to sample on. And it is; I feel foolish.
I think I just wasn't waiting long enough, as it takes minutes to close the
windows. We waited much longer when we were just sending messages manually
and never had a window close.

I'm generating some stats of lag times to window closing for different
frequencies, with code so people can reproduce it, then I'll add this to a
jira ticket.

Re: PubSubIO watermark not advancing for low volumes

Posted by Reza Rokni <re...@google.com>.
Thanx Tim, awesome investigation work! :-)

Kenneth, also important to note this IO is often used in training
situations, where rather than a continuous rate of messages a single manual
message is sent.
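
To put a number on how far a single manual message falls short, here is a
minimal sketch that assumes the sample-count threshold quoted in this thread
(at least 10 messages within 60 seconds of stored data). The helper names are
hypothetical and nothing here calls Beam; it is plain arithmetic on those two
figures.

```python
WINDOW_S = 60      # seconds of data retained, as quoted in this thread
MIN_SAMPLES = 10   # messages required for a 'significant' sample, as quoted


def guaranteed_samples(interval_s, window_s=WINDOW_S):
    """Minimum count of evenly spaced messages in any window_s-second span."""
    return int(window_s // interval_s)


def meets_sample_threshold(count, min_samples=MIN_SAMPLES):
    """True if the number of messages in the window reaches the threshold."""
    return count >= min_samples


# A single manually published message: one sample in the window.
print(meets_sample_threshold(1))                       # False
# Publishing every 6 seconds is the slowest steady rate that always qualifies.
print(meets_sample_threshold(guaranteed_samples(6)))   # True
print(meets_sample_threshold(guaranteed_samples(7)))   # False
```

On these figures a one-message test is nine samples short, which matches the
concern that it may not work at all with those parameters. Whether the
connector advances the watermark by some other path is not modelled here.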

Cheers

Reza

*From: *Kenneth Knowles <ke...@apache.org>
*Date: *Mon, 13 May 2019 at 10:48
*To: * <us...@beam.apache.org>, dev

> Nice analysis & details!
>
> Thanks to your info, I think it is the configuration of MovingFunction [1]
> that is the likely culprit, but I don't totally understand why. It is
> configured like so:
>
>  - store 60 seconds of data
>  - update data every 5 seconds
>  - require at least 10 messages to be 'significant'
>  - require messages from at least 2 distinct 5-second update periods to be
> 'significant'
>
> I would expect a rate of 1 message per second to satisfy this. I may have
> read something wrong.
>
> Have you filed an issue in Jira [2]?
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
> [2] https://issues.apache.org/jira/projects/BEAM/issues
>
> *From: *Tim Sell <ts...@google.com>
> *Date: *Fri, May 10, 2019 at 4:09 AM
> *To: * <us...@beam.apache.org>
>
> Hello,
>>
>> I have identified an issue where the watermark does not advance when
>> using the beam PubSubIO when volumes are very low.
>>
>> The behaviour is easily replicated if you apply a fixed window triggering
>> after the watermark passes the end of the window.
>>
>> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>>     .apply(ParDo.of(new ParseScoreEventFn()))
>>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>>         .triggering(AfterWatermark.pastEndOfWindow())
>>         .withAllowedLateness(Duration.standardSeconds(60))
>>         .discardingFiredPanes())
>>     .apply(MapElements.into(kvs(strings(), integers()))
>>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>>     .apply(Count.perKey())
>>     .apply(ParDo.of(Log.of("counted per key")));
>>
>> With this triggering, using both the flink local runner and the direct
>> runner, *no panes will ever be emitted* if the volume of messages in
>> pubsub is very low, e.g. 1 per second.
>>
>> If I change the triggering to have early firings I get exactly the
>> emitted panes that you would expect.
>>
>> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>>     .triggering(AfterWatermark.pastEndOfWindow()
>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>             .alignedTo(Duration.standardSeconds(60))))
>>     .withAllowedLateness(Duration.standardSeconds(60))
>>     .discardingFiredPanes())
>>
>> I can use any variation of early firing triggers and they work as
>> expected.
>>
>> We believe that the watermark is not advancing at all when the volume is
>> too low because of the sampling that PubSubIO does to determine its
>> watermark. It just never has a large enough sample.
>> This problem occurs in the direct runner and flink runner, but not in the
>> dataflow runner (dataflow uses its own PubSubIO because it has access to
>> internal details of pubsub and so doesn't need to do any sampling).
>>
>>
>> I have also verified that for high volumes of messages, the PubSubIO
>> *does* successfully advance the watermark. Here's a python script I wrote
>> to mass produce random messages:
>>
>> import json
>> import random
>> from google.cloud import pubsub_v1
>>
>>
>> def publish_loop(n, project_id, topic_name):
>>     publisher = pubsub_v1.PublisherClient()
>>     topic_path = publisher.topic_path(project_id, topic_name)
>>     rand = random.Random()
>>     players = ["eufy"]
>>     futures = []
>>
>>     for _ in range(n):
>>         score = rand.randint(1, 10)
>>         player = rand.choice(players)
>>         message = json.dumps({
>>             "player": player,
>>             "score": score,
>>         })
>>         print("'%s'" % message)
>>         # publish() is asynchronous and returns a future
>>         futures.append(
>>             publisher.publish(topic_path, data=message.encode("utf-8")))
>>
>>     # wait for every publish to complete before returning
>>     for future in futures:
>>         future.result()
>>
>> Running my code without early firings on Dataflow, I verified it does
>> count them as you'd expect.
>>
>> [image: Screen Shot 2019-05-08 at 16.41.02.png]
>>
>> Doing the same using the direct runner, it struggles to process messages
>> at the rate they are being produced... but it does eventually close some
>> windows. Here are screenshots of logs with early firings turned on and then
>> off.
>>
>> [image: Screen Shot 2019-05-08 at 17.02.14.png]
>>
>> [image: Screen Shot 2019-05-08 at 17.21.46.png]
>>
>> The key here is that you can see that it is logging the ON_TIME panes. This
>> *never* happened for me if the message rate was as low as 1 per second.
>>
>> Has anyone else seen this behaviour, where no ON_TIME panes are emitted
>> when there are low volumes from a PubSubIO (when not using Dataflow)?
>> I believe the details that cause this are within the getWatermark
>> function in PubsubUnboundedSource, but it looks too delicate for me to
>> approach.
>>
>> It's a problem because we ideally want it to behave well at low volumes
>> too, but also because this is often one of the first streaming examples
>> people try. We discovered this while trying to train people on streaming
>> and it was a bit awkward :)
>>
>> Tim Sell
>>
>>
>>

