Posted to user@flink.apache.org by Chris Wildman <cw...@newrelic.com> on 2016/06/07 18:43:48 UTC

Event timestamps after data transformations

I have a question about event timestamps after a flatMap transformation. I
am using the event-time time characteristic. I have two streams entering a
CoFlatMap. Stream A simply updates state in that CoFlatMap and does not
output any events. Stream B carries events of type B, each of which produces
an output event of a third type, C.

Will the event timestamp from B be propagated to C? Do I need to add an
explicit timestamp assigner for C?

All windowing in this topology is done on event C so my assumption was:
Stream A does not need a timestamp assigner or watermark generator
Stream B does not need a timestamp assigner or watermark generator
Stream C needs a timestamp assigner and watermark generator

The confusion as to whether event B's timestamp is propagated to event C
arose from this sentence in the documentation: "Operators that consume
multiple input streams (e.g., after a *keyBy(…)* or *partition(…)* function,
or a union) track the event time on each of their input streams. The
operator’s current event time is the minimum of the input streams’ event
time. As the input streams update their event time, so does the operator."
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html#watermarks-in-parallel-streams
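
The rule quoted above can be modeled in a few lines of plain Java. This is only a sketch of the "minimum over inputs" behavior the documentation describes, not Flink code: the class and method names (MinWatermarkSketch, InputWatermarks, advance) are hypothetical, and each input is assumed to start at Long.MIN_VALUE until it reports a watermark.

```java
import java.util.Arrays;

public class MinWatermarkSketch {

    /** Tracks the latest watermark seen on each input (hypothetical helper, not a Flink class). */
    static final class InputWatermarks {
        private final long[] perInput;

        InputWatermarks(int numInputs) {
            perInput = new long[numInputs];
            // Assumption: an input that has not reported yet counts as "no progress".
            Arrays.fill(perInput, Long.MIN_VALUE);
        }

        /** Records a watermark from one input and returns the operator's event time. */
        long advance(int input, long watermark) {
            // Watermarks never move backwards on a single input.
            perInput[input] = Math.max(perInput[input], watermark);
            return currentEventTime();
        }

        /** The operator's event time is the minimum over all inputs. */
        long currentEventTime() {
            return Arrays.stream(perInput).min().getAsLong();
        }
    }

    public static void main(String[] args) {
        InputWatermarks operator = new InputWatermarks(2);
        // Only input 1 has reported so far, so input 0 holds the operator back.
        System.out.println(operator.advance(1, 5000L));
        // Once input 0 reports, the operator advances to the smaller of the two.
        System.out.println(operator.advance(0, 3000L));
    }
}
```

One consequence of this model: an input that never reports a watermark keeps the minimum from advancing, which is worth keeping in mind when deciding whether an input of a two-input operator can go without a watermark generator.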

Thanks for your help,
Chris

Re: Event timestamps after data transformations

Posted by Jamie Grier <ja...@data-artisans.com>.
You can handle this in multiple ways. If there is a natural timestamp in
streamB, you can simply use it by doing this:

streamB
    .assignTimestamps(...) // your assigner
    .connect(streamA)
    .flatMap(...) // your CoFlatMapFunction
    .timeWindow(...)
    .whatever()

Here the event timestamp will be propagated from streamB to the output of
your CoFlatMapFunction.  The collector that is passed to your
CoFlatMapFunction will ensure that the elements emitted using that
collector have the same timestamp as the input event.
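
To make the collector behavior concrete, here is a minimal plain-Java model of it. This is a sketch under stated assumptions rather than Flink's actual implementation: Stamped, StampingCollector, and processB are hypothetical names invented for illustration, and the model only covers the streamB side of the connected streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class TimestampPropagationSketch {

    /** An element paired with its event timestamp in milliseconds (hypothetical type). */
    static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Collector that stamps every emitted element with the current input's timestamp. */
    static final class StampingCollector<T> {
        private final List<Stamped<T>> out = new ArrayList<>();
        private long currentTimestamp;

        void setTimestamp(long timestamp) {
            currentTimestamp = timestamp;
        }

        void collect(T value) {
            out.add(new Stamped<>(value, currentTimestamp));
        }

        List<Stamped<T>> output() {
            return out;
        }
    }

    /** Models the operator: it sets the timestamp, then hands the event to the user function. */
    static List<Stamped<String>> processB(
            List<Stamped<Integer>> inputB,
            BiConsumer<Integer, StampingCollector<String>> userFunction) {
        StampingCollector<String> collector = new StampingCollector<>();
        for (Stamped<Integer> b : inputB) {
            collector.setTimestamp(b.timestamp);
            userFunction.accept(b.value, collector);
        }
        return collector.output();
    }

    public static void main(String[] args) {
        List<Stamped<Integer>> streamB = Arrays.asList(
                new Stamped<Integer>(1, 1000L),
                new Stamped<Integer>(2, 2500L));
        // The user function never touches timestamps; each C inherits them from its B.
        List<Stamped<String>> streamC = processB(streamB, (b, out) -> out.collect("C" + b));
        for (Stamped<String> c : streamC) {
            System.out.println(c.value + " @ " + c.timestamp);
        }
    }
}
```

In this model the user function only calls collect, so every C event ends up with the timestamp of the B event that produced it, including when one B produces several Cs.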

I hope that helps :)

-Jamie



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com