Posted to user@beam.apache.org by Paweł Paprota <pp...@fastmail.fm> on 2023/12/13 07:32:27 UTC

[Question] Wait.on() transform in a streaming pipeline - window not closing until new data arrives?

Hello all,

I have a streaming pipeline with a JmsIO source. Downstream, some custom transforms include the Wait.on() transform, which is used to wait for things such as database writes to complete before messages are published to JMS at the end of the pipeline.

This custom transform is also used in a batch pipeline, where it works perfectly: the waiting behaves as expected.

However, as I mentioned, I am now using this custom transform in a streaming pipeline, and it appears that Wait.on() never "finishes" until the watermark gets past the end of the window. (I am using a window because the custom transform has a GroupByKey in it; it also uses FileIO.writeDynamic, which seems to use GroupByKey internally.)

I realize this is how Wait.on() (and the Never.ever() trigger used inside it) is supposed to work, but then I wonder how to make it work in a streaming setup. I tried different window types and triggers, but that doesn't seem to make any difference.

The only thing that "unblocks" Wait.on() is moving the watermark forward by pushing more data to the JMS source. In my use case, the source does not produce data constantly; sometimes it is just one big burst of messages once a day. If those messages fit into one window, processing holds them until the next batch of data arrives (possibly the next day), which is not desired. I would like it to work basically as in batch mode: the whole custom transform is executed for each window, with Wait.on() working as expected and the subsequent transforms running after the wait.

Does this make sense, or am I missing something in the general design of windowing/triggers/watermark/Wait.on/...?

-- 
Paweł

Re: [Question] Wait.on() transform in a streaming pipeline - window not closing until new data arrives?

Posted by Paweł Paprota <pp...@fastmail.fm>.
Hello all again,

Sorry for spamming the list, but after sending this message I realized that I had previously seen other unbounded sources move the watermark forward even when no data was coming in. So I checked JmsIO, and it has this:

    @Override
    public Instant getWatermark() {
      return checkpointMark.getOldestMessageTimestamp();
    }

I'm not sure exactly how the checkpointing logic works, but I then checked one of our Dataflow jobs, which has been running for weeks consuming from JMS, and I see this in the GCP UI:

Data watermark: 12 December 2023 at 17:15:05 UTC+1

If this information can be trusted, I guess that's the root cause of my issue? That is, if the watermark is stuck on the oldest message from the checkpoint and no new data arrives, then it simply won't move forward until new data does arrive. Do I have this right?
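
To make the suspected mechanism concrete, here is a toy model in plain Java. It is not Beam code and not how JmsIO or the runner is implemented; the class and method names (WindowCloseSketch, windowEnd, isClosed) are made up for illustration. It only assumes that a fixed window can close once the watermark reaches the window's end.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model (hypothetical, not Beam code): a fixed window closes only when the watermark reaches its end. */
final class WindowCloseSketch {

  /** Returns the exclusive end of the epoch-aligned fixed window that contains the timestamp. */
  static Instant windowEnd(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long start = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
    return Instant.ofEpochMilli(start + sizeMillis);
  }

  /** Assumption of this sketch: the window counts as closed once the watermark is not before its end. */
  static boolean isClosed(Instant watermark, Instant windowEnd) {
    return !watermark.isBefore(windowEnd);
  }
}
```

In this model, a watermark that stays at the timestamp of the last received message can never reach the end of the window containing that message, so isClosed keeps returning false no matter how much wall-clock time passes. It only flips to true once something reports a later watermark.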

I then also checked how some other sources implement the getWatermark method, and I see things like org.apache.beam.sdk.io.kinesis.WatermarkPolicy and its watermarkIdleDurationThreshold, which looks like what I would expect to be able to configure as a user.
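
As a rough sketch of what such an idle threshold could mean, the following plain Java class advances the watermark from processing time once no message has been seen for longer than a configured duration. It is only loosely inspired by the idea behind watermarkIdleDurationThreshold; it is not the Kinesis implementation and not a JmsIO patch, and the class name IdleAwareWatermark and all of its methods are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of an idle-aware watermark; not taken from Beam. */
final class IdleAwareWatermark {
  private final Duration idleThreshold;
  private Instant lastMessageTimestamp; // newest event timestamp seen so far
  private Instant lastMessageSeenAt;    // processing time of the last message
  private Instant watermark;            // last reported value, never moves backwards

  IdleAwareWatermark(Duration idleThreshold, Instant start) {
    this.idleThreshold = idleThreshold;
    this.lastMessageTimestamp = start;
    this.lastMessageSeenAt = start;
    this.watermark = start;
  }

  /** Records a message with its event timestamp and the processing time at which it arrived. */
  void onMessage(Instant messageTimestamp, Instant now) {
    if (messageTimestamp.isAfter(lastMessageTimestamp)) {
      lastMessageTimestamp = messageTimestamp;
    }
    lastMessageSeenAt = now;
  }

  /** Returns the watermark; when idle for longer than the threshold it follows (now - threshold). */
  Instant getWatermark(Instant now) {
    Instant candidate = lastMessageTimestamp;
    if (Duration.between(lastMessageSeenAt, now).compareTo(idleThreshold) > 0) {
      Instant idleWatermark = now.minus(idleThreshold);
      if (idleWatermark.isAfter(candidate)) {
        candidate = idleWatermark;
      }
    }
    // Keep the reported watermark monotonic even if older messages arrive later.
    if (candidate.isAfter(watermark)) {
      watermark = candidate;
    }
    return watermark;
  }
}
```

With a policy along these lines, a once-a-day burst would stop pinning the watermark: some time after the last message, the watermark would move past the end of the window on its own. The trade-off is that messages arriving later with older timestamps would then be behind the watermark and would be treated as late data.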

So in the end, could it be that JmsIO needs some changes to accommodate my use case? To be honest, I've also had many other issues with JmsIO[1][2][3] and am very interested in one of the ongoing (but, sadly, now stale...) PRs[4], but that's a topic for a different thread...

As we are heavy users of this source, I could maybe carve out some time to clean up my clumsy patches and at least get them reviewed.

Paweł

[1] https://github.com/apache/beam/issues/27485
[2] https://github.com/apache/beam/issues/18857
[3] https://github.com/apache/beam/issues/27485
[4] https://github.com/apache/beam/pull/27313
