Posted to user@flink.apache.org by Akshay Aggarwal <ak...@flipkart.com> on 2020/02/14 17:05:10 UTC

Persisting inactive state outside Flink

Hi,

We have a use case where we have to persist some state information about a
device forever. Each new event fetches the keyed state and updates it, and
the updates have to be applied in the order of the events.

The problem is that the number of devices (keys) will keep growing
indefinitely. Usually a device comes online, stays active for a while
(generates new events) and then goes into dormant mode. Is there a way we
can persist the state outside of Flink (say in HBase) when the device goes
dormant and fetch it back later when the device becomes active again?

I know we can do this in a process function using timers. But then I'd have
to make a synchronous call to the external store every time a new device
comes online, or when an active device goes dormant, which would stall the
task and become a scalability bottleneck. Using AsyncIO also doesn't seem
to be an option. A rough sketch of what I mean follows below.
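
For illustration, this is roughly the timer-based KeyedProcessFunction I have
in mind. DeviceEvent, DeviceState and ExternalStore are placeholder names, and
the blocking load/save calls mark exactly the synchronous calls I'd like to
avoid:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// DeviceEvent (has deviceId()), DeviceState (has update(event)) and ExternalStore
// (a blocking HBase-like client) are placeholder types for illustration.
public class DeviceStateFunction
        extends KeyedProcessFunction<String, DeviceEvent, DeviceState> {

    private static final long DORMANCY_MS = 6 * 60 * 60 * 1000L; // e.g. 6 hours

    private transient ValueState<DeviceState> deviceState;
    private transient ValueState<Long> dormancyTimer;

    @Override
    public void open(Configuration parameters) {
        deviceState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("device-state", DeviceState.class));
        dormancyTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("dormancy-timer", Long.class));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx, Collector<DeviceState> out)
            throws Exception {
        DeviceState current = deviceState.value();
        if (current == null) {
            // Device (re-)activates: blocking lookup in the external store stalls the task.
            current = ExternalStore.load(event.deviceId());
        }
        current = current.update(event); // events are applied in arrival order per key
        deviceState.update(current);
        out.collect(current);

        // Re-arm the dormancy timer for this key.
        Long previous = dormancyTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long next = ctx.timerService().currentProcessingTime() + DORMANCY_MS;
        ctx.timerService().registerProcessingTimeTimer(next);
        dormancyTimer.update(next);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<DeviceState> out)
            throws Exception {
        DeviceState current = deviceState.value();
        if (current != null) {
            // Device went dormant: blocking write to the external store, then drop the state.
            ExternalStore.save(ctx.getCurrentKey(), current);
            deviceState.clear();
            dormancyTimer.clear();
        }
    }
}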

Is there a way to achieve this without hacking into Flink code?

Thanks,
Akshay Aggarwal



Re: Persisting inactive state outside Flink

Posted by Akshay Aggarwal <ak...@flipkart.com>.
Thanks Till.

Going with your suggestion, I'll run some benchmarks to figure out how the
lookups behave with an increasing number of keys, and how checkpoints behave
with increasing state size. I'll make a decision based on the results, and
maybe reach out to you if I need more information.

Thanks a lot,
Akshay



Re: Persisting inactive state outside Flink

Posted by Till Rohrmann <tr...@apache.org>.
Hmm, with this state size you will need an aggregate disk capacity of 11 TB
(for the 1.2 Bn devices). If most of the entries are permanently dormant,
then this is not ideal. On the other hand, they would occupy the same space
on your HBase cluster.

Concerning your questions about RocksDB:
1. When using full checkpoints, the "time to checkpoint" and the "time to
recovery" will increase with the size of the state in the general case.
Moreover, it will mostly be I/O bound with respect to your persistent
storage. If you enable local recovery and don't suffer a machine loss, then
the recovery should be almost instantaneous. If you activate incremental
checkpoints, then the "time to checkpoint" depends on your access pattern.
If the access pattern stays more or less the same, then the checkpoint time
should stay roughly constant. The "time to recovery" might be a bit worse
compared to full checkpoints because you might have to download uncompacted
SST files.
2. I think RocksDB's performance should decrease slightly (but I haven't run
the numbers). Given that you have more keys, the lookups should become
slightly more expensive. However, I would expect that this should not really
matter given that RocksDB uses proper indexes. The bigger difference will
probably come from whether you are accessing data which is still kept in the
write buffer (in memory) or whether you need to access one of the SST files.
Also here, the more keys you have, the more SST files you potentially need
to touch. I would recommend running some benchmarks to see for yourself how
it behaves with your workload.
3. You can use Flink's state processor API [1] to access Flink state. The
only thing you need to do is take a savepoint of your job and then feed the
savepoint to the state processor API in order to access it. A rough sketch
follows below the link.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
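
To make this a bit more concrete, here is a minimal, untested sketch of
reading keyed state out of a savepoint with the state processor API
(Flink 1.9/1.10 style). The operator uid "device-operator", the state name
"device-state" and the DeviceState/DeviceStateEntry types are placeholders:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ExportDeviceState {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Load the savepoint with the same state backend the streaming job uses.
        ExistingSavepoint savepoint = Savepoint.load(
                batchEnv,
                "hdfs:///savepoints/savepoint-xyz",              // placeholder path
                new RocksDBStateBackend("hdfs:///checkpoints")); // placeholder URI

        // Read the keyed state of the operator with uid "device-operator".
        DataSet<DeviceStateEntry> entries =
                savepoint.readKeyedState("device-operator", new DeviceStateReader());

        entries.writeAsText("hdfs:///exports/device-state");
        batchEnv.execute("export-device-state");
    }

    /** Reads the per-key "device-state" ValueState and emits (key, state) pairs. */
    public static class DeviceStateReader
            extends KeyedStateReaderFunction<String, DeviceStateEntry> {

        private transient ValueState<DeviceState> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("device-state", DeviceState.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<DeviceStateEntry> out)
                throws Exception {
            out.collect(new DeviceStateEntry(key, state.value()));
        }
    }
}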

Cheers,
Till


Re: Persisting inactive state outside Flink

Posted by Akshay Aggarwal <ak...@flipkart.com>.
Thanks Till, I really appreciate your response.

We are in fact considering RocksDB as our state backend. The scale we are
looking at is 1.2 Bn new devices every year, with a growth of ~30% YoY; the
state per device is not expected to grow beyond a few tens of KBs, though.
The peak ingestion rate is around 100k events per second. Another
consideration here is that many devices will go dormant forever, and it
seems pointless to keep them in state.

I have a few concerns because of which I wasn't completely convinced about
using RocksDB alone:
1. Will the "time to checkpoint" and "time to recovery" keep increasing
with the size of the state?
2. Will there be a slowdown in RocksDB operations as the number of keys
increases over time?
3. If we go to production with just RocksDB and no external state
persistence, is there a way for us to migrate to lazy loading if we hit
scale issues?

The iterative solution with AsyncIO seems complex but feasible; it
certainly needs more thought to handle edge cases. Also, our use case can
tolerate the occasional glitch that comes with at-least-once processing,
since the output will be used for analytical purposes. Thanks for the
suggestion.

Cheers!
Akshay




Re: Persisting inactive state outside Flink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Akshay,

there is no easy out-of-the-box implementation for what you are asking.

Before drafting a potential solution I wanted to ask whether using the
RocksDB state backend could already solve your problem. With this state
backend, Flink is able to spill state data to disk. Would this work for your
use case, or do you expect the device data per node to grow so big that it
no longer fits onto disk? A minimal configuration sketch follows below.
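
Just to illustrate what enabling the RocksDB backend (with incremental
checkpoints turned on) could look like; a minimal sketch, where the
checkpoint URI and the placeholder pipeline are only there to make the
example self-contained:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableRocksDBBackend {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Second argument 'true' enables incremental checkpoints, so a checkpoint
        // only uploads new or changed SST files instead of the full state.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds, for example

        // Placeholder pipeline; the real job would key by device id and keep its state here.
        env.fromElements("device-a", "device-b", "device-a")
                .keyBy(value -> value)
                .print();

        env.execute("device-state-job");
    }
}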

If using the RocksDB state backend does not work for you and you really
need to offload state data to an external storage system from where you can
load it lazily, it becomes significantly more complicated. One approach I
could think of is the following: you have a primary operator (process)
which is responsible for processing the incoming events and keeps the state
of the non-dormant devices. Once a device becomes dormant, you could send
its data to a secondary operator (offload + fetching) which uses AsyncIO to
offload the state to HBase, for example. If the process operator
encounters an event from a dormant device, it would need to ask the
secondary operator to load it (via sending a fetch event downstream). The
secondary operator would again use AsyncIO to load the requested data. Once
it retrieves the data, you would need to send the data back to the primary
operator via a feedback edge (iteration edge). A rough sketch of the
secondary operator follows below.
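
Roughly sketched (untested): the secondary operator could be an AsyncFunction
along these lines, where StateCommand, DeviceState and AsyncDeviceStore are
placeholders standing in for your own types and an asynchronous HBase client:

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// StateCommand carries either an OFFLOAD(deviceId, state) or a FETCH(deviceId)
// request; AsyncDeviceStore stands in for an async HBase client returning
// CompletableFutures. Both are placeholders.
public class OffloadFetchFunction extends RichAsyncFunction<StateCommand, StateCommand> {

    private transient AsyncDeviceStore store;

    @Override
    public void open(Configuration parameters) {
        store = AsyncDeviceStore.connect(); // placeholder for an async HBase connection
    }

    @Override
    public void asyncInvoke(StateCommand command, ResultFuture<StateCommand> resultFuture) {
        if (command.isOffload()) {
            // A device went dormant in the primary operator: persist its state externally.
            store.put(command.deviceId(), command.state()).whenComplete((ignored, err) -> {
                if (err != null) {
                    resultFuture.completeExceptionally(err);
                } else {
                    resultFuture.complete(Collections.emptyList()); // nothing to emit
                }
            });
        } else {
            // A fetch request: load the state and emit a FETCHED command, which the
            // topology routes back to the primary operator over the iteration edge.
            store.get(command.deviceId()).whenComplete((loaded, err) -> {
                if (err != null) {
                    resultFuture.completeExceptionally(err);
                } else {
                    resultFuture.complete(Collections.singletonList(
                            StateCommand.fetched(command.deviceId(), loaded)));
                }
            });
        }
    }

    @Override
    public void close() {
        if (store != null) {
            store.close();
        }
    }
}

This function would be wrapped with AsyncDataStream.unorderedWait(...), and its
FETCHED output fed back into the primary operator via DataStream#iterate().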

The problem with this approach is that Flink does not give you exactly-once
processing guarantees when using iterations at the moment. The community is
working on changing this, though.

Cheers,
Till
