Posted to user@flink.apache.org by Josh <jo...@gmail.com> on 2016/06/13 08:14:49 UTC

Re: Accessing StateBackend snapshots outside of Flink

Hello,
I have a follow-up question to this: since Flink doesn't support state
expiration at the moment (e.g. expiring state which hasn't been updated for
a certain amount of time), would it be possible to clear up old UDF state
by:
- storing a "last_updated" timestamp in the state value
- periodically (e.g. monthly) going through all the state values in RocksDB,
deserializing them using the TypeSerializer and reading the "last_updated"
property
- deleting the key from RocksDB if the state's "last_updated" timestamp is
more than a month old

Is there any reason this approach wouldn't work, or anything to be careful
of?
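To make the cleanup pass concrete, here is a minimal sketch of the per-entry check, assuming the state value is written with a long "last_updated" (epoch millis) as its first field. The RocksDB iteration itself is elided, and all names here are hypothetical, not Flink or RocksDB API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper for the offline cleanup pass described above. Assumes
// the state value starts with a long "last_updated" timestamp; iterating the
// RocksDB instance and deleting expired keys would wrap around this check.
class StateExpiryCheck {

    // Encode a value with a leading last_updated timestamp (assumed layout).
    static byte[] encode(long lastUpdatedMillis, byte[] payload) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeLong(lastUpdatedMillis);
            out.write(payload);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read back the timestamp and decide whether the entry should be deleted.
    static boolean isExpired(byte[] serializedValue, long nowMillis, long ttlMillis) {
        try {
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serializedValue));
            long lastUpdated = in.readLong();
            return nowMillis - lastUpdated > ttlMillis;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real pass, the decode step would use the state's actual TypeSerializer rather than this fixed layout.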

Thanks,
Josh


On Mon, Apr 18, 2016 at 8:23 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> key refers to the key extracted by your KeySelector. Right now, for every
> named state (i.e. the name in the StateDescriptor) there is an isolated
> RocksDB instance.
>
> Cheers,
> Aljoscha
>
> On Sat, 16 Apr 2016 at 15:43 Igor Berman <ig...@gmail.com> wrote:
>
>> thanks a lot for the info, seems not too complex
>> I'll try to write a simple tool to read this state.
>>
>> Aljoscha, does the key reflect the unique id of the operator in some way?
>> Or is the key just the "name" passed to the ValueStateDescriptor?
>>
>> thanks in advance
>>
>>
>> On 15 April 2016 at 15:10, Stephan Ewen <se...@apache.org> wrote:
>>
>>> One thing to add is that you can always trigger a persistent checkpoint
>>> via the "savepoints" feature:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>>>
>>>
>>>
>>> On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> for RocksDB we simply use a TypeSerializer to serialize the key and
>>>> value to a byte[] array and store that in RocksDB. For a ListState, we
>>>> serialize the individual elements using a TypeSerializer and store them in
>>>> a comma-separated list in RocksDB. The snapshots of RocksDB that we write
>>>> to HDFS are regular backups of a RocksDB database, as described here:
>>>> https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. It
>>>> should be possible to read them from HDFS and restore them to a RocksDB
>>>> database as described in the linked documentation.
>>>>
>>>> tl;dr As long as you know the type of values stored in the state you
>>>> should be able to read them from RocksDB and deserialize the values using
>>>> TypeSerializer.
>>>>
>>>> One more bit of information: Internally the state is keyed by (key,
>>>> namespace) -> value where namespace can be an arbitrary type that has a
>>>> TypeSerializer. We use this to store window state that is both local to key
>>>> and the current window. For state that you store in a user-defined function
>>>> the namespace will always be null and that will be serialized by a
>>>> VoidSerializer that simply always writes a "0" byte.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Fri, 15 Apr 2016 at 00:18 igor.berman <ig...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> we are evaluating Flink for a new solution, and several people raised
>>>>> the concern of coupling too much to Flink -
>>>>> 1. we understand that if we want full fault tolerance and the best
>>>>> performance, we'll need to use Flink managed state (probably the
>>>>> RocksDB backend, due to the volume of state)
>>>>> 2. but then, if we later find that Flink doesn't answer our needs (for
>>>>> any reason), we'll need to extract this state in some way (since it's
>>>>> the only source of consistent state)
>>>>> In general I'd like to be able to take a snapshot of the backend and
>>>>> try to read it... do you think it will be a trivial task?
>>>>> Say, if I'm holding list state per partitioned key, would it be easy
>>>>> to take the RocksDB file and open it?
>>>>>
>>>>> any thoughts regarding how I can convince people on our team?
>>>>>
>>>>> thanks in advance!
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116.html
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive at Nabble.com.
>>>>>
>>>>
>>>
>>
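Based on Aljoscha's description of the (key, namespace) -> value layout above, here is a rough illustration of how an entry for user-defined state could be composed: the serialized key followed by the single "0" namespace byte that the VoidSerializer writes. This is only a sketch of the described scheme, not Flink's actual byte layout, and writeUTF stands in for the key's real TypeSerializer:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative only: composes a RocksDB lookup key as (serialized key,
// namespace), where the namespace for user-defined function state is the
// single "0" byte the VoidSerializer writes. Flink's real layout may differ.
class KeyNamespaceLayout {

    static byte[] udfStateKey(String key) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeUTF(key);   // stand-in for the key's TypeSerializer
            out.writeByte(0);    // VoidSerializer: namespace is always one 0 byte
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For window state, the 0 byte would instead be the serialized window namespace.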

Re: Accessing StateBackend snapshots outside of Flink

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
there are two open issues about this:
 * https://issues.apache.org/jira/browse/FLINK-3946
 * https://issues.apache.org/jira/browse/FLINK-3089

No work has been done on these yet. You can, however, simulate TTL for
state by using a TimelyFlatMapFunction (available in Flink 1.2-SNAPSHOT)
and manually setting a timer for clearing out state.
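The timer-based TTL pattern boils down to: on every update, store the value and remember when it was last touched; when a cleanup timer fires, clear the entry only if it is still stale. A dependency-free sketch of that logic follows — plain JDK, not the actual Flink API, with all class and method names made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Simulates the timer-driven state TTL a TimelyFlatMapFunction would
// implement: each update refreshes the entry's timestamp, and a firing
// timer removes the entry only if no update arrived in the meantime.
class TtlStateSketch {
    private final long ttlMillis;
    private final Map<String, String> state = new HashMap<>();
    private final Map<String, Long> lastUpdated = new HashMap<>();

    TtlStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Corresponds to processing an element: store state, remember the time,
    // and (in Flink) register a timer for nowMillis + ttlMillis.
    void update(String key, String value, long nowMillis) {
        state.put(key, value);
        lastUpdated.put(key, nowMillis);
    }

    // Corresponds to the timer callback: clear only if still stale.
    void onTimer(String key, long firingTimeMillis) {
        Long ts = lastUpdated.get(key);
        if (ts != null && firingTimeMillis - ts >= ttlMillis) {
            state.remove(key);
            lastUpdated.remove(key);
        }
    }

    String get(String key) {
        return state.get(key);
    }
}
```

The key point is the re-check inside the timer callback: a timer armed by an old update must not clear state that a newer update refreshed.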

Cheers,
Aljoscha


Re: Accessing StateBackend snapshots outside of Flink

Posted by bwong247 <bw...@247-inc.com>.
We're currently investigating Flink, and one of the features that we'd like
to have is a TTL feature to time out older values in state.  I saw this
thread and it sounds like the functionality was being considered.  Is there
any update?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116p9846.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Accessing StateBackend snapshots outside of Flink

Posted by Maximilian Michels <mx...@apache.org>.
+1 to what Aljoscha said. We should rather fix this programmatically.


Re: Accessing StateBackend snapshots outside of Flink

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Josh,
I think RocksDB does not allow accessing a database instance from more
than one process concurrently. Even if it were possible, I would highly
recommend not fiddling with Flink state internals (in RocksDB or
elsewhere) from the outside. All kinds of things might be going on at any
given moment, such as: locking of state due to a checkpoint, state restore
after a failure, and simple state access.

If you are interested in this we can work together on adding proper support
for TTL (time-to-live) to the Flink state abstraction.

Cheers,
Aljoscha


Re: Accessing StateBackend snapshots outside of Flink

Posted by Maximilian Michels <mx...@apache.org>.
Hi Josh,

I'm not a RocksDB expert but the workaround you described should work.
Just bear in mind that accessing RocksDB concurrently with a Flink job
can result in an inconsistent state. Make sure to perform atomic
updates and clear the RocksDB cache for the item.

Cheers,
Max
