Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/27 10:49:46 UTC

[GitHub] [beam] scwhittle opened a new issue, #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

scwhittle opened a new issue, #23379:
URL: https://github.com/apache/beam/issues/23379

   ### What would you like to happen?
   
   Currently, element lateness is evaluated during windowing/triggering based on TimerInternals.currentOutputWatermarkTime.
   
   There are some cases where users want to handle late records differently. For example, instead of dropping late events or assigning them to their original (now late) window with allowedLateness, the late record could be assigned to a different window, logged and monitored explicitly, or mutated and output differently.
   
   By exposing currentOutputWatermarkTime on the ProcessContext, a DoFn could compare the timestamp of the incoming element against it and perform custom handling for late events.  
   The benefit of exposing the timestamp, and not just an isLate method, is that a DoFn could also set the timestamp of its output to a guaranteed non-late time based on this value.
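
The comparison proposed above can be sketched in plain Java without any Beam dependency. Everything here is hypothetical: the `LateElementSketch` class, its method names, and the choice to treat only strictly earlier timestamps as behind the watermark are assumptions for illustration. Beam's `ProcessContext` does not expose this value, which is exactly what the issue asks for, and `java.time` stands in for the Joda-Time types Beam uses.

```java
import java.time.Instant;

// Hypothetical helper: models what a DoFn could do if the output watermark were visible.
final class LateElementSketch {
  // Assumption: an element is "behind" when its timestamp is strictly before the watermark.
  static boolean isBehind(Instant elementTimestamp, Instant outputWatermark) {
    return elementTimestamp.isBefore(outputWatermark);
  }

  // Returns the later of the two instants, so the result is never behind the watermark.
  static Instant nonLateTimestamp(Instant elementTimestamp, Instant outputWatermark) {
    return isBehind(elementTimestamp, outputWatermark) ? outputWatermark : elementTimestamp;
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2022-09-27T10:00:00Z");
    Instant element = Instant.parse("2022-09-27T09:55:00Z");
    System.out.println("behind watermark: " + isBehind(element, watermark));
    System.out.println("output timestamp: " + nonLateTimestamp(element, watermark));
  }
}
```

Returning a timestamp rather than a boolean is what makes the second use case possible: the caller can both detect the late element and pick an output time that is not behind the watermark.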
   
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: sdk-java-core


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263983648

   It cannot be the output watermark, because the timer itself holds the output watermark. The output watermark is a reservation of the ability to output at that timestamp. Anything that intends to produce output needs to reserve the time and impacts the output watermark.
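
The hold described above can be pictured as taking a minimum: the output watermark cannot pass the earliest timestamp that something has reserved for future output. The sketch below is a simplified model in plain Java, not Beam's runner logic; `WatermarkHoldSketch` and `outputWatermark` are hypothetical names.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Simplified model: the output watermark is capped by every pending hold.
final class WatermarkHoldSketch {
  // Returns the earliest of the input watermark and all holds.
  static Instant outputWatermark(Instant inputWatermark, List<Instant> holds) {
    Instant result = inputWatermark;
    for (Instant hold : holds) {
      if (hold.isBefore(result)) {
        result = hold;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Instant input = Instant.parse("2022-09-27T10:00:00Z");
    // Assumption: a pending timer reserves the right to output at 09:30.
    Instant timerHold = Instant.parse("2022-09-27T09:30:00Z");
    System.out.println(outputWatermark(input, Arrays.asList(timerHold)));
  }
}
```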




[GitHub] [beam] iht commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
iht commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1260578847

   In my experience, this is something several Beam users need. For instance, in the Beam book written by @je-ik, there are a couple of examples of how to achieve something similar by filtering out some messages before a window is applied ([example 1](https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter3/src/main/java/com/packtpub/beam/chapter3/DroppableDataFilter.java), [example 2](https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter3/src/main/java/com/packtpub/beam/chapter3/DroppableDataFilter2.java)). The implementation of those examples would be straightforward if the process context exposed the value of the watermark.




[GitHub] [beam] je-ik commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
je-ik commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263686667

   > Basically all conceptual models need to consider windows to just be keys IMO.
   
   This is correct, but window keys have a limited lifespan. This is what we represent in the model by `window.maxTimestamp`. Generic keys do not have this property.




[GitHub] [beam] je-ik commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
je-ik commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1264032104

   Agree. That is the current approach. But `offset(0)` is something like "immediate fire". There is no hold needed, because there is no invariant to break. Causality is preserved. There are implementation difficulties, I agree. But that is a different discussion. :)




[GitHub] [beam] scwhittle commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1259325732

   @kennknowles What do you think about this proposal? Do you have some other ideas?




[GitHub] [beam] je-ik commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
je-ik commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263841450

   We are able to observe the input watermark via timers. The input watermark has to be available in the ProcessContext, because an event-time timer set with `timer.setRelative().offset(0)` needs to know its value. It is debatable whether this should be the output watermark instead, but currently we use the input one. If the watermark is already present, I think we could make it available. If we are afraid of users misusing it, then we are talking more about the API than about the fact that it is already available, although in a somewhat obfuscated way. But I agree this deserves broader discussion.




[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263640365

   It would break batch processing, since all windows are processed simultaneously. And that is of course also possible in streaming if a backlog builds up.




[GitHub] [beam] je-ik commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
je-ik commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263683779

   Where exactly is the breakage? If the set of open windows is finite (which it is), then there is a finite set of open windows with the lowest timestamp.
   
   The fact that the watermark might represent a "wall clock" doesn't mean that it has to advance. That is what I meant by non-negative speed wrt. processing time. The watermark can stay still for any (bounded) amount of processing time, which should solve the batch case. It only cannot travel back in time (excluding failure recovery) - i.e. dTp/dTe >= 0 for all event times Te and processing times Tp.
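
The "event-time wall clock" that may stand still but never runs backwards can be sketched as a value that only accepts forward updates. This is a plain-Java illustration with hypothetical names (`MonotonicWatermarkSketch`, `advanceTo`), not how any Beam runner tracks watermarks, and it ignores the failure-recovery case mentioned above.

```java
import java.time.Instant;

// Models a watermark as a clock with non-negative speed: it may stall, never rewind.
final class MonotonicWatermarkSketch {
  private Instant current;

  MonotonicWatermarkSketch(Instant initial) {
    this.current = initial;
  }

  // Moves forward to the candidate if it is later; otherwise keeps the current value.
  Instant advanceTo(Instant candidate) {
    if (candidate.isAfter(current)) {
      current = candidate;
    }
    return current;
  }

  Instant current() {
    return current;
  }

  public static void main(String[] args) {
    MonotonicWatermarkSketch wm =
        new MonotonicWatermarkSketch(Instant.parse("2022-09-27T10:00:00Z"));
    System.out.println(wm.advanceTo(Instant.parse("2022-09-27T10:05:00Z")));
    // An earlier candidate is ignored, so the clock stays where it was.
    System.out.println(wm.advanceTo(Instant.parse("2022-09-27T09:00:00Z")));
  }
}
```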




[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1261455416

   Just commenting with my thought process around this, and from reading code. (most of this could/should be in public docs but I don't know that it is)
   
    - A window "is" a key with an end time for the purposes of aggregation or stateful processing.
    - Windows are equally meaningful in batch and streaming, and _all_ windows may be processed simultaneously in either mode. Concepts like "the latest window that is still open" do not exist.
    - To the extent possible, and only allowing for nondeterministic variation, batch backfill and experiments should yield equivalent results to streaming.
    - There is no such thing as "late" in the current model, outside the context of an aggregation or stateful processing. For an ordinary ParDo, observing the watermark actually creates a side channel of aggregation, since the watermark is an aggregation, breaking the stateless per-element invariant.
    - Late means that downstream already has the "complete" result so may need to do some fixup work.
    - When writing files with windowed writes, the aggregation that has an end time is aggregating elements to the same file(set).
   
   I am having trouble fitting this request into that framing.
   
   But I have one way to start: when you are talking about concepts like "the latest window that is still open" it means that the time series you are mentally considering is the time series of when elements are ingested by a pipeline, not the time series of when the originating events happened. In that case you need the ability to adjust timestamps to be the ingestion time, for example just using the worker clock. Except you want to do it with something slightly more principled.
   
   This won't work with batch processing of an archive, of course.




[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263783149

   I want to be clear that I think the general pattern here is valuable and I would like to do it in a non-hacky way. We know that it works with state & timers by buffering and emitting with a watermark timer, right?
   
   We do already let users observe the input watermark approximately via event time timers, and that lets you approximately observe the output watermark from upstream transforms. So I'm not totally opposed to exposing it. But it is a pretty major design decision that needs thought. It is probably too much for a feature request comment thread and needs a document exploring possible pitfalls and the requirements for it not to allow semantic weirdness. Send to the dev@ list?




[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1259875113

   Just to clarify definitions, these days _late_ = element arriving for aggregation after the watermark has exceeded the end of window for that aggregation. Is there a reason that processing late panes is not sufficient here?
   
   My main concern is that increasingly basing the output of a transform directly on nondeterministic inputs will break any invariants we might hope for. Mostly we want code either not to be sensitive to this, or to be sensitive to it in well-behaved ways.
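
The definition of _late_ given above depends on the window of the aggregation, not directly on the element's timestamp. A minimal plain-Java sketch, assuming the window is represented by its last timestamp (in the spirit of `window.maxTimestamp`); the class and method names are hypothetical.

```java
import java.time.Instant;

// Lateness as defined above: the watermark has exceeded the end of the window.
final class WindowLatenessSketch {
  // windowMaxTimestamp is assumed to be the last instant that still belongs to the window.
  static boolean isLate(Instant windowMaxTimestamp, Instant watermark) {
    return watermark.isAfter(windowMaxTimestamp);
  }

  public static void main(String[] args) {
    Instant windowMax = Instant.parse("2022-09-27T09:59:59.999Z");
    System.out.println(isLate(windowMax, Instant.parse("2022-09-27T09:30:00Z")));
    System.out.println(isLate(windowMax, Instant.parse("2022-09-27T10:20:00Z")));
  }
}
```

Note that the element's own timestamp does not appear in the signature: two elements with different timestamps in the same window are late or on time together.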




[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263777770

   OK this is all fine. So a common case will be that a large number of windows are processed simultaneously, and then the watermark makes a large jump (in batch from -inf to +inf, but it can be any large jump) causing them all to be completed. My main point is that we need to avoid mental models that think of them being processed one after the other. There is no "next" window, and the idea of there being a "latest open window" is not very useful. I think that your points are all fine, but the goal here is still not obviously doable.
   
   The user basically wants fixed windows that are processed "one after the other" and for elements to be assigned to the "current" window with just a flag that indicates whether they really belong to that window or whether they "should have" been put in a window that they are too late for.
   
   State & timers are a good way to do this, with a buffer that accepts all elements and then emits when the timer fires. It is easy to label the elements as to whether they "should" be in this timer firing or they arrived too late for their timer firing.
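
The buffer-and-fire pattern described above can be simulated in plain Java. This is a sketch under stated assumptions, not Beam state and timer code: `BufferAndFireSketch`, `Labelled`, and the fixed firing `period` are hypothetical, and the buffer is an in-memory list rather than runner-managed state. Each firing covers the interval `[firingTime - period, firingTime)`; anything older than that interval is labelled as having arrived too late for its own firing.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Simulates "buffer everything, emit on timer, label what missed its firing".
final class BufferAndFireSketch {
  // One emitted element: its timestamp and whether it missed its own firing.
  static final class Labelled {
    final Instant timestamp;
    final boolean arrivedTooLate;

    Labelled(Instant timestamp, boolean arrivedTooLate) {
      this.timestamp = timestamp;
      this.arrivedTooLate = arrivedTooLate;
    }
  }

  private final Duration period;
  private final List<Instant> buffer = new ArrayList<>();

  BufferAndFireSketch(Duration period) {
    this.period = period;
  }

  // Accepts every element, regardless of how old its timestamp is.
  void add(Instant elementTimestamp) {
    buffer.add(elementTimestamp);
  }

  // Emits all buffered elements before firingTime; later ones wait for a later firing.
  List<Labelled> fire(Instant firingTime) {
    Instant intervalStart = firingTime.minus(period);
    List<Labelled> emitted = new ArrayList<>();
    List<Instant> remaining = new ArrayList<>();
    for (Instant ts : buffer) {
      if (ts.isBefore(firingTime)) {
        emitted.add(new Labelled(ts, ts.isBefore(intervalStart)));
      } else {
        remaining.add(ts);
      }
    }
    buffer.clear();
    buffer.addAll(remaining);
    return emitted;
  }

  public static void main(String[] args) {
    BufferAndFireSketch sketch = new BufferAndFireSketch(Duration.ofMinutes(10));
    sketch.add(Instant.parse("2022-09-27T10:05:00Z"));
    sketch.add(Instant.parse("2022-09-27T09:50:00Z"));
    for (Labelled l : sketch.fire(Instant.parse("2022-09-27T10:10:00Z"))) {
      System.out.println(l.timestamp + " tooLate=" + l.arrivedTooLate);
    }
  }
}
```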
   
   The trouble is that the user wants to do this with WriteFiles withWindowedWrites, which depends on the window mechanism. I would say that the problem is this mismatch. Their use case is easy to express with state & timers and does not fit well with windows, but the file sink is tightly coupled to windowing.
   
   @scwhittle if you do the thing with state & timers and then just assign each output to the window for that timer's timestamp, does that work? I suppose there is likely a problem with large iterable elements being too inefficient?




[GitHub] [beam] je-ik commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
je-ik commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263253868

   Do we have these definitions "set in stone"? I'd offer an alternative definition of watermark - not actually being an aggregation of "elements", it is rather an aggregation of causality. I tried to explain it [here](https://twitter.com/janl_apache/status/1478757956263071745?s=20&t=4SarwNn0_gNXd35FlK-pYg).
   
   The basic idea is that the actual value of the watermark is not what is important. The important part is that causality must be preserved. If event B is caused by event A in any upstream partition of any transform (e.g. source), then the same order of these events (that is, event A is observed first and only then event B) must be preserved in *all* downstream partitions of *all* transforms, and this must hold even in the presence of failures.
   
   With this definition we can view the watermark as an "event-time wall clock". This wall clock may move at any non-negative speed wrt. processing time. This definition also gives sense to "last open window", because the "window close" is simply an event like any other, with an assigned timestamp that can be compared to this wall clock.
   
   Would this definition break any concepts we currently use?




[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1261441787

   I understand these perspectives that there are use cases that can be implemented with this feature. That doesn't actually address the concern that it allows a lot of unprincipled behavior, so that the basic pieces of the model are potentially no longer reliable building blocks.




[GitHub] [beam] scwhittle commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1259960289

   In this particular case, the windowing/triggering is evaluated as part of writing files with windowed writes. Here, late events should be marked as late (counter/log/modify event) but written to a window that will still be read by a downstream system that uses the watermark to finalize windowed directories.
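
One way to picture the behavior described above is with epoch-aligned fixed windows: an element whose own window has already been passed by the watermark is flagged as late and redirected to the window that currently contains the watermark. The sketch below is plain Java with hypothetical names; the redirect target is an assumption chosen for illustration, and the watermark is passed in as an argument because a DoFn cannot read it today.

```java
import java.time.Duration;
import java.time.Instant;

// Flags late elements and redirects them to a window the watermark has not passed.
final class LateWindowReassignmentSketch {
  // Start of the epoch-aligned fixed window that contains the timestamp.
  static Instant windowStart(Instant timestamp, Duration size) {
    long ms = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(ms - Math.floorMod(ms, size.toMillis()));
  }

  // Late when the watermark has reached the exclusive end of the element's own window.
  static boolean isLate(Instant elementTs, Instant watermark, Duration size) {
    Instant exclusiveEnd = windowStart(elementTs, size).plus(size);
    return !watermark.isBefore(exclusiveEnd);
  }

  // Own window for on-time elements; the watermark's window for late ones.
  static Instant targetWindowStart(Instant elementTs, Instant watermark, Duration size) {
    return isLate(elementTs, watermark, size)
        ? windowStart(watermark, size)
        : windowStart(elementTs, size);
  }

  public static void main(String[] args) {
    Duration hour = Duration.ofHours(1);
    Instant watermark = Instant.parse("2022-09-27T10:20:00Z");
    Instant element = Instant.parse("2022-09-27T09:55:00Z");
    System.out.println("late: " + isLate(element, watermark, hour));
    System.out.println("written to window starting at: "
        + targetWindowStart(element, watermark, hour));
  }
}
```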




[GitHub] [beam] kennknowles commented on issue #23379: [Feature Request]: Expose TimerStateInternals.currentOutputWatermarkTime to allow for DoFns to handle elements behind the watemark differently

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #23379:
URL: https://github.com/apache/beam/issues/23379#issuecomment-1263640911

   Basically all conceptual models need to consider windows to just be keys IMO.

