Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2019/11/01 09:07:17 UTC

Re: Python SDK timestamp precision

 > Yes, this is the "minus epsilon" idea, but assigning this as a bit on 
the WindowedValue rather than on the Timestamp itself. This means that 
pulling the timestamp out then re-assigning it would be lossy. (As a 
basic example, imagine a batching DoFn that batches up elements (with 
their respective metadata), calls an external service on the full batch, 
and then emits the results (with the appropriate, cached, metadata).)

I'm not sure if I follow - I'd say that in the example the "limit bit" 
could be part of the metadata, so that when the external service returns 
a response, the limiting bit could be added back. Actually, a PTransform 
for manipulating this limit bit should be accessible to users, because 
it can be useful when reading data from an external service (e.g. 
Kafka).
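
To illustrate, a minimal Python sketch of the idea (all names 
hypothetical, not the Beam API): the limit bit rides along in the 
element metadata, and a user-facing helper toggles it, e.g. after 
reading from Kafka:

    from dataclasses import dataclass, replace
    from typing import Any

    @dataclass(frozen=True)
    class WindowedValue:
        value: Any
        timestamp_micros: int
        # True means "as close to timestamp as possible", i.e. the
        # limiting value, not the exact instant itself.
        is_limit: bool = False

    def set_limit_bit(wv: WindowedValue) -> WindowedValue:
        return replace(wv, is_limit=True)

    def clear_limit_bit(wv: WindowedValue) -> WindowedValue:
        return replace(wv, is_limit=False)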

Yes, the current approach of subtracting 1 (millisecond, nanosecond, 
picosecond, whatever) ensures that we don't have to hold this metadata, 
because it is part of the timestamp. But we are not modeling reality 
correctly. In reality the output for a window with range 
<02:00:00,03:00:00) (half-closed) cannot happen *before* 03:00:00. Yes, 
its validity interval might depend on the precision - 
<02:00:00-02:59:59.999> in the case of milliseconds, 
<02:00:00-02:59:59.999999999> in the case of nanoseconds - but that is 
only due to numerical limitations. Neither of those is correct, and that 
is what brings this timestamp precision into question. This only arises 
from the fact that we cannot preserve causality of events with the same 
(numerical value of) timestamp correctly. That's why I'd say it would be 
better to actually fix how we treat reality. Whether that would be by 
creating Beam's own Timestamp with this ability, or by adding this to the 
WindowedValue along with the other metadata (timestamp, window, pane, 
...) is an implementation detail. Still, there is a question of whether 
we can actually do that, because it would affect the output timestamps of 
user transforms (correct them, actually, but it is a breaking change even so).
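
To make the numerical-limitation point concrete, a small sketch (units 
are nanosecond ticks, purely illustrative):

    # 03:00:00 expressed as nanoseconds since midnight.
    WINDOW_END = 3 * 3600 * 10**9

    def max_timestamp(end_ns: int, resolution_ns: int) -> int:
        # Current approach: the largest representable instant strictly
        # before the (exclusive) window end.
        return end_ns - resolution_ns

    ms = max_timestamp(WINDOW_END, 10**6)  # 02:59:59.999
    ns = max_timestamp(WINDOW_END, 1)      # 02:59:59.999999999
    # Both are artifacts of finite resolution; the "true" bound is
    # 03:00:00 itself, approached from below.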

Jan

On 10/31/19 6:07 PM, Robert Bradshaw wrote:
> On Thu, Oct 31, 2019 at 1:49 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>>   > This is quite an interesting idea. In some sense, timestamps become
>> like interval windows, and window assignment is similar to the window
>> mapping fns that we use for side inputs. I still think the idea of a
>> timestamp for an element and a window for an element is needed (e.g. one
>> can have elements in a single window, especially the global window, that
>> have different timestamps), but this could be interesting to explore. It
>> could definitely get rid of the "minus epsilon" weirdness, though I
>> don't think it completely solves the granularity issues.
>>
>> Thinking about this a little more - maybe we actually don't need a time
>> interval (not sure); it might be sufficient to actually add a single bit
>> to the WindowedValue. That bit would be "the timestamp is PRECISELY this
>> one" or "the timestamp is a limiting value". This flag would have to
>> propagate to timer settings (that is, "set timer to exactly timestamp T"
>> or "set timer as close to T as possible"). Then window timers at the end
>> of the window would set timers with this "limiting" setting (note that
>> window.maxTimestamp() would have to change definition to be the first
>> timestamp strictly greater than any timestamp that belongs to the window
>> - it would actually be the first timestamp of the new window with the
>> "limit" flag on). The impact on timers would be that events fired from
>> @OnTimer would just propagate the flag to the WindowedValue being output.
>>
>> That way it seems not to matter how an SDK internally handles time
>> precision, as it would be transparent (at least it seems so to me). It is
>> actually precisely what you proposed as "minus epsilon", only taken to
>> the extreme. Looks useful to me and seems not that hard to implement.
>> Although it would of course be a somewhat breaking change, because
>> outputs of windows would become "3:00:00.000" instead of "2:59:59.999"
>> (but I like the first one much more! :))
> Yes, this is the "minus epsilon" idea, but assigning this as a bit on
> the WindowedValue rather than on the Timestamp itself. This means that
> pulling the timestamp out then re-assigning it would be lossy. (As a
> basic example, imagine a batching DoFn that batches up elements
> (with their respective metadata), calls an external service on the
> full batch, and then emits the results (with the appropriate, cached,
> metadata).)
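
A minimal sketch of that lossy round trip (hypothetical helper, not a 
real DoFn):

    def batch_and_call(elements, call_service):
        # elements: (value, timestamp) pairs. If "minus epsilon" is a bit
        # on the WindowedValue rather than on the timestamp, caching only
        # the timestamp here silently drops it...
        timestamps = [ts for _, ts in elements]
        results = call_service([value for value, _ in elements])
        # ...and re-assigning the cached timestamps cannot restore it.
        return list(zip(results, timestamps))
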
>
>> On 10/30/19 10:32 PM, Robert Bradshaw wrote:
>>> On Wed, Oct 30, 2019 at 2:00 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>> TL;DR - can we solve this by representing aggregations not as point-wise
>>>> events in time, but as time ranges? Explanation below.
>>>>
>>>> Hi,
>>>>
>>>> this is pretty interesting from a theoretical point of view. The
>>>> question generally seems to be - having two events, can I reliably order
>>>> them? One event might be the end of one window and the other event might
>>>> be the start of another window. There is strictly required causality in
>>>> this (although these events in fact have the same limiting timestamp). On
>>>> the other hand, when I have two (random) point-wise events (one with
>>>> timestamp T1 and the other with timestamp T2), then the causality of
>>>> these two depends on their distance in space. If these two events differ
>>>> by a single nanosecond, then unless they originate very "close" to each
>>>> other, there is a high probability that different observers will see them
>>>> in different order (and hence there is no causality possible and the
>>>> order doesn't matter).
>>>>
>>>> That is to say - if I'm dealing with a single external event (someone
>>>> tells me something happened at time T), then - around the boundaries
>>>> of windows - it doesn't matter which window this event is placed into.
>>>> There is no causal connection between the start of a window and the event.
>>> I resolve this theoretical issue by working in a space with a single
>>> time dimension and zero spatial dimensions. That is, the location of
>>> an event is completely determined by a single coordinate in time, and
>>> distance and causality are hence well defined for all possible
>>> observers :). Realistically, there is both error and ambiguity in
>>> assigning absolute timestamps to real-world events, but it's important
>>> that this choice of ordering should be preserved (e.g. not compressed
>>> away) by the system.
>>>
>>>> A different situation is when we actually perform an aggregation on
>>>> multiple elements (GBK) - then although we produce output at a certain
>>>> event-time timestamp, this event is not "point-wise external", but is a
>>>> running aggregation over multiple (and possibly long-term) data. That,
>>>> of course, causally precedes opening a new (tumbling) window.
>>>>
>>>> The question now is - could we extend WindowedValue so that it doesn't
>>>> contain only a single point-wise event in time, but a time range in the
>>>> case of aggregations? Then the example of Window.into -> GBK -> Window.into
>>>> could actually behave correctly (respecting causality), and another
>>>> positive thing could be that the resulting timestamp of the aggregation
>>>> would no longer have to be window.maxTimestamp - 1 (which is kind of
>>>> strange).
>>>>
>>>> This might actually be equivalent to the "minus epsilon" approach, but
>>>> maybe more understandable. Moreover, this should probably be available
>>>> to users, because one can easily get into the same problems when using
>>>> a stateful ParDo to perform a GBK-like aggregation.
>>>>
>>>> Thoughts?
>>> This is quite an interesting idea. In some sense, timestamps become
>>> like interval windows, and window assignment is similar to the window
>>> mapping fns that we use for side inputs. I still think the idea of a
>>> timestamp for an element and a window for an element is needed (e.g.
>>> one can have elements in a single window, especially the global window,
>>> that have different timestamps), but this could be interesting to
>>> explore. It could definitely get rid of the "minus epsilon" weirdness,
>>> though I don't think it completely solves the granularity issues.
>>>
>>>> On 10/30/19 1:05 AM, Robert Bradshaw wrote:
>>>>> On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>> Point (1) is compelling. Solutions to the "minus epsilon" seem a bit complex. On the other hand, an opaque and abstract Timestamp type (in each SDK) going forward seems like a Pretty Good Idea (tm). Would you really have to go floating point? Could you just have a distinguished representation for non-inclusive upper/lower bounds? These could be at the same reduced resolution as timestamps in element metadata, since that is all they are compared against.
>>>>> If I were coming up with an abstract, opaque representation of
>>>>> Timestamp (and Duration) for Beam, I would explicitly include the
>>>>> "minus epsilon" concept. One could still do arithmetic with these.
>>>>> This would make any conversion to standard datetime libraries lossy
>>>>> though.
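
A minimal sketch of such an opaque Timestamp (illustrative only): 
arithmetic preserves the flag, ordering respects it, and conversion to a 
standard datetime is exactly where the loss would occur:

    from dataclasses import dataclass
    from datetime import datetime, timezone

    @dataclass(frozen=True)
    class Timestamp:
        micros: int
        minus_epsilon: bool = False  # denotes "just before micros"

        def __lt__(self, other: "Timestamp") -> bool:
            if self.micros != other.micros:
                return self.micros < other.micros
            # T - epsilon sorts strictly before T itself.
            return self.minus_epsilon and not other.minus_epsilon

        def __add__(self, duration_micros: int) -> "Timestamp":
            # (T - eps) + d == (T + d) - eps
            return Timestamp(self.micros + duration_micros, self.minus_epsilon)

        def to_datetime(self) -> datetime:
            # Lossy: standard libraries have no "- epsilon" notion.
            return datetime.fromtimestamp(self.micros / 1e6, tz=timezone.utc)
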
>>>>>
>>>>>> Point (2) is also good, though it seems like something that could be cleverly engineered and/or we just provide one implementation and it is easy to make your own for finer granularity, since a WindowFn separately receives the Timestamp (here I'm pretending it is abstract and opaque and likely approximate) and the original element with whatever precision the original data included.
>>>>> Yes, but I don't see how a generic WindowFn would reach into the
>>>>> (arbitrary) element and pull out this original data. One of the
>>>>> benefits of the Beam model is that the WindowFn does not have to
>>>>> depend on the element type.
>>>>>
>>>>>> Point (3) the model/runner owns the timestamp metadata so I feel fine about it being approximated as long as any original user data is still present. I don't recall seeing a compelling case where the timestamp metadata that the runner tracks and understands is required to be exactly the same as a user value (assuming users understand this distinction, which is another issue that I would separate from whether it is technically feasible).
>>>>> As we provide the ability to designate user data as the runner
>>>>> timestamp against which to window, and promote the runner timestamp
>>>>> back to user data (people are going to want to get DateTime or Instant
>>>>> objects out of it), it seems tricky to explain to users that one or
>>>>> both of these operations may be lossy (and, in addition, I don't think
>>>>> there's a consistently safe direction to round).
>>>>>
>>>>>> The more I think about the very real problems you point out, the more I think that our backwards-incompatible move should be to our own abstract Timestamp type, putting the design decision behind a minimal interface. If we see a concrete design for that data type, we might be inspired how to support more possibilities.
>>>>>>
>>>>>> As for the rest of the speculation... moving to nanos immediately helps users so I am now +1 on just doing it, or moving ahead with an abstract data type under the assumption that it will basically be nanos under the hood.
>>>>> If the fact that it's stored as nanos under the hood leaks out (and I
>>>>> have trouble seeing how it won't) I'd lean towards just using them
>>>>> directly (e.g. Java Instant) rather than wrapping it.
>>>>>
>>>>>> Having a cleverly resolution-independent system is interesting and maybe extremely future proof but maybe preparing for a very distant future that may never come.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>> TL;DR: We should just settle on nanosecond precision ubiquitously for timestamp/windowing in Beam.
>>>>>>>
>>>>>>>
>>>>>>> Re-visiting this discussion in light of cross-language transforms and runners, and trying to tighten up testing. I've spent some more time thinking about how we could make these operations granularity-agnostic, but just can't find a good solution. In particular, the sticking points seem to be:
>>>>>>>
>>>>>>> (1) Windows are half-open intervals, and the timestamp associated with a window coming out of a GBK is (by default) as large as possible but must live in that window. (Otherwise WindowInto + GBK + WindowInto would have the unfortunate effect of moving aggregate values into subsequent windows, which is clearly not the intent.) In other words, the timestamp of a grouped value is basically End(Window) - epsilon. Unless we choose a representation able to encode "minus epsilon", we must agree on a granularity.
>>>>>>>
>>>>>>> (2) Unless we want to have multiple variants of all our WindowFns (e.g. FixedWindowMillis, FixedWindowMicros, FixedWindowNanos), we must agree on a granularity with which to parameterize these well-known operations. There are cases (e.g. side input window mapping, merging) where these Fns may be used downstream in contexts other than where they are applied/defined.
>>>>>>>
>>>>>>> (3) Reification of the timestamp into user-visible data, and the other way around, require a choice of precision to expose to the user. This means that the timestamp is actual data, and truncating/rounding cannot be done implicitly. Also round trip of reification and application of timestamps should hopefully be idempotent no matter the SDK.
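
For concreteness, point (1) can be checked with a few lines of 
illustrative arithmetic (millisecond granularity, not runner code):

    SIZE = 60 * 60 * 1000  # one-hour fixed windows, in millis

    def window_of(ts):
        start = ts - ts % SIZE
        return (start, start + SIZE)  # half-open [start, end)

    w = window_of(2 * SIZE + 5)      # some element in hour two
    gbk_output_ts = w[1] - 1         # End(Window) - epsilon, at millis granularity
    assert window_of(gbk_output_ts) == w   # re-windowing keeps the aggregate put
    assert window_of(w[1]) != w            # End(Window) itself would shift it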
>>>>>>>
>>>>>>> The closest I've come is possibly parameterizing the timestamp type, where encoding, decoding (including pulling the end out of a window?), comparison (against each other and a watermark), "minus epsilon", etc. could be UDFs. Possibly we'd need the full set of arithmetic operations to implement FixedWindows on an unknown timestamp type. Reification would simply be disallowed (or return an opaque rather than SDK-native type) if the SDK did not know that window type. The fact that one might need comparison between timestamps of different types, or (lossless) coercion from one type to another, means that timestamp types need to know about each other, or another entity needs to know about the full cross-product, unless there is a common base-type (at which point we might as well always choose that).
>>>>>>>
>>>>>>> An intermediate solution is to settle on a floating (decimal) point representation, plus a "minus-epsilon" bit. It wouldn't quite solve the mapping through SDK-native types (which could require rounding or errors or a new opaque type, and few date libraries could faithfully expose the minus epsilon part). It might also be more expensive (compute and storage), and would not allow us to use the protobuf timestamp/duration fields (or any standard date/time libraries).
>>>>>>>
>>>>>>> Unless we can come up with a clean solution to the issues above shortly, I think we should fix a precision and move forward. If this makes sense to everyone, then we can start talking about the specific choice of precision and a migration path (possibly only for portability).
>>>>>>>
>>>>>>>
>>>>>>> For reference, the manipulations we do on timestamps are:
>>>>>>>
>>>>>>> WindowInto: Timestamp -> Window
>>>>>>> TimestampCombine: Window, [Timestamp] -> Timestamp
>>>>>>>        End(Window)
>>>>>>>        Min(Timestamps)
>>>>>>>        Max(Timestamps)
>>>>>>> PastEndOfWindow: Watermark, Window -> {True, False}
>>>>>>>
>>>>>>> [SideInput]WindowMappingFn: Window -> Window
>>>>>>>        WindowInto(End(Window))
>>>>>>>
>>>>>>> GetTimestamp: Timestamp -> SDK Native Object
>>>>>>> EmitAtTimestamp: SDK Native Object -> Timestamp
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>> On Thu, May 9, 2019 at 9:32 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> From: Robert Bradshaw <ro...@google.com>
>>>>>>>>> Date: Wed, May 8, 2019 at 3:00 PM
>>>>>>>>> To: dev
>>>>>>>>>
>>>>>>>>>> From: Kenneth Knowles <ke...@apache.org>
>>>>>>>>>> Date: Wed, May 8, 2019 at 6:50 PM
>>>>>>>>>> To: dev
>>>>>>>>>>
>>>>>>>>>>>> The end-of-window, for firing, can be approximate, but it seems it
>>>>>>>>>>>> should be exact for timestamp assignment of the result (and similarly
>>>>>>>>>>>> with the other timestamp combiners).
>>>>>>>>>>> I was thinking that the window itself should be stored as exact data, while just the firing itself is approximated, since it already is, because of watermarks and timers.
>>>>>>>>>> I think this works where we can compare encoded windows, but some
>>>>>>>>>> portable interpretation of windows is required for runner-side
>>>>>>>>>> implementation of merging windows (for example).
>>>>>>>>> But in this case, you've recognized the URN of the WindowFn anyhow, so you understand its windows. Remembering that IntervalWindow is just one choice, and that windows themselves are totally user-defined and that merging logic is completely arbitrary per WindowFn (we probably should have some restrictions, but see https://issues.apache.org/jira/browse/BEAM-654). So I file this use case in the "runner knows everything about the WindowFn and Window type and window encoding anyhow".
>>>>>>>> Being able to merge common windows in the runner is just an
>>>>>>>> optimization, but an important one (especially for bootstrapping
>>>>>>>> SDKs). However, this is not just about runner to SDK, but SDK to SDK
>>>>>>>> as well (where a user from one SDK may want to inspect the windows
>>>>>>>> produced by another). Having MillisIntervalWindow,
>>>>>>>> MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I
>>>>>>>> think is worth going down.
>>>>>>>>
>>>>>>>> Yes, we need to solve the "extract the endpoint of an unknown encoded
>>>>>>>> window" problem as well, possibly similar to what we do with length
>>>>>>>> prefix coders, possibly a restriction on window encodings themselves.
>>>>>>>>
>>>>>>>>>> There may also be issues if windows (or timestamps) are assigned to a
>>>>>>>>>> high precision in one SDK, then inspected/acted on in another SDK, and
>>>>>>>>>> then passed back to the original SDK where the truncation would be
>>>>>>>>>> visible.
>>>>>>>>> This is pretty interesting and complex. But again, a window is just data. An SDK has to know how to deserialize it to operate on it. Unless we do actually standardize some aspects of it. I don't believe BoundedWindow encoding has a defined way to get the timestamp without decoding the window, does it? I thought we had basically defaulted to IntervalWindows everywhere. But I am not following that closely.
>>>>>>>>>
>>>>>>>>>>> You raise a good point that min/max timestamp combiners require actually understanding the higher-precision timestamp. I can think of a couple things to do. One is the old "standardize all 3 or 4 precisions we need" and the other is that combiners other than EOW exist primarily to hold the watermark, and that hold does not require the original precision. Still, neither of these is that satisfying.
>>>>>>>>>> In the current model, the output timestamp is user-visible.
>>>>>>>>> But as long as the watermark hold is less, it is safe. It requires knowing the coarse-precision lower bound of the timestamps of the input. And there may be situations where you also want the coarse upper bound. But you do know that these are at most one millisecond apart (assuming the runner is in millis) so perhaps no storage overhead. But a lot of complexity and chances for off by ones. And this is pretty hand-wavy.
>>>>>>>> Yeah. A different SDK may (implicitly or explicitly) ask for the
>>>>>>>> timestamp of the (transitive) output of a GBK, for which an
>>>>>>>> approximation (either way) is undesirable.
>>>>>>>>
>>>>>>>>>>>>> A correction: Java *now* uses nanoseconds [1]. It uses the same breakdown as proto (int64 seconds since epoch + int32 nanos within second). It has legacy classes that use milliseconds, and Joda itself now encourages moving back to Java's new Instant type. Nanoseconds should complicate the arithmetic only for the one person authoring the date library, which they have already done.
>>>>>>>>>>>> The encoding and decoding need to be done in a language-consistent way
>>>>>>>>>>>> as well.
>>>>>>>>>>> I honestly am not sure what you mean by "language-consistent" here.
>>>>>>>>>> If we want to make reading and writing of timestamps, windows
>>>>>>>>>> cross-language, we can't rely on language-specific libraries to do the
>>>>>>>>>> encoding.
>>>>>>>>>>
>>>>>>>>>>>> Also, most date libraries don't provide division, etc. operators, so
>>>>>>>>>>>> we have to do that ourselves as well. Not that it should be *that* hard.
>>>>>>>>>>> If the libraries dedicated to time handling haven't found it needful, is there a specific reason you raise this? We do some simple math to find the window things fall into; is that it?
>>>>>>>>>> Yes. E.g.
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
>>>>>>>>>>
>>>>>>>>>> would be a lot messier if there were no mapping from date libraries to raw
>>>>>>>>>> ints that we can do arithmetic on. Writing this with the (seconds,
>>>>>>>>>> nanos) representation is painful. But I suppose we'd only have to do
>>>>>>>>>> it once per SDK.
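
Roughly what the linked line does, sketched both ways for comparison 
(illustrative, not the Beam source):

    def fixed_window_start(ts, size, offset=0):
        # With timestamps as raw ints, assignment is one expression.
        return ts - (ts - offset) % size

    def fixed_window_start_pair(ts, size, offset=(0, 0)):
        # With (seconds, nanos) pairs, every subtraction and modulo needs
        # manual carry/normalize steps; in practice one converts to a raw
        # int and back, which is the messiness being described.
        to_ns = lambda p: p[0] * 10**9 + p[1]
        start = to_ns(ts) - (to_ns(ts) - to_ns(offset)) % to_ns(size)
        return divmod(start, 10**9)
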
>>>>>>>>> Yea I think that arithmetic is not so bad. But this raises the issue of writing a *generic* WindowFn where its idea of timestamp granularity (the WindowFn owns the window type and encoding) may not match the user data coming in. So you need to apply the approximation function to provide type-correct input to the WindowFn. That's kind of exciting and weird and perhaps unsolvable, except by choosing a concrete granularity.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>>>>>> It would also be really nice to clean up the infinite-future being the
>>>>>>>>>>>>>> somewhat arbitrary max micros rounded to millis, and
>>>>>>>>>>>>>> end-of-global-window being infinite-future minus 1 hour (IIRC), etc.
>>>>>>>>>>>>>> as well as the ugly logic in Python to cope with millis-micros
>>>>>>>>>>>>>> conversion.
>>>>>>>>>>>>> I actually don't have a problem with this. If you are trying to keep the representation compact, not add bytes on top of instants, then you just have to choose magic numbers, right?
>>>>>>>>>>>> It's not about compactness, it's the (historically-derived?)
>>>>>>>>>>>> arbitrariness of the numbers.
>>>>>>>>>>> What I mean is that the only reason to fit them into an integer at all is compactness. Otherwise, you could use a proper disjoint union representing your intent directly, and all fiddling goes away, like `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`. It costs a couple of bits.
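
That disjoint union, sketched in Python (the placement of 
EndOfGlobalWindow between actual times and PosInf mirrors the current 
magic-number layout):

    from dataclasses import dataclass
    from typing import Union

    @dataclass(frozen=True)
    class NegInf: pass

    @dataclass(frozen=True)
    class ActualTime:
        micros: int

    @dataclass(frozen=True)
    class EndOfGlobalWindow: pass

    @dataclass(frozen=True)
    class PosInf: pass

    Timestamp = Union[NegInf, ActualTime, EndOfGlobalWindow, PosInf]

    _RANK = {NegInf: 0, ActualTime: 1, EndOfGlobalWindow: 2, PosInf: 3}

    def less_than(a: Timestamp, b: Timestamp) -> bool:
        ra, rb = _RANK[type(a)], _RANK[type(b)]
        if ra != rb:
            return ra < rb
        return isinstance(a, ActualTime) and a.micros < b.micros
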
>>>>>>>>>> The other cost is not being able to use standard libraries to
>>>>>>>>>> represent all of your timestamps.
>>>>>>>>>>
>>>>>>>>>>>> For example, the bounds are chosen to
>>>>>>>>>>>> fit within 64-bit micros despite milliseconds being the "chosen"
>>>>>>>>>>>> granularity, and care was taken that
>>>>>>>>>>>>
>>>>>>>>>>>>        WindowInto(Global) | GBK | WindowInto(Minute) | GBK
>>>>>>>>>>>>
>>>>>>>>>>>> works, but
>>>>>>>>>>>>
>>>>>>>>>>>>        WindowInto(Global) | GBK | WindowInto(Day) | GBK
>>>>>>>>>>>>
>>>>>>>>>>>> may produce elements with timestamps greater than MaxTimestamp.
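
The minute-vs-day asymmetry can be checked with back-of-the-envelope 
arithmetic; the magic numbers below are illustrative stand-ins (per the 
"minus 1 hour (IIRC)" description above), not the exact Beam constants:

    MAX_TS = 2**63 - 1                    # "infinite future", micros
    EOGW = MAX_TS - 3_600_000_000         # end of global window: max minus ~1 hour
    MINUTE = 60 * 10**6
    DAY = 24 * 60 * MINUTE

    def window_end(ts, size):
        return ts - ts % size + size      # end of the fixed window containing ts

    assert window_end(EOGW, MINUTE) <= MAX_TS  # minute re-windowing fits
    assert window_end(EOGW, DAY) > MAX_TS      # day re-windowing overflows MaxTimestamp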
>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <ro...@frantil.com> wrote:
>>>>>>>>>>>>>>>> +1 for plan B. Nanosecond precision on windowing seems... a little much for a system that's aggregating data over time. Even for processing, say, particle supercollider data, they'd get away with artificially increasing the granularity in batch settings.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Now if they were streaming... they'd probably want femtoseconds anyway.
>>>>>>>>>>>>>>>> The point is, we should see if users demand it before adding in the necessary work.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <ch...@google.com> wrote:
>>>>>>>>>>>>>>>>> +1 for plan B as well. I think it's important to make timestamp precision consistent now without introducing surprising behaviors for existing users. But we should move towards a higher-granularity timestamp precision in the long run to support use-cases that Beam users might otherwise miss out on (on a runner that supports such precision).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - Cham
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>> I also like Plan B because in the cross language case, the pipeline would not work since every party (Runners & SDKs) would have to be aware of the new beam:coder:windowed_value:v2 coder. Plan A has the property where if the SDK/Runner wasn't updated then it may start truncating the timestamps unexpectedly.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>> Kenn, this discussion is about the precision of the timestamp in the user data. As you had mentioned, Runners need not have the same granularity of user data as long as they correctly round the timestamp to guarantee that triggers are executed correctly but the user data should have the same precision across SDKs otherwise user data timestamps will be truncated in cross language scenarios.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Based on the systems that were listed, either microsecond or nanosecond would make sense. The issue with changing the precision is that all Beam runners except for possibly Beam Python on Dataflow are using millisecond precision since they are all using the same Java Runner windowing/trigger logic.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Plan A: Swap precision to nanosecond
>>>>>>>>>>>>>>>>>>> 1) Change the Python SDK to only expose millisecond precision timestamps (do now)
>>>>>>>>>>>>>>>>>>> 2) Change the user data encoding to support nanosecond precision (do now)
>>>>>>>>>>>>>>>>>>> 3) Swap runner libraries to be nanosecond precision aware updating all window/triggering logic (do later)
>>>>>>>>>>>>>>>>>>> 4) Swap SDKs to expose nanosecond precision (do later)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Plan B:
>>>>>>>>>>>>>>>>>>> 1) Change the Python SDK to only expose millisecond precision timestamps and keep the data encoding as is (do now)
>>>>>>>>>>>>>>>>>>> (We could add greater precision to plan B later by creating a new coder version, beam:coder:windowed_value:v2, which would be nanosecond-based and would require runners to correctly perform internal conversions for windowing/triggering.)
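
One plausible shape for such an internal conversion (a sketch under 
stated assumptions, not a spec): round element timestamps down and 
window ends up, so a millisecond-precision watermark never overtakes a 
nanosecond-precision timestamp it has not actually passed:

    NANOS_PER_MILLI = 1_000_000

    def element_ts_to_millis(ts_nanos: int) -> int:
        return ts_nanos // NANOS_PER_MILLI           # floor

    def window_end_to_millis(end_nanos: int) -> int:
        return -((-end_nanos) // NANOS_PER_MILLI)    # ceiling

    def end_of_window_fired(watermark_millis: int, end_nanos: int) -> bool:
        return watermark_millis >= window_end_to_millis(end_nanos)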
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think we should go with Plan B and when users request greater precision we can make that an explicit effort. What do people think?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <mx...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for taking care of this issue in the Python SDK, Thomas!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> It would be nice to have a uniform precision for timestamps but, as Kenn
>>>>>>>>>>>>>>>>>>>> pointed out, timestamps are extracted from systems that have different
>>>>>>>>>>>>>>>>>>>> precision.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> To add to the list: Flink - milliseconds
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> After all, it doesn't matter as long as there is sufficient precision
>>>>>>>>>>>>>>>>>>>> and conversions are done correctly.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I think we could improve the situation by at least adding a
>>>>>>>>>>>>>>>>>>>> "milliseconds" constructor to the Python SDK's Timestamp.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
>>>>>>>>>>>>>>>>>>>>> I am not so sure this is a good idea. Here are some systems and their
>>>>>>>>>>>>>>>>>>>>> precision:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Arrow - microseconds
>>>>>>>>>>>>>>>>>>>>> BigQuery - microseconds
>>>>>>>>>>>>>>>>>>>>> New Java instant - nanoseconds
>>>>>>>>>>>>>>>>>>>>> Firestore - microseconds
>>>>>>>>>>>>>>>>>>>>> Protobuf - nanoseconds
>>>>>>>>>>>>>>>>>>>>> Dataflow backend - microseconds
>>>>>>>>>>>>>>>>>>>>> Postgresql - microseconds
>>>>>>>>>>>>>>>>>>>>> Pubsub publish time - nanoseconds
>>>>>>>>>>>>>>>>>>>>> MSSQL datetime2 - 100 nanoseconds (original datetime about 3 millis)
>>>>>>>>>>>>>>>>>>>>> Cassandra - milliseconds
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> IMO it is important to be able to treat any of these as a Beam
>>>>>>>>>>>>>>>>>>>>> timestamp, even though they aren't all streaming. Who knows when we
>>>>>>>>>>>>>>>>>>>>> might be ingesting a streamed changelog, or using them for reprocessing
>>>>>>>>>>>>>>>>>>>>> an archived stream. I think for this purpose we either should
>>>>>>>>>>>>>>>>>>>>> standardize on nanoseconds or make the runner's resolution independent
>>>>>>>>>>>>>>>>>>>>> of the data representation.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I've had some offline conversations about this. I think we can have
>>>>>>>>>>>>>>>>>>>>> higher-than-runner precision in the user data, and allow WindowFns and
>>>>>>>>>>>>>>>>>>>>> DoFns to operate on this higher-than-runner precision data, and still
>>>>>>>>>>>>>>>>>>>>> have consistent watermark treatment. Watermarks are just bounds, after all.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <thw@apache.org
>>>>>>>>>>>>>>>>>>>>> <ma...@apache.org>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>        The Python SDK currently uses timestamps in microsecond resolution
>>>>>>>>>>>>>>>>>>>>>        while Java SDK, as most would probably expect, uses milliseconds.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>        This causes a few difficulties with portability (Python coders need
>>>>>>>>>>>>>>>>>>>>>        to convert to millis for WindowedValue and Timers, which is related
>>>>>>>>>>>>>>>>>>>>>        to a bug I'm looking into:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>        https://issues.apache.org/jira/browse/BEAM-7035
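
The silent loss being described, in miniature (illustrative values):

    micros = 1_999                   # Python-side event timestamp, micros
    millis = micros // 1000          # coder boundary converts to millis: 1
    assert millis * 1000 != micros   # round trip yields 1_000, not 1_999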
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>        As Luke pointed out, the issue was previously discussed:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>        https://issues.apache.org/jira/browse/BEAM-1524
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>        I'm not privy to the reasons why we decided to go with micros in
>>>>>>>>>>>>>>>>>>>>>        the first place, but would it be too big of a change or impractical for
>>>>>>>>>>>>>>>>>>>>>        other reasons to switch Python SDK to millis before it gets more users?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>        Thanks,
>>>>>>>>>>>>>>>>>>>>>        Thomas
>>>>>>>>>>>>>>>>>>>>>

Re: Python SDK timestamp precision

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Nov 1, 2019 at 2:17 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>  > Yes, this is the "minus epsilon" idea, but assigning this as a bit on
> the WindowedValue rather than on the Timestamp itself. This means that
> pulling the timestamp out then re-assigning it would be lossy. (As a
> basic example, imagine a batching DoFn that batches up elements (with
> their respective metadata), calls an external service on the full batch,
> and then emits the results (with the appropriate, cached, metadata).)
>
> I'm not sure if I follow - I'd say that in the example the "limit bit"
> could be part of the metadata, so that when the external service returns
> a response, the limiting bit could be added back. Actually, a PTransform
> for manipulating this limit bit should be accessible to users, because
> it can be useful when reading data from an external service (e.g.
> Kafka).
>
> Yes, the current approach of subtracting 1 (millisecond, nanosecond,
> picosecond, whatever) ensures that we don't have to hold this metadata,
> because it is part of the timestamp. But we are not modeling reality
> correctly. In reality the output for a window with range
> <02:00:00,03:00:00) (half-closed) cannot happen *before* 03:00:00. Yes,
> its validity interval might depend on the precision -
> <02:00:00-02:59:59.999> in the case of milliseconds,
> <02:00:00-02:59:59.999999999> in the case of nanoseconds - but that is
> only due to numerical limitations. Neither of those is correct, and that
> is what brings this timestamp precision into question. This only arises
> from the fact that we cannot preserve causality of events with the same
> (numerical value of) timestamp correctly. That's why I'd say it would be
> better to actually fix how we treat reality. Whether that would be by
> creating Beam's own Timestamp with this ability, or by adding this to the
> WindowedValue along with the other metadata (timestamp, window, pane,
> ...) is an implementation detail. Still, there is a question of whether
> we can actually do that, because it would affect the output timestamps of
> user transforms (correct them, actually, but it is a breaking change even so).

This is point (3) above -- I think it would be very surprising if
extracting and then immediately re-setting the timestamp were anything
but a no-op. If native libraries (like Java Instant) had a notion of
minus-epsilon, then I would want to use it.
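
Stated as a property test (helper names hypothetical):

    def check_timestamp_roundtrip(element, get_timestamp, with_timestamp):
        # The model-level expectation: reading an element's timestamp and
        # immediately re-assigning it must be a no-op.
        assert with_timestamp(element, get_timestamp(element)) == element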
