Posted to dev@flink.apache.org by Jamie Grier <ja...@data-artisans.com> on 2017/02/23 00:01:48 UTC

[DISCUSS] Per-key event time

Hi Flink Devs,

Use cases that I see quite frequently in the real world would benefit from
a different watermarking / event time model than the one currently
implemented in Flink.

I would call Flink's current approach partition-based watermarking or maybe
subtask-based watermarking.  In this model the current "event time" is a
property local to each subtask instance in a dataflow graph.  The event
time at any subtask is the minimum of the watermarks it has received on
each of its input streams.

There are a couple of issues with this model that are not optimal for some
(maybe many) use cases.

1) A single slow subtask (or, say, a source partition) anywhere in the
dataflow can mean that no progress can be made on the computation at all.

2) In many real world scenarios the time skew across keys can be *many*
times greater than the time skew within the data for any single key.

In this discussion I'll use "time skew" to refer to the out-of-orderness
with respect to the timestamps of the data.  Out-of-orderness is a mouthful ;)

Anyway, let me provide an example or two.

In IoT applications the source of events is a particular device out in the
world, let's say a device in a connected car application.  The data for
some particular device may be very bursty and we will certainly get events
from these devices in Flink out-of-order just because of things like
partitions in Kafka, shuffles in Flink, etc.  However, the time skew in the
data for a single device should likely be very small (milliseconds or maybe
seconds).

However, in the same application the time skew across different devices can
be huge (hours or even days).  An obvious example, again using connected
cars, is the following:  Car A is recording data locally at 12:00 pm on
Saturday but doesn't currently have a network connection.  Car B is doing
the same thing but does have a network connection.  Car A will transmit its
data when the network comes back online.  Let's say this is at 4pm.  Car B
was transmitting its data immediately.  This creates a huge time skew (4
hours) in the observed datastream when looked at as a whole.  However, the
time skew in that data for Car A or Car B alone could be tiny.  It will be
out of order of course, but maybe by only milliseconds or seconds.

What the above means in the end for Flink is that the watermarks must be
delayed by 4 hours or more because we're looking at the data stream as a
whole -- otherwise the data for Car A will be considered late.  The time
skew in the data stream when looked at as a whole is large even though the
time skew for any key may be tiny.
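
To make the cost concrete, here is a minimal sketch of how this is handled
today with partition-based watermarks: the single out-of-orderness bound has
to cover the worst-case skew *across* keys, even though each individual car
is only milliseconds out of order.  (The CarEvent type and the 4-hour bound
are illustrative assumptions, not part of any existing application.)

  import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  import org.apache.flink.streaming.api.windowing.time.Time;

  // Illustrative event type: a record produced on the car itself.
  public class CarEvent {
      public String carId;
      public long timestamp; // epoch millis assigned when the event was recorded
  }

  // Sketch only: the bound has to cover the cross-key skew (offline Car A
  // reconnecting after 4 hours), not the tiny per-key skew, so every
  // event-time window in the job is held open for at least this long.
  public class CarEventTimestampExtractor
          extends BoundedOutOfOrdernessTimestampExtractor<CarEvent> {

      public CarEventTimestampExtractor() {
          super(Time.hours(4));
      }

      @Override
      public long extractTimestamp(CarEvent event) {
          return event.timestamp;
      }
  }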

This is the problem I would like to see a solution for.  The basic idea of
keeping track of watermarks and event time "per-key" rather than per
partition or subtask would, I think, solve both of the problems stated
above, and both are real issues for production applications.

The obvious downside of trying to do this per-key is that the amount of
state you have to track is much larger and potentially unbounded.  However,
I could see this approach working if the keyspace isn't growing rapidly but
is stable or grows slowly.  The saving grace here is that this may actually
be true of the types of applications where this would be especially
useful.  Think IoT use cases.  Another approach to keeping state size in
check would be a configurable TTL for a key.
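
As a rough illustration of the TTL idea, here is a minimal sketch assuming
the DataStream API's ProcessFunction with keyed state and timers; the
one-hour TTL and the particular piece of per-key state are made-up examples,
not a concrete proposal for how Flink would implement this internally.

  import org.apache.flink.api.common.state.ValueState;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.functions.ProcessFunction;
  import org.apache.flink.util.Collector;

  // Applied after a keyBy(); drops the per-key tracking state once a key
  // has been idle for longer than the configured TTL.
  public class PerKeyTtl<T> extends ProcessFunction<T, T> {

      private static final long TTL_MS = 60 * 60 * 1000; // assumed TTL of one hour

      private transient ValueState<Long> lastSeen; // stands in for any per-key tracking state

      @Override
      public void open(Configuration parameters) {
          lastSeen = getRuntimeContext().getState(
                  new ValueStateDescriptor<>("lastSeen", Long.class));
      }

      @Override
      public void processElement(T element, Context ctx, Collector<T> out) throws Exception {
          long now = ctx.timerService().currentProcessingTime();
          lastSeen.update(now);
          // Arm a cleanup timer TTL_MS after the latest activity for this key.
          ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
          out.collect(element);
      }

      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
          Long last = lastSeen.value();
          // Only clear if the key really has been idle for the full TTL;
          // timers armed by older elements are ignored otherwise.
          if (last != null && timestamp >= last + TTL_MS) {
              lastSeen.clear();
          }
      }
  }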

Anyway, I'm throwing this out here on the mailing list in case anyone is
interested in this discussion, has thought about the problem deeply
already, has use cases of their own they've run into or has ideas for a
solution to this problem.

Thanks for reading.

-Jamie


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com

Re: [DISCUSS] Per-key event time

Posted by Jamie Grier <ja...@data-artisans.com>.
Thinking about this a bit more...

I think it may be interesting to enable two modes for event-time
advancement in Flink:

1) The current mode which I'll call partition-based, pessimistic,
event-time advancement
2) Key-based, eager, event-time advancement

In this key-based eager mode it's actually quite simple and it basically
becomes a completely local thing, as Paris stated.  In this mode you would
advance event time, per-key, along with the maximum (adjusted) timestamp
you've seen rather than the minimum.  So the current event time at any node
for some key is simply the maximum timestamp you've seen, adjusted (like
now) with the logic from a timestamp extractor -- for example the
BoundedOutOfOrderness extractor.  This is very simple and could possibly
work well as long as the delay used in the event-time calculation is enough
to adjust for the real time skew you're likely to observe for any key.

I wonder how well this might work in practice?
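
To make the idea concrete, here is a minimal, purely illustrative sketch of
the bookkeeping (plain Java, not tied to any particular operator or API):
per-key event time advances eagerly with the maximum timestamp seen for that
key, minus a fixed out-of-orderness bound.  The class and the bound are
assumptions for this example only.

  import java.util.HashMap;
  import java.util.Map;

  // Eager, per-key event time: max timestamp seen per key, minus a bound.
  public class EagerPerKeyEventTime<K> {

      private final long maxOutOfOrdernessMs;          // e.g. a few seconds per key
      private final Map<K, Long> maxTimestampSeen = new HashMap<>();

      public EagerPerKeyEventTime(long maxOutOfOrdernessMs) {
          this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
      }

      /** Record an element's timestamp and return the new event time for its key. */
      public long onElement(K key, long timestamp) {
          long max = maxTimestampSeen.merge(key, timestamp, Math::max);
          // Event time advances eagerly with the maximum seen so far, adjusted
          // by the bound, instead of waiting on a global minimum watermark.
          return max - maxOutOfOrdernessMs;
      }
  }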

On Tue, Feb 28, 2017 at 6:22 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> @Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could
> allow it but then we would exit the world of the deluxe stream and per-key
> watermarks and go back to the realm of normal streams and keyed streams.
>
> On Tue, 28 Feb 2017 at 10:08 Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
> > [...]
> >
> > On Thu, 23 Feb 2017 at 11:14 Gábor Hermann <ma...@gaborhermann.com>
> > wrote:
> >
> > > Hey all,
> > >
> > > Let me share some ideas about this.
> > >
> > > @Paris: The local-only progress tracking indeed seems easier, we do not
> > > need to broadcast anything. Implementation-wise it is easier, but
> > > performance-wise probably not. If one key can come from multiple
> > > sources, there could be a lot more network overhead with per-key
> > > tracking than with broadcasting, somewhat paradoxically. Say source
> > > instance S1 sends messages and watermarks to operator instances O1, O2.
> > > In the broadcasting case, S1 would send one message to O1 and one to O2
> > > per watermark (of course it depends on how fast the watermarks arrive),
> > > a total of 2. However, if we keep track of per-key watermarks, S1 would
> > > need to send watermarks for every key directed to O1, and also for O2.
> > > So if 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if
> > > watermarks arrive at the same rate per-key as per-source in the previous
> > > case) S1 would send a total of 20 watermarks.
> > >
> > > Another question is how large the state-per-key is. If it's really
> > > small (an integer maybe, or the state of a small state machine), then
> > > the overhead of keeping track of a (Long) watermark is large
> > > memory-wise. E.g. Int state vs. Long watermark results in 3x as large a
> > > state. Also, the checkpointing would be ~3x as slow. Of course, for
> > > large states a Long watermark would not mean much overhead.
> > >
> > > We could resolve the memory issue by using some kind of sketch data
> > > structure. Right now the granularity of watermark handling is
> > > per-operator-instance. On the other hand, per-key granularity might be
> > > costly. What if we increased the granularity of watermarks inside an
> > > operator by keeping more than one watermark tracker in one operator?
> > > This could be quite simply done with a hash table. With a hash table of
> > > size 1, we would get the current semantics (per-operator-instance
> > > granularity). With a hash table large enough to have at most one key per
> > > bucket, we would get per-key watermark tracking. In between lies the
> > > trade-off between handling time skew and a lot of memory overhead. This
> > > does not seem hard to implement.
> > >
> > > Of course, at some point we would still need to take care of watermarks
> > > per-key. Imagine that keys A and B go to the same bucket of the hash
> > > table, and watermarks are coming in like this: (B,20), (A,10), (A,15),
> > > (A,40). Then the watermark of the bucket should be the minimum as time
> > > passes, i.e. 0, 10, 15, 20. For this we need to keep track of the
> > > watermarks of A and B separately. But after we have a correct watermark
> > > for the bucket, all we need to care about is the bucket watermark. So
> > > somewhere (most probably at the source) we would have to pay the memory
> > > overhead of tracking every key, but nowhere else in the topology.
> > >
> > > Regarding the potentially large network overhead, the same compression
> > > could be useful, i.e. we would not send watermarks from one operator
> > > per-key, but rather per-hash. Again, the trade-off between time skew and
> > > memory consumption is configurable by the size of the hash table used.
> > >
> > > Cheers,
> > > Gabor
> > >
> > > On 2017-02-23 08:57, Paris Carbone wrote:
> > >
> > > > Hey Jamie!
> > > >
> > > > Key-based progress tracking sounds like local-only progress tracking
> > > > to me; there is no need to use a low watermarking mechanism at all
> > > > since all streams of a key are handled by a single partition at a time
> > > > (per operator).
> > > > Thus, this could be much easier to implement and support (i.e., no
> > > > need to broadcast the progress state of each partition all the time).
> > > > State-wise it should be fine too if it is backed by RocksDB, especially
> > > > if we have MapState in the future.
> > > >
> > > > Just my quick thoughts on this, to get the discussion going :)
> > > >
> > > > cheers
> > > > Paris
> > > >
> > > >> On 23 Feb 2017, at 01:01, Jamie Grier <ja...@data-artisans.com>
> > > >> wrote:
> > > >>
> > > >> [...]



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com

Re: [DISCUSS] Per-key event time

Posted by Aljoscha Krettek <al...@apache.org>.
@Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could
allow it but then we would exit the world of the deluxe stream and per-key
watermarks and go back to the realm of normal streams and keyed streams.

On Tue, 28 Feb 2017 at 10:08 Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> [...]

Re: [DISCUSS] Per-key event time

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Throwing in some thoughts:

When a source determines that no more data will come for a key (which 
in itself is a bit of a tricky problem) then it should signal to downstream 
operations to take the key out of watermark calculations, that is that we 
can release some space. 
I don’t think this is possible without exposing an API for the UDF to signal that there will be no more data for a specific key. We could detect idleness of a key at the source operator, but without any help from user logic it can essentially only be seen as "temporarily idle", which is not helpful in reducing the state, as the watermark state for that key still needs to be kept downstream.

So to achieve this, I think the only option would be to expose new APIs here too.

It’s like how we recently exposed a new `markAsTemporarilyIdle` method in the SourceFunction.SourceContext interface, but instead it would be a `markKeyTerminated` method that must be called by the source UDF in order to save the state space -- there is no feasible fallback detection strategy without it.
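
A hypothetical sketch of what such an interface extension might look like
(`markKeyTerminated` and `KeyAwareSourceContext` do not exist in Flink; this
is only to make the idea concrete):

  import org.apache.flink.streaming.api.functions.source.SourceFunction;

  // Hypothetical extension of SourceFunction.SourceContext -- not existing Flink API.
  public interface KeyAwareSourceContext<T, K> extends SourceFunction.SourceContext<T> {

      // Called by the source UDF once it knows that no further records will
      // be emitted for the given key, so that downstream operators can drop
      // the per-key watermark state they keep for it.
      void markKeyTerminated(K key);
  }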

DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor); 
input 
.map() 
.window(...) // notice that we don't need keyBy because it is implicit 
.reduce(...) 
.map(...) 
.window(...) 
... 

Would this mean that another `keyBy` isn’t allowed downstream? Or still allowed, but we’re using the keys in `DeluxeKeyedStream` as the “meta key” to track key lineage?

On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek (aljoscha@apache.org) wrote:

[...]

Re: [DISCUSS] Per-key event time

Posted by Aljoscha Krettek <al...@apache.org>.
This is indeed an interesting topic, thanks for starting the discussion,
Jamie!

I have now thought about this for a while, since more and more people seem to be
asking about it lately. First, I thought that per-key watermark handling
would not be necessary because it can be done locally (as Paris suggested),
then I realised that that's not actually the case and thought that this
wouldn't be possible. In the end, I came to realise that it is indeed
possible (with some caveats), although with a huge overhead in the amount
of state that we have to keep and with changes to our API. I'll try and
walk you through my thought process.

Let's first look at local watermark tracking, that is, tracking the
watermark locally at the operator that needs it, for example a
WindowOperator. I initially thought that this would be sufficient. Assume
we have a pipeline like this:

Source -> KeyBy -> WindowOperator -> ...

If we have parallelism=1, then all elements for a given key k will be read
by the same source operator instance and they will arrive (in-order) at the
WindowOperator. It doesn't matter whether we track the per-key watermarks
at the Source or at the WindowOperator because we see the same elements in
the same order at each operator, per key.

Now, think about this pipeline:

Source1 --+
          |-> Union -> KeyBy -> WindowOperator -> ...
Source2 --+

(you can either think about two sources or one source that has several
parallel instances, i.e. parallelism > 1)

Here, both Source1 and Source2 can emit elements with our key k. If Source1
is faster than Source2 and the watermarking logic at the WindowOperator
determines the watermark based on the incoming element timestamps (for
example, using the BoundedLatenessTimestampExtractor) then the elements
coming from Source2 will be considered late at the WindowOperator.

From this we know that our WindowOperator needs to calculate the watermark
similarly to how watermark calculation currently happens in Flink: the
watermark is the minimum of the watermark of all upstream operations. In
this case it would be: the minimum upstream watermarks of operations that
emit elements with key k. For per-partition watermarks this works because
the number of upstream operations is known and we simply keep an array that
has the current upstream watermark for each input operation. For per-key
watermarks this would mean that we have to keep k*u upstream watermarks
where u is the number of upstream operations. This can be quite large.
Another problem is that the observed keys change, i.e. the key space is
evolving and we need to retire keys from our calculations lest we run out
of space.
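
To make the bookkeeping concrete, here is a minimal, purely illustrative
sketch of per-key watermark tracking at an operator with u upstream
channels (the class is made up for this example; it does nothing about
retiring keys, which is exactly the part that is hard):

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;

  // Per-key watermark = minimum over the u upstream channels, tracked per key.
  public class PerKeyWatermarkTracker<K> {

      private final int numUpstreamChannels;                             // u
      private final Map<K, long[]> upstreamWatermarks = new HashMap<>(); // k * u longs

      public PerKeyWatermarkTracker(int numUpstreamChannels) {
          this.numUpstreamChannels = numUpstreamChannels;
      }

      /** Record a watermark for (key, channel) and return the key's new watermark. */
      public long onWatermark(K key, int channel, long watermark) {
          long[] perChannel = upstreamWatermarks.computeIfAbsent(key, k -> {
              long[] a = new long[numUpstreamChannels];
              Arrays.fill(a, Long.MIN_VALUE);
              return a;
          });
          perChannel[channel] = Math.max(perChannel[channel], watermark);

          long min = Long.MAX_VALUE;
          for (long w : perChannel) {
              min = Math.min(min, w);
          }
          return min;
      }
  }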

We could find a solution based on a feature we recently introduced in
Flink: https://github.com/apache/flink/pull/2801. The sources keep track of
whether they have input and signal to downstream operations whether they
should be included in the watermark calculation logic. A similar thing
could be done per-key, where each source signals to downstream operations
that there is a new key and that we should start calculating watermarks for
this. When a source determines that no more data will come for a key (which
in itself is a bit of a tricky problem) then it should signal to downstream
operations to take the key out of watermark calculations, so that we
can release some space.

The above is analysing, on a purely technical level, the feasibility of
such a feature. I think it is feasible but can be very expensive in terms
of state size requirements. Gabor also pointed this out above and gave a
few suggestions on reducing that size.
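
For reference, a rough sketch of the hash-bucket granularity Gabor describes
(again purely illustrative): a bucket's watermark is the minimum over the
keys that hash into it, so a table of size 1 reproduces today's
per-operator-instance behaviour and a very large table approaches per-key
tracking.

  import java.util.HashMap;
  import java.util.Map;

  // Watermark tracking at a configurable granularity between per-operator
  // and per-key, as suggested by Gabor.
  public class BucketedWatermarkTracker<K> {

      private final int numBuckets;
      // Per-key watermarks, grouped by bucket; the full per-key map is only
      // needed where keys are first observed (most probably at the source).
      private final Map<Integer, Map<K, Long>> keysPerBucket = new HashMap<>();

      public BucketedWatermarkTracker(int numBuckets) {
          this.numBuckets = numBuckets;
      }

      /** Record a per-key watermark and return the bucket's watermark. */
      public long onWatermark(K key, long watermark) {
          int bucket = Math.floorMod(key.hashCode(), numBuckets);
          Map<K, Long> keys = keysPerBucket.computeIfAbsent(bucket, b -> new HashMap<>());
          keys.merge(key, watermark, Math::max);
          // Only this per-bucket minimum needs to be forwarded downstream.
          long min = Long.MAX_VALUE;
          for (long w : keys.values()) {
              min = Math.min(min, w);
          }
          return min;
      }
  }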

We would also need to change our API to allow tracking the lineage of keys
or to enforce that a key stays the same throughout a pipeline. Consider
this pipeline:

Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator

where KeyBy1 and KeyBy2 extract a different key, respectively. How would
watermarks be tracked across this change of keys? Would we know which of
the prior keys end up being keys according to KeyBy2, i.e. do we have some
kind of key lineage information?

One approach for solving this would be to introduce a new API that allows
extracting a key at the source and will keep this key on the elements until
the sink. For example:

DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
input
  .map()
  .window(...) // notice that we don't need keyBy because it is implicit
  .reduce(...)
  .map(...)
  .window(...)
  ...

The DeluxeKeyedStream (name preliminary ;-) would allow the operations that
we today have on KeyedStream and on DataStream and it would always maintain
the key that was assigned at the sources. The result of each operation
would again be a DeluxeKeyedStream. This way, we could track watermarks per
key.
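
A hypothetical signature sketch of what that could look like (none of these
types exist in Flink; the generics and method set are only meant to make the
idea concrete):

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.common.functions.ReduceFunction;
  import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
  import org.apache.flink.streaming.api.windowing.windows.Window;

  // Hypothetical: a stream that is keyed at the source and stays keyed by K,
  // so per-key watermarks remain meaningful through every transformation.
  public interface DeluxeKeyedStream<K, T> {

      // Transformations keep the key assigned at the source; the result is
      // again a DeluxeKeyedStream, never a plain DataStream.
      <R> DeluxeKeyedStream<K, R> map(MapFunction<T, R> mapper);

      DeluxeKeyedStream<K, T> reduce(ReduceFunction<T> reducer);

      <W extends Window> Windowed<K, T, W> window(WindowAssigner<? super T, W> assigner);

      // Hypothetical companion for windowed operations; reducing brings us
      // back to a DeluxeKeyedStream with the same key.
      interface Windowed<K, T, W extends Window> {
          DeluxeKeyedStream<K, T> reduce(ReduceFunction<T> reducer);
      }
  }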

I know it's a bit of a (very) lengthy mail, but what do you think?



Re: [DISCUSS] Per-key event time

Posted by Gábor Hermann <ma...@gaborhermann.com>.
Hey all,

Let me share some ideas about this.

@Paris: The local-only progress tracking indeed seems easier, we do not
need to broadcast anything. Implementation-wise it is easier, but
performance-wise probably not. If one key can come from multiple
sources, there could be a lot more network overhead with per-key
tracking than with broadcasting, somewhat paradoxically. Say source instance
S1 sends messages and watermarks to operator instances O1, O2. In the
broadcasting case, S1 would send one message to O1 and one to O2 per
watermark (of course it depends on how fast the watermarks arrive),
a total of 2. However, if we keep track of per-key watermarks, S1 would
need to send watermarks for every key directed to O1, and also for O2. So if
10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks
arrive at the same rate per key as per source in the previous case)
S1 would send a total of 20 watermarks.

Another question is how large the state per key is. If it's
really small (an integer maybe, or the state of a small state machine), then
the overhead of keeping track of a (Long) watermark is large
memory-wise. E.g. an Int state plus a Long watermark results in state that
is 3x as large. Also, the checkpointing would be ~3x as slow. Of course, for
large states a Long watermark would not mean much overhead.

We could resolve the memory issue by using some kind of sketch data
structure. Right now the granularity of watermark handling is
per-operator-instance. On the other hand, per-key granularity might be
costly. What if we increased the granularity of watermarks inside an
operator by keeping more than one watermark tracker per operator?
This could be done quite simply with a hash table. With a hash table of
size 1, we would get the current semantics (per-operator-instance
granularity). With a hash table large enough to have at most one key per
bucket, we would get per-key watermark tracking. In between lies the
trade-off between handling time skew well and keeping the memory
overhead low. This does not seem hard to implement.

Of course, at some point we would still need to take care of watermarks
per key. Imagine that keys A and B go to the same bucket of the
hash table, and watermarks come in like this: (B,20), (A,10),
(A,15), (A,40). Then the watermark of the bucket should be the minimum
over the two keys as time passes, i.e. 0, 10, 15, 20 (with both keys
starting at 0). For this we need to keep track of the watermarks of A
and B separately. But once we have a correct watermark for the bucket,
all we need to care about downstream is the bucket watermark. So
somewhere (most probably at the source) we would have to pay the memory
overhead of tracking every key, but nowhere else in the topology.
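
A rough sketch of this bucketing (all names made up; the linear scan over
keys is only for readability, a real implementation would keep per-bucket
aggregates):

import java.util.HashMap;
import java.util.Map;

public class BucketedWatermarks<K> {

    private final int numBuckets;
    // per-key watermarks are only needed where keys are first observed
    // (most probably at the source); downstream only sees bucket watermarks
    private final Map<K, Long> perKeyWatermark = new HashMap<>();

    public BucketedWatermarks(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    /** A key counts towards its bucket from the moment it is registered. */
    public void registerKey(K key) {
        perKeyWatermark.putIfAbsent(key, 0L);
    }

    public void onWatermark(K key, long watermark) {
        perKeyWatermark.merge(key, watermark, Math::max);
    }

    /** Watermark of a bucket = minimum over the watermarks of its keys. */
    public long bucketWatermark(int bucket) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<K, Long> e : perKeyWatermark.entrySet()) {
            if (Math.floorMod(e.getKey().hashCode(), numBuckets) == bucket) {
                min = Math.min(min, e.getValue());
            }
        }
        return min == Long.MAX_VALUE ? Long.MIN_VALUE : min;
    }
}

With keys A and B registered and the watermarks (B,20), (A,10), (A,15),
(A,40) arriving in that order, bucketWatermark() for their bucket goes
0, 10, 15, 20, matching the example above.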

Regarding the potentially large network overhead, the same compression
could be useful. I.e. we would not send watermarks from one operator
per key, but rather per hash bucket. Again, the trade-off between time
skew and memory consumption is configurable via the size of the hash
table used.

Cheers,
Gabor



Re: [DISCUSS] Per-key event time

Posted by Paris Carbone <pa...@kth.se>.
Hey Jamie!

Key-based progress tracking sounds like local-only progress tracking to me; there is no need for a low-watermark mechanism at all, since all streams of a key are handled by a single partition at a time (per operator).
Thus, this could be much easier to implement and support (i.e., no need to broadcast the progress state of each partition all the time).
State-wise it should be fine too if it is backed by RocksDB, especially if we have MapState in the future.
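
As a minimal sketch of what I mean by local-only tracking (the Event type and its fields are made up; the state lands in RocksDB if that backend is configured):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class PerKeyProgress extends RichFlatMapFunction<Event, Event> {

    // highest timestamp seen so far for the current key
    private transient ValueState<Long> maxTimestamp;

    @Override
    public void open(Configuration parameters) {
        maxTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("max-ts", Long.class));
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        Long seen = maxTimestamp.value();
        long updated = Math.max(seen == null ? Long.MIN_VALUE : seen, event.timestamp);
        maxTimestamp.update(updated);
        // a per-key "watermark" could be derived locally, e.g. updated minus
        // some allowed lateness, without any cross-partition coordination
        out.collect(event);
    }
}

This would run after a keyBy(), e.g. stream.keyBy(...).flatMap(new PerKeyProgress()).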

Just my quick thoughts on this, to get the discussion going :)

cheers
Paris
