Posted to issues@beam.apache.org by "Chris Heath (Jira)" <ji...@apache.org> on 2019/09/18 02:42:00 UTC

[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

    [ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931986#comment-16931986 ] 

Chris Heath commented on BEAM-644:
----------------------------------

I recently asked a somewhat related question on Stack Overflow [https://stackoverflow.com/questions/57960673/how-to-switch-time-domains-in-a-beam-pipeline], and after further thought, I believe what is really needed is for the watermark not to be global. At well-defined points in the pipeline (namely, DoFns that call outputWithTimestamp), we need to be able to switch to a completely separate watermark that applies from that point on. Each watermark in the pipeline would then progress independently, at its own speed and with its own heuristics.

Our particular application ideally requires three time domains with three watermarks. First, read data from Google Cloud Pub/Sub (timestamps = publish time), then parse the JSON payload. From there the pipeline branches: one branch switches to event time (timestamps taken from the JSON payload) and applies event correlations, dropping late data; the other branch processes the parsed events in batches (timestamps = processing time) and never has late data.
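A minimal plain-Java sketch of the per-domain idea, assuming a hypothetical TimeDomain class (not part of the Beam API) that owns its own watermark. It illustrates why one global watermark is a problem for this pipeline: an event whose payload time is 01:30 is late against a publish-time watermark of 02:00, but on time against an independently advancing event-time watermark of 01:00. The watermark values and the one-hour event-time lag are made-up illustrations.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model, not a Beam API: every time domain owns an independent watermark.
class TimeDomainsSketch {

    /** One time domain whose watermark advances independently of all others. */
    static final class TimeDomain {
        private Instant watermark = Instant.MIN;

        /** Watermarks are monotonic, so attempts to move one backwards are ignored. */
        void advanceTo(Instant candidate) {
            if (candidate.isAfter(watermark)) {
                watermark = candidate;
            }
        }

        Instant watermark() {
            return watermark;
        }

        /** "Late" is judged only against this domain's own watermark. */
        boolean isLate(Instant timestamp) {
            return timestamp.isBefore(watermark);
        }
    }

    public static void main(String[] args) {
        TimeDomain publishTime = new TimeDomain();
        TimeDomain eventTime = new TimeDomain();
        TimeDomain processingTime = new TimeDomain();

        // Assumed: the Pub/Sub source has seen messages published up to 02:00.
        publishTime.advanceTo(Instant.parse("2019-09-18T02:00:00Z"));
        // Assumed heuristic: event time trails publish time by about an hour.
        eventTime.advanceTo(Instant.parse("2019-09-18T01:00:00Z"));

        // An event that occurred at 01:30 according to its JSON payload.
        Instant payloadTime = Instant.parse("2019-09-18T01:30:00Z");
        System.out.println("late vs publish-time watermark: " + publishTime.isLate(payloadTime)); // true
        System.out.println("late vs event-time watermark: " + eventTime.isLate(payloadTime)); // false

        // The processing-time branch stamps elements with the current time and keeps its
        // watermark behind that time, so nothing in this branch is ever late.
        Instant now = Instant.now();
        processingTime.advanceTo(now.minus(Duration.ofSeconds(1)));
        System.out.println("late vs processing-time watermark: " + processingTime.isLate(now)); // false
    }
}
```

The point of the sketch is only that lateness becomes a property of a branch's own watermark, so switching domains via outputWithTimestamp would not, by itself, make data late.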

> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>
>                 Key: BEAM-644
>                 URL: https://issues.apache.org/jira/browse/BEAM-644
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Priority: Major
>
> There is a general need, especially important in the presence of SplittableDoFn, to be able to assign new timestamps to elements without making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one to produce late data, but does not allow one to shift the watermark so the new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log file that contains elements for the day preceding the log file. The timestamp on the filename element must currently be the beginning of the log. If such elements are constantly flowing, this may be OK, but since we cannot know that such an element is coming, the watermark may advance in the absence of data. We need a way to hold the watermark far enough back even when no data is present to hold it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the following pieces:
>  - A constant duration (positive or negative) D by which to shift the watermark.
>  - A function from TimestampedElement<T> to a new timestamp t' satisfying t' >= t + D, where t is the element's original timestamp.
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were mentioned.
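The AdjustTimestamps idea quoted above can be sketched in plain Java, assuming hypothetical AdjustTimestamps and TimestampedElement classes (neither is a Beam API). The sketch shifts the output watermark by D and rejects any new timestamp that violates f(e) >= t + D. Because an on-time input satisfies t >= W, every output satisfies f(e) >= t + D >= W + D, so it can never fall behind the shifted watermark; an approach that only allows skew, moving timestamps but not the watermark, would make the same output late.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

// Hypothetical sketch of ShiftWatermark / AdjustTimestamps, not a Beam API.
class AdjustTimestampsSketch {

    /** An element paired with its current timestamp t. */
    static final class TimestampedElement<T> {
        final T value;
        final Instant timestamp;

        TimestampedElement(T value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** A constant shift D plus a timestamp function f with the contract f(e) >= t + D. */
    static final class AdjustTimestamps<T> {
        private final Duration shift; // D, positive or negative
        private final Function<TimestampedElement<T>, Instant> fn; // f

        AdjustTimestamps(Duration shift, Function<TimestampedElement<T>, Instant> fn) {
            this.shift = shift;
            this.fn = fn;
        }

        /** The downstream watermark is the upstream watermark shifted by D. */
        Instant outputWatermark(Instant inputWatermark) {
            return inputWatermark.plus(shift);
        }

        /** Applies f and enforces f(e) >= t + D, rejecting any element that would violate it. */
        TimestampedElement<T> apply(TimestampedElement<T> element) {
            Instant newTimestamp = fn.apply(element);
            Instant lowerBound = element.timestamp.plus(shift);
            if (newTimestamp.isBefore(lowerBound)) {
                throw new IllegalArgumentException(
                        "timestamp " + newTimestamp + " is earlier than t + D = " + lowerBound);
            }
            return new TimestampedElement<>(element.value, newTimestamp);
        }
    }

    public static void main(String[] args) {
        // D = -60 minutes; f moves every element 45 minutes earlier, which is within bounds.
        AdjustTimestamps<String> adjust = new AdjustTimestamps<>(
                Duration.ofMinutes(-60), e -> e.timestamp.minus(Duration.ofMinutes(45)));

        Instant inputWatermark = Instant.parse("2019-09-18T02:00:00Z");
        TimestampedElement<String> in =
                new TimestampedElement<>("log line", Instant.parse("2019-09-18T02:10:00Z"));

        TimestampedElement<String> out = adjust.apply(in);
        Instant outputWatermark = adjust.outputWatermark(inputWatermark);

        // Output timestamp 01:25 vs shifted watermark 01:00: still on time.
        System.out.println(out.timestamp + " on time: " + !out.timestamp.isBefore(outputWatermark));
        // Against the unshifted 02:00 watermark the same timestamp would be late.
        System.out.println("late without shift: " + out.timestamp.isBefore(inputWatermark));
    }
}
```

With D = -24 hours the same shape could, under these assumptions, cover the log-file case: f could assign each record a timestamp from the day preceding the file without that record becoming late.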



--
This message was sent by Atlassian Jira
(v8.3.4#803005)