Posted to user@flink.apache.org by Mike Urbach <mi...@gmail.com> on 2018/05/24 05:46:25 UTC

Efficient Stateful Processing with Time-Series Data and Enrichments

Hi,

I have a two-part question related to processing and storing large amounts
of time-series data. The first part is related to the preferred way to keep
state on the time-series data in an efficient way, and the second part is
about how to further enrich the processed data and feed it back into the
state.

For the sake of discussion, let's say that I am tracking tens to hundreds
of millions of IoT devices. This could grow but that's what I'm looking at
right now. I will receive an initial event from each device, as well as an
unknown number of subsequent events. I will need to aggregate together all
the events related to one device for some period of time after the initial
event, say 1 hour, at which point I can discard the state. After that, I
will never hear from that device again. (I'm not actually working with IoT
devices, but that is the gist. At any given point in time, I will have
millions of active keys, and as some keys expire new keys are added).

The output of my application should contain the full state for a given
device, and a new output should be generated every time a new event comes
in. This application must be fault tolerant. I am currently checkpointing
my state using the RocksDB state backend.
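
For reference, my environment setup looks roughly like this (the checkpoint
path and interval are placeholders, not my real values):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// RocksDB keeps keyed state off-heap and snapshots it on every checkpoint.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60_000); // checkpoint every minute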

Part 1 of my question is how best to manage this state. This sounds like an
excellent use case for State TTL
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively).
Since this is still a pending feature under active discussion, I did some
reading about how others have dealt with similar use cases. What I gleaned
boils down to this: naively storing everything in one large MapState keyed
by device ID, and using Triggers to clear the state 1 hour after the initial
event, will lead to far too many Triggers to be efficient.

An alternate approach is to bucket my devices into a far smaller number of
keys (not in the millions, maybe thousands) and maintain a MapState for each
bucket. I can fire a Trigger every minute for every bucket and iterate over
the MapState to clear any state that has passed its TTL.
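
A rough sketch of what I have in mind for that variant (Event, DeviceAgg,
and their fields are made-up stand-ins for my real types; the stream is
keyed by hash(deviceId) % numBuckets beforehand, and I use processing-time
timers for the sweep, assuming the initial-event timestamps are comparable
to wall-clock time):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;

public class BucketedTtlFunction extends ProcessFunction<Event, DeviceAgg> {

    private static final long TTL_MS = 60 * 60 * 1000L; // keep state for 1 hour

    private transient MapState<String, DeviceAgg> perDevice;

    @Override
    public void open(Configuration parameters) {
        perDevice = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("perDevice", String.class, DeviceAgg.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<DeviceAgg> out) throws Exception {
        DeviceAgg agg = perDevice.get(event.deviceId);
        if (agg == null) {
            agg = new DeviceAgg(event.initialTimestamp);
        }
        agg.add(event);
        perDevice.put(event.deviceId, agg);
        out.collect(agg); // emit the full aggregate on every event

        // Round up to the next minute so duplicate registrations coalesce:
        // at most one pending timer per bucket per minute.
        long nextMinute = (ctx.timerService().currentProcessingTime() / 60_000 + 1) * 60_000;
        ctx.timerService().registerProcessingTimeTimer(nextMinute);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<DeviceAgg> out) throws Exception {
        // Sweep this bucket and drop every device whose hour is up.
        Iterator<Map.Entry<String, DeviceAgg>> it = perDevice.iterator();
        while (it.hasNext()) {
            Map.Entry<String, DeviceAgg> entry = it.next();
            if (timestamp - entry.getValue().initialTimestamp > TTL_MS) {
                it.remove();
            }
        }
    }
}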

A similar alternative is to use tumbling Windows to achieve the same effect.
Every incoming event carries a copy of the timestamp of the initial event
for that device (think of it as when the device came online), so I can use
that for event time and let the watermarks lag by 1 hour. The devices are
bucketed into some fixed number of keys as above, so I will have a Window
for each bucket, for each time slice. The Window has a Trigger that eagerly
fires and purges each element, and a ProcessWindowFunction updates a
MapState using per-window state, so that when a Window expires I can clear
the state. I am currently using this approach, since it uses Flink's own
windowing and per-window state to clear old data, rather than doing it
manually with Triggers.
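
Condensed, that job looks something like the sketch below (again Event and
DeviceAgg stand in for my real types; I assume Event is a POJO with a public
bucket field holding hash(deviceId) % numBuckets, and remember that every
event carries the initial event's timestamp, so all events for one device
land in the same window):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Given a DataStream<Event> events:
DataStream<DeviceAgg> output = events
        .assignTimestampsAndWatermarks(
                // event time = the device's initial-event timestamp; watermarks lag 1 hour
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.hours(1)) {
                    @Override
                    public long extractTimestamp(Event e) {
                        return e.initialTimestamp;
                    }
                })
        .keyBy("bucket")
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .trigger(new EagerTrigger())
        .process(new PerWindowAggregator());

public class EagerTrigger extends Trigger<Event, TimeWindow> {
    @Override
    public TriggerResult onElement(Event element, long ts, TimeWindow window, TriggerContext ctx) {
        // Fire for every element and purge it, so nothing buffers in the window;
        // the running aggregates live in per-window state instead.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {}
}

public class PerWindowAggregator extends ProcessWindowFunction<Event, DeviceAgg, Tuple, TimeWindow> {

    private static final MapStateDescriptor<String, DeviceAgg> DESC =
            new MapStateDescriptor<>("perDevice", String.class, DeviceAgg.class);

    @Override
    public void process(Tuple bucket, Context ctx, Iterable<Event> elements, Collector<DeviceAgg> out) throws Exception {
        MapState<String, DeviceAgg> perDevice = ctx.windowState().getMapState(DESC);
        for (Event e : elements) { // one element per firing, thanks to FIRE_AND_PURGE
            DeviceAgg agg = perDevice.get(e.deviceId);
            if (agg == null) {
                agg = new DeviceAgg(e.initialTimestamp);
            }
            agg.add(e);
            perDevice.put(e.deviceId, agg);
            out.collect(agg);
        }
    }

    @Override
    public void clear(Context ctx) throws Exception {
        // Called when the window expires; this is where the old state gets dropped.
        ctx.windowState().getMapState(DESC).clear();
    }
}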

Other than waiting for the State TTL feature, is there a more efficient way
to maintain the aggregate state of all events related to one device and
output it every time a new event arrives?

Part 2 of my question relates to how I can enrich the state I have
accumulated before generating outputs. I have some set of enrichments I'd
like to do using AsyncFunctions to call out to external services. The issue
is that some enrichments require data that may never be present on any one
event; I need to work with the stream of aggregated data described above to
be able to make some of those calls. Furthermore, some enrichments might
need the data added by other enrichments. I would like to feed the enriched
data back into the state.
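
For concreteness, a single enrichment step over the aggregate stream would
look roughly like this (EnrichmentClient and its lookup() method are
stand-ins I made up for a non-blocking client returning a CompletableFuture;
note the callback type is AsyncCollector in 1.4 and was renamed ResultFuture
in later releases):

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class EnrichFunction extends RichAsyncFunction<DeviceAgg, DeviceAgg> {

    private transient EnrichmentClient client;

    @Override
    public void open(Configuration parameters) {
        client = new EnrichmentClient();
    }

    @Override
    public void asyncInvoke(DeviceAgg agg, AsyncCollector<DeviceAgg> collector) {
        // Only the aggregate has enough fields to make this call.
        client.lookup(agg).thenAccept(enriched ->
                collector.collect(Collections.singletonList(enriched)));
    }
}

// Applied to the aggregate stream, with a 1 second timeout and 100 in-flight requests:
DataStream<DeviceAgg> enriched = AsyncDataStream.unorderedWait(
        aggregates, new EnrichFunction(), 1, TimeUnit.SECONDS, 100);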

This initially sounded like a perfect use case for an IterativeStream,
until I tried to use it and realized the only way to enable checkpointing
was to force it using a feature that is deprecated in Flink 1.4.2. Is that
approach a dead end? If checkpoints will never be supported for
IterativeStream, I don't want to explore this route, but it would be nice
if checkpointed IterativeStreams are on the roadmap, or at least a
possibility.
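
For reference, this is the shape I attempted, with the deprecated 'force'
flag (EnrichMapper and the fullyEnriched field are made up to show the loop):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

// Forcing checkpoints despite the iteration (deprecated overload):
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE, true);

IterativeStream<DeviceAgg> loop = aggregates.iterate();
DataStream<DeviceAgg> processed = loop.map(new EnrichMapper()); // stand-in enrichment step
// Records that still need more enrichment go around again...
loop.closeWith(processed.filter(a -> !a.fullyEnriched));
// ...and finished records continue downstream.
DataStream<DeviceAgg> done = processed.filter(a -> a.fullyEnriched);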

Now I'm kind of stumped. The only way I can think of to aggregate all the
state *before* applying enrichments, and to feed the enriched data back into
that state *after* the enrichments, is to sink the enriched data
to Kafka or something, and then create a source that reads it back and
feeds into the operator that keeps the state. That works, but I'd prefer to
keep all the data flowing within the Flink application if possible. Are
there other approaches to creating feedback loops that play well with fault
tolerance and checkpoints?
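
For reference, the Kafka round trip I described would look roughly like this
with the 0.11 connector (the topic name, DeviceAggSchema, and MergeFunction
are placeholders I made up; I also assume the enriched records carry the
same bucket field used for keying):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

// Write enriched aggregates out to Kafka...
enriched.addSink(new FlinkKafkaProducer011<>(
        "broker:9092", "enriched-devices", new DeviceAggSchema()));

// ...read them back in as a second source...
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092");
kafkaProps.setProperty("group.id", "enrichment-feedback");
DataStream<DeviceAgg> feedback = env.addSource(
        new FlinkKafkaConsumer011<>("enriched-devices", new DeviceAggSchema(), kafkaProps));

// ...and merge them with the raw events in the stateful operator.
events.connect(feedback)
        .keyBy("bucket", "bucket")
        .process(new MergeFunction()); // stand-in CoProcessFunction that folds enriched data back into state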

I appreciate any suggestions related to the two points above.

Thanks,

Mike Urbach


-- 
Mike Urbach

Re: Efficient Stateful Processing with Time-Series Data and Enrichments

Posted by Mike Urbach <mi...@gmail.com>.
Hi Sihua,

I will test keying by device ID. I was trying to implement this suggestion:
https://stackoverflow.com/a/49395606, but I guess that may be unnecessary
in my case.

Thanks,

Mike

On Wed, May 23, 2018 at 11:30 PM, sihua zhou <su...@163.com> wrote:

> Hi Mike,
> If I'm not misunderstanding, you are doing an aggregation for every device
> on the stream. You mentioned that you want to use a MapState to store the
> state for each device ID? That is a bit confusing to me; I think what you
> need may be a ValueState. In Flink, every keyed state (ValueState,
> MapState, and so on) is already scoped to the key you keyed by. For
> example:
> *source.keyBy("deviceId").process(processFunction);*
> If you keyBy the source by deviceId, then in processFunction every keyed
> state is scoped to the deviceId internally; you don't need a
> MapState<DeviceID, ?> to maintain the per-device state yourself.
>
> Concerning the TTL question: I think tumbling windows plus per-window
> state are enough for you, and that is the better way to go currently.
>
> Best,
> Sihua


-- 
Mike Urbach

Re: Efficient Stateful Processing with Time-Series Data and Enrichments

Posted by sihua zhou <su...@163.com>.
Hi Mike,
If I'm not misunderstanding, you are doing an aggregation for every device on the stream. You mentioned that you want to use a MapState to store the state for each device ID? That is a bit confusing to me; I think what you need may be a ValueState. In Flink, every keyed state (ValueState, MapState, and so on) is already scoped to the key you keyed by. For example:
source.keyBy("deviceId").process(processFunction);
If you keyBy the source by deviceId, then in processFunction every keyed state is scoped to the deviceId internally; you don't need a MapState<DeviceID, ?> to maintain the per-device state yourself.
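
A minimal sketch of what I mean (Event and DeviceAgg are placeholders for your own types):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class AggregateProcessFunction extends ProcessFunction<Event, DeviceAgg> {

    private transient ValueState<DeviceAgg> agg;

    @Override
    public void open(Configuration parameters) {
        agg = getRuntimeContext().getState(
                new ValueStateDescriptor<>("agg", DeviceAgg.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<DeviceAgg> out) throws Exception {
        // This state is already scoped to the current deviceId by keyBy().
        DeviceAgg current = agg.value();
        if (current == null) {
            current = new DeviceAgg(event.initialTimestamp);
        }
        current.add(event);
        agg.update(current);
        out.collect(current); // emit the full aggregate on every event
    }
}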


Concerning the TTL question: I think tumbling windows plus per-window state are enough for you, and that is the better way to go currently.


Best,
Sihua