Posted to user@flink.apache.org by Raman Gupta <ro...@gmail.com> on 2017/01/18 14:39:59 UTC

Window limitations on groupBy

I am investigating Flink. I am considering a relatively simple use
case -- I want to ingest streams of events that are essentially
timestamped state changes. These events may look something like:

{
  sourceId: 111,
  state: OPEN,
  timestamp: <date/time>
}
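
In Java, this might map to a simple POJO like the following (using epoch
millis for the timestamp is just an assumption here):

public class Event {
    public long sourceId;
    public String state;     // e.g. "OPEN"
    public long timestamp;   // event time as epoch millis (assumed)

    public Event() {}        // no-arg constructor, per Flink's POJO rules
}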

I want to apply various processing to these state change events, the
output of which can be used for analytics. For example:

1. average time spent in state, by state
2. sources with longest (or shortest) time spent in OPEN state

The time spent in each state may be days or even weeks.

All the examples I have seen of similar logic involve windows on the
order of 15 minutes. Since time spent in each state may far exceed
these window sizes, I'm wondering what the best approach will be.

One thought from reading the docs is to use `every` to operate on the
entire stream. But it seems like this will take longer and longer to
run as the event stream grows, so this is not an ideal solution. Or
does Flink apply some clever optimizations to avoid the potential
performance issue?

Another thought was to split the event stream into multiple streams by
source, each of which will have a small (and limited) amount of data.
This will make processing each stream simpler, but since there can be
thousands of sources, it will result in a lot of streams to handle and
persist (probably in Kafka). This does not seem ideal either.

It seems like this should be simple, but I'm struggling with
understanding how to solve it elegantly.

Regards,
Raman


Re: Operational concerns with state (was Re: Window limitations on groupBy)

Posted by Raman Gupta <ro...@gmail.com>.
Thank you Fabian, the blog articles were very useful. I will continue
experimenting.


Re: Operational concerns with state (was Re: Window limitations on groupBy)

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Raman,

Checkpoints are used to recover from task or process failures and are
usually taken automatically at periodic intervals if checkpointing is
configured. Checkpoints are usually removed when a more recent checkpoint
completes (the exact retention policy can be configured).

Savepoints are used to restart a job that was previously shut down, to
migrate a job to another cluster (e.g., when upgrading Flink), to update
the job itself, etc., so they are geared more toward planned maintenance.
Nonetheless, they can also be used for coarser-grained fault tolerance,
and it is common practice to trigger a savepoint periodically.
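
As a minimal sketch (the checkpoint interval, RocksDB backend, and paths
below are assumptions, not taken from your job), configuring this in the
job definition looks roughly like:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (interval is an assumption).
        env.enableCheckpointing(60_000);

        // Keep state in RocksDB; the checkpoint URI is hypothetical.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... build the streaming topology here ...
        env.execute("state-change-analytics");
    }
}

A savepoint can then be triggered and restored from the command line,
e.g. bin/flink savepoint <jobId> followed by
bin/flink run -s <savepointPath> your-job.jar (job id and path are
placeholders).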

These blog posts might be helpful to understand the potential of savepoints
[1] [2].

Best, Fabian

[1] http://data-artisans.com/turning-back-time-savepoints/
[2] http://data-artisans.com/savepoints-part-2-updating-applications/


Operational concerns with state (was Re: Window limitations on groupBy)

Posted by Raman Gupta <ro...@gmail.com>.
I was able to get it working well with the original approach you
described. Thanks! Note that the documentation on how to do this with
the Java API is... sparse, to say the least. I was able to look at the
implementation of the Scala flatMapWithState function as a starting point.

Now I'm trying to understand all the operational concerns related to
the stored state. My checkpoints are stored in RocksDB, configured via
the job definition.

It seems that the checkpointed state of the streaming job is lost when
I stop and restart Flink normally, or when Flink terminates abnormally
and is restarted. I was able to take an explicit savepoint and then
restart the job with it.

Is the correct approach, as of now, to take savepoints periodically via
cron, and use those to re-run jobs in case of Flink failure or restart?

Regards,
Raman



Re: Window limitations on groupBy

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Raman,

I think you would need a sliding count window of size 2 with slide 1.
This is basically a GlobalWindow with a special trigger.

However, you would need a custom trigger that can either
- identify a terminal event (if there is such a thing), or
- close the window after a certain period of inactivity, to clean up
the state.
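
As a rough sketch (with the default count trigger rather than the
modified one described above, and assuming an Event POJO with public
sourceId/state/timestamp fields matching the JSON in the original
message), the window itself would look like:

// events is an already-defined DataStream<Event>
DataStream<Tuple3<Long, String, Long>> durations = events
    .keyBy("sourceId")
    .countWindow(2, 1)  // size 2, slide 1: pairs each event with its successor
    .apply(new WindowFunction<Event, Tuple3<Long, String, Long>, Tuple, GlobalWindow>() {
        @Override
        public void apply(Tuple key, GlobalWindow window, Iterable<Event> input,
                          Collector<Tuple3<Long, String, Long>> out) {
            Iterator<Event> it = input.iterator();
            Event previous = it.next();
            if (it.hasNext()) {  // the first window per key holds a single event
                Event current = it.next();
                // emit (sourceId, state that was left, millis spent in it)
                out.collect(Tuple3.of(previous.sourceId, previous.state,
                        current.timestamp - previous.timestamp));
            }
        }
    });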

Best, Fabian


Re: Window limitations on groupBy

Posted by Raman Gupta <ro...@gmail.com>.
Thank you for your reply.

If I were to use a keyed stream with a count-based window of 2, would
Flink keep the last event persistently until the next one is received?
Would this be another way of having Flink keep this information
persistently without having to implement it manually?

Thanks,
Raman


Re: Window limitations on groupBy

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Raman,

I would approach this issue as follows.

You key the input stream on the sourceId and apply a stateful
FlatMapFunction.
The FlatMapFunction has key-partitioned state and stores the latest
event for each key (sourceId).
When a new event arrives, you can compute the time spent in the last
state by looking up the previous event in the state and comparing it
with the newly received event.
Then you put the new event into the state.
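
A minimal sketch in the Java API (class and state names are
illustrative; the Event POJO with public sourceId/state/timestamp fields
is assumed from the original message):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class StateDurations
        extends RichFlatMapFunction<Event, Tuple3<Long, String, Long>> {

    // Key-partitioned state: the latest event seen for the current sourceId.
    private transient ValueState<Event> lastEvent;

    @Override
    public void open(Configuration parameters) {
        lastEvent = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-event", Event.class));
    }

    @Override
    public void flatMap(Event current, Collector<Tuple3<Long, String, Long>> out)
            throws Exception {
        Event previous = lastEvent.value();
        if (previous != null) {
            // emit (sourceId, state that was just left, millis spent in it)
            out.collect(Tuple3.of(previous.sourceId, previous.state,
                    current.timestamp - previous.timestamp));
        }
        lastEvent.update(current);
    }
}

Applied as events.keyBy("sourceId").flatMap(new StateDurations()), with
the output feeding whatever aggregation computes the averages or extremes.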

This solution works well if you have a finite number of sources or if
you have a terminal event that signals that no more events will
arrive for a key.
Otherwise, the number of events stored in the state will grow without
bound and eventually become a problem.

If the number of sources keeps growing, you need to evict state at some
point in time. A ProcessFunction can help here, because you can
register a timer and use it to evict old state.
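
A rough sketch of the eviction variant (the 30-day inactivity timeout
and all names are assumptions; processing-time timers are used here for
simplicity):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class StateDurationsWithEviction
        extends ProcessFunction<Event, Tuple3<Long, String, Long>> {

    // Evict a key's state after 30 days without events (assumed timeout).
    private static final long TIMEOUT_MS = 30L * 24 * 60 * 60 * 1000;

    private transient ValueState<Event> lastEvent;
    private transient ValueState<Long> lastSeen; // processing time of last event

    @Override
    public void open(Configuration parameters) {
        lastEvent = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-event", Event.class));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", Long.class));
    }

    @Override
    public void processElement(Event current, Context ctx,
                               Collector<Tuple3<Long, String, Long>> out)
            throws Exception {
        Event previous = lastEvent.value();
        if (previous != null) {
            out.collect(Tuple3.of(previous.sourceId, previous.state,
                    current.timestamp - previous.timestamp));
        }
        long now = ctx.timerService().currentProcessingTime();
        lastEvent.update(current);
        lastSeen.update(now);
        // Arm a timer; only the timer set by the most recent event will
        // still match lastSeen + TIMEOUT_MS when it fires.
        ctx.timerService().registerProcessingTimeTimer(now + TIMEOUT_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple3<Long, String, Long>> out)
            throws Exception {
        Long seen = lastSeen.value();
        if (seen != null && timestamp >= seen + TIMEOUT_MS) {
            // No event arrived during the timeout window: clean up this key.
            lastEvent.clear();
            lastSeen.clear();
        }
    }
}

This is applied on the keyed stream the same way as the FlatMapFunction
above; ProcessFunction was added in Flink 1.2, so this assumes a
reasonably recent version.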

Hope this helps,
Fabian
