Posted to user@flink.apache.org by Shannon Carey <sc...@expedia.com> on 2016/04/06 07:55:53 UTC

State in external db (dynamodb)

Hi, new Flink user here!

I found a discussion on user@flink.apache.org about using DynamoDB as a sink. However, as noted, sinks have an at-least-once guarantee, so your operations must be idempotent.

However, another way to go about this (and correct me if I'm wrong) is to write the state to the external store via a custom State Backend. Since the state participates in checkpointing, you don't have to worry about idempotency: every time state is checkpointed, overwrite the value of that key.

We are starting a project with Flink, and we are interested in evicting the state from memory once a TTL is reached during which no events have come in for that state. Subsequently, when an event is processed, we must be able to quickly load up any evicted state. Does this sound reasonable? We are considering using DynamoDB for our state backend because it seems like all we will need is a key-value store. The only weakness of this is that if state gets older than, say, 2 years we would like to get rid of it which might not be easy in DynamoDB. I don't suppose Flink has any behind-the-scenes features that deal with getting rid of old state (either evicting from memory or TTL/aging out entirely)?

Thanks for your time!
Shannon Carey
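
The eviction idea in the question above can be sketched as a small load-through cache. This is only an illustration in plain Python with no Flink dependency. The class name `IdleTtlCache`, the `load` callback and the injectable `clock` are all hypothetical, and the sketch assumes an entry is only evicted after its value is already safely in the external store.

```python
import time
from typing import Any, Callable, Dict, Hashable, List, Optional, Tuple


class IdleTtlCache:
    """In-memory cache that drops entries not touched for ttl_seconds.

    `load` is a hypothetical callback that fetches a value from the
    external key-value store (returning None if the key is unknown).
    """

    def __init__(
        self,
        ttl_seconds: float,
        load: Callable[[Hashable], Optional[Any]],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._load = load
        self._clock = clock
        # key -> (value, time of last access)
        self._entries: Dict[Hashable, Tuple[Any, float]] = {}

    def put(self, key: Hashable, value: Any) -> None:
        """Store a value and mark the key as just used."""
        self._entries[key] = (value, self._clock())

    def get(self, key: Hashable) -> Optional[Any]:
        """Return the cached value, reloading evicted state on a miss."""
        entry = self._entries.get(key)
        value = entry[0] if entry is not None else self._load(key)
        if value is not None:
            # Refresh the idle timer on every access.
            self._entries[key] = (value, self._clock())
        return value

    def evict_idle(self) -> List[Hashable]:
        """Remove entries idle for longer than the TTL; return their keys."""
        now = self._clock()
        idle = [k for k, (_, last) in self._entries.items() if now - last > self._ttl]
        for key in idle:
            del self._entries[key]
        return idle
```

Calling `evict_idle()` periodically (for example from a timer) keeps memory bounded, while `get()` transparently reloads any state that was evicted earlier. Aging out very old data entirely would still have to be handled by the external store.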

Re: State in external db (dynamodb)

Posted by Chen Qin <qi...@gmail.com>.
> I wonder if it can be solved by storing state in the external store with a
> tuple: (state, previous_state, checkpoint_id). Then when reading from the
> store, if checkpoint_id is in the future, read the previous_state,
> otherwise take the current_state.
>

I think that handles the "filtering" part: reads point to the state of the
previous checkpoint that completed.
It does need to hold two states per snapshot (or base + delta). It would be
nice if Flink supported incremental checkpointing and could reason about
which records reference which during cleanup, so that each record does not
grow larger and larger. But that seems off topic for your question.



> On Mon, Jul 25, 2016 at 3:20 PM, Josh <jo...@gmail.com> wrote:
>
>> Hi Chen,
>>
>> Can you explain what you mean a bit more? I'm not sure I understand the
>> problem.
>>
>> Does anyone know if the tooling discussed here has been merged into Flink
>> already? Or if there's an example of what this custom sink would look like?
>> I guess the sink would buffer updates in-memory between checkpoints. Then
>> it would implement the Checkpointed interface and write to the external
>> store in snapshotState(...)?
>>
>> Thanks,
>> Josh
>>
>> On Sun, Jul 24, 2016 at 6:00 PM, Chen Qin <qi...@gmail.com> wrote:
>>
>>>
>>>
>>> On Jul 22, 2016, at 2:54 AM, Josh <jo...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> > (1)  Only write to the DB upon a checkpoint, at which point it is known
>>> > that no replay of that data will occur any more. Values from partially
>>> > successful writes will be overwritten with correct value. I assume that is
>>> > what you thought of when referring to the State Backend, because in some sense,
>>> > that is what that state backend would do.
>>>
>>>
>>> I feel the problem is how to commit all snapshots as a
>>> transaction. Partial writes pose cleanup challenges when the job restores.
>>> An easy hack would be to treat RocksDB as a cache and keep state updates
>>> there (aka aemanifest), then do a cleanup check before the actual restore.
>>>
>>>
>>>
>>> > I think it is simpler to realize that in a custom sink, than developing
>>> > a new state backend.  Another Flink committer (Chesnay) has developed
>>> > some nice tooling for that, to be merged into Flink soon.
>>>
>>> I am planning to implement something like this:
>>>
>>> Say I have a topology which looks like this: [source => operator =>
>>> sink], I would like it to work like this:
>>> 1. Upon receiving an element, the operator retrieves some state from an
>>> external key-value store (would like to put an in-memory cache on top of
>>> this with a TTL)
>>> 2. The operator emits a new state (and updates its in-memory cache with
>>> the new state)
>>> 3. The sink batches up all the new states and upon checkpoint flushes
>>> them to the external store
>>>
>>> Could anyone point me at the work that's already been done on this? Has
>>> it already been merged into Flink?
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> regarding windows and incremental aggregation. This is already
>>>> happening in Flink as of now. When you give a ReduceFunction on a window,
>>>> which "sum" internally does, the result for a window is incrementally
>>>> updated whenever a new element comes in. This incremental aggregation only
>>>> happens when you specify a ReduceFunction or a FoldFunction, not for the
>>>> general case of a WindowFunction, where all elements in the window are
>>>> required.
>>>>
>>>> You are right about incremental snapshots. We mainly want to introduce
>>>> them to reduce latency incurred by snapshotting. Right now, processing
>>>> stalls when a checkpoint happens.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sc...@expedia.com> wrote:
>>>>
>>>>> Thanks very kindly for your response, Stephan!
>>>>>
>>>>> We will definitely use a custom sink for persistence of idempotent
>>>>> mutations whenever possible. Exposing state as read-only to external
>>>>> systems is a complication we will try to avoid. Also, we will definitely
>>>>> only write to the DB upon checkpoint, and the write will be synchronous and
>>>>> transactional (no possibility of partial success/failure).
>>>>>
>>>>> However, we do want Flink state to be durable, we want it to be in
>>>>> memory when possible, and we want to avoid running out of memory due to the
>>>>> size of the state. For example, if you have a wide window that hasn't
>>>>> gotten an event for a long time, we want to evict that window state from
>>>>> memory. We're now thinking of using Redis (via AWS Elasticache) which also
>>>>> conveniently has TTL, instead of DynamoDB.
>>>>>
>>>>> I just wanted to check whether eviction of (inactive/quiet) state from
>>>>> memory is something that I should consider implementing, or whether Flink
>>>>> already had some built-in way of doing it.
>>>>>
>>>>> Along the same lines, I am also wondering whether Flink already has
>>>>> means of compacting the state of a window by applying an aggregation
>>>>> function to the elements so far (e.g. every time the window is triggered)? For
>>>>> example, if you are only executing a sum on the contents of the window, the
>>>>> window state doesn't need to store all the individual items in the window,
>>>>> it only needs to store the sum. Aggregations other than "sum" might have
>>>>> that characteristic too. I don't know if Flink is already that intelligent
>>>>> or whether I should figure out how to aggregate window contents myself when
>>>>> possible with something like a window fold? Another poster (Aljoscha) was
>>>>> talking about adding incremental snapshots, but it sounds like that would
>>>>> only improve the write throughput not the memory usage.
>>>>>
>>>>> Thanks again!
>>>>> Shannon Carey
>>>>>
>>>>>
>>>>> From: Stephan Ewen <se...@apache.org>
>>>>> Date: Wednesday, April 6, 2016 at 10:37 PM
>>>>> To: <us...@flink.apache.org>
>>>>> Subject: Re: State in external db (dynamodb)
>>>>>
>>>>> Hi Shannon!
>>>>>
>>>>> Welcome to the Flink community!
>>>>>
>>>>> You are right, sinks need in general to be idempotent if you want
>>>>> "exactly-once" semantics, because there can be a replay of elements that
>>>>> were already written.
>>>>>
>>>>> However, what you describe later, overwriting of a key with a new
>>>>> value (or the same value again) is pretty much sufficient. That means that
>>>>> when a duplicate write happens during replay, the value for the key is
>>>>> simply overwritten with the same value again.
>>>>> As long as all computation is purely in Flink and you only write to
>>>>> the key/value store (rather than read from k/v, modify in Flink, write to
>>>>> k/v), you get the consistency that for example counts/aggregates never have
>>>>> duplicates.
>>>>>
>>>>> If Flink needs to look up state from the database (because it is no
>>>>> longer in Flink), it is a bit more tricky. I assume that is where you are
>>>>> going with "Subsequently, when an event is processed, we must be able
>>>>> to quickly load up any evicted state".  In that case, there are two
>>>>> things you can do:
>>>>>
>>>>> (1)  Only write to the DB upon a checkpoint, at which point it is
>>>>> known that no replay of that data will occur any more. Values from
>>>>> partially successful writes will be overwritten with correct value. I
>>>>> assume that is what you thought of when referring to the State Backend,
>>>>> because in some sense, that is what that state backend would do.
>>>>>
>>>>> I think it is simpler to realize that in a custom sink, than
>>>>> developing a new state backend.  Another Flink committer (Chesnay)
>>>>> has developed some nice tooling for that, to be merged into Flink soon.
>>>>>
>>>>> (2) You could attach version numbers to every write, and increment the
>>>>> versions upon each checkpoint. That allows you to always refer to a
>>>>> consistent previous value, if some writes were made, but a failure occurred
>>>>> before the checkpoint completed.
>>>>>
>>>>> I hope these answers apply to your case. Let us know if some things
>>>>> are still unclear, or if I misunderstood your question!
>>>>>
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever <
>>>>> sanne.de.roever@gmail.com> wrote:
>>>>>
>>>>>> FYI Cassandra has a TTL on data:
>>>>>> https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html

Re: State in external db (dynamodb)

Posted by Josh <jo...@gmail.com>.
Never mind, I think I understand the point about partial writes. Is it that
if we write out our buffer of updates to the external store and the batch
update is not atomic, then the external store is in an inconsistent state?
(with some state from the attempted checkpoint, and some from the previous
checkpoint).

I guess that means this solution will only work in the case where you only
write to the external store, and writes are idempotent - and it won't work
for my use case where I need to read from the external store, apply an
update and then write.

I wonder if it can be solved by storing state in the external store with a
tuple: (state, previous_state, checkpoint_id). Then when reading from the
store, if checkpoint_id is in the future, read the previous_state,
otherwise take the current_state.
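
A minimal sketch of that tuple idea, in plain Python with no Flink dependency. It reads "checkpoint_id is in the future" as "greater than the last checkpoint known to have completed", which is an interpretation rather than something the thread spells out. The names `VersionedState`, `read_state`, `write_state` and `last_completed` are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class VersionedState:
    state: Any                     # value written under checkpoint_id
    previous_state: Optional[Any]  # last value known to be consistent
    checkpoint_id: int             # checkpoint during which `state` was written


def read_state(record: Optional[VersionedState], last_completed: int) -> Optional[Any]:
    """Return the value that is consistent with the last completed checkpoint."""
    if record is None:
        return None
    if record.checkpoint_id > last_completed:
        # The write belongs to a checkpoint that never completed (or has not
        # completed yet), so fall back to the previous value.
        return record.previous_state
    return record.state


def write_state(
    existing: Optional[VersionedState],
    new_state: Any,
    checkpoint_id: int,
    last_completed: int,
) -> VersionedState:
    """Build the record to store for a write made under checkpoint_id."""
    # The fallback is whatever a reader would currently consider consistent,
    # so a replayed write under the same checkpoint keeps the old fallback.
    return VersionedState(new_state, read_state(existing, last_completed), checkpoint_id)
```

After a failure, a restored job would read with the `last_completed` value it restored from, so any value written during the failed attempt is ignored in favour of `previous_state`. The cost is the one noted elsewhere in the thread: every key holds two states.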



Re: State in external db (dynamodb)

Posted by Josh <jo...@gmail.com>.
Hi Chen,

Can you explain what you mean a bit more? I'm not sure I understand the
problem.

Does anyone know if the tooling discussed here has been merged into Flink
already? Or if there's an example of what this custom sink would look like?
I guess the sink would buffer updates in-memory between checkpoints. Then
it would implement the Checkpointed interface and write to the external
store in snapshotState(...)?

Thanks,
Josh
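
The buffering sink described above could look roughly like the following. This is a plain-Python illustration, not Flink code: `BufferingSink`, `invoke` and `snapshot_state` are hypothetical stand-ins for a sink and the snapshotState(...) hook mentioned in the message, and the external store is modelled as an ordinary dict.

```python
from typing import Any, Dict, Hashable, MutableMapping, Optional


class BufferingSink:
    """Buffers updates in memory and writes them out only at a checkpoint."""

    def __init__(self, store: MutableMapping[Hashable, Any]) -> None:
        self._store = store                     # stand-in for the external key-value store
        self._pending: Dict[Hashable, Any] = {}
        self.last_flushed_checkpoint: Optional[int] = None

    def invoke(self, key: Hashable, value: Any) -> None:
        """Called per element: remember only the latest value for each key."""
        self._pending[key] = value

    def snapshot_state(self, checkpoint_id: int) -> int:
        """Called at a checkpoint: flush the buffer and return the number of keys written."""
        flushed = len(self._pending)
        # Plain overwrites per key, so repeating the same flush after a
        # replay leaves the store with the same values.
        self._store.update(self._pending)
        # Clear only after the write succeeded, so a failed flush can be retried.
        self._pending.clear()
        self.last_flushed_checkpoint = checkpoint_id
        return flushed
```

Note that the flush here is not atomic across keys. If the real store cannot apply the batch as one transaction, a failure midway leaves a mix of old and new values, which is the partial-write concern raised elsewhere in this thread.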


Re: State in external db (dynamodb)

Posted by Chen Qin <qi...@gmail.com>.

> On Jul 22, 2016, at 2:54 AM, Josh <jo...@gmail.com> wrote:
> 
> Hi all,
> 
> > (1)  Only write to the DB upon a checkpoint, at which point it is known that no replay of that data will occur any more. Values from partially successful writes will be overwritten with correct value. I assume that is what you thought of when referring to the State Backend, because in some sense, that is what that state backend would do.

I feel the problem is how to commit all snapshots as a transaction. Partial writes pose cleanup challenges when the job restores.
An easy hack would be to treat RocksDB as a cache and keep state updates there (aka aemanifest), then do a cleanup check before the actual restore.


> 
> > I think it is simpler to realize that in a custom sink, than developing a new state backend.  Another Flink committer (Chesnay) has developed some nice tooling for that, to be merged into Flink soon.
> 
> I am planning to implement something like this:
> 
> Say I have a topology which looks like this: [source => operator => sink], I would like it to work like this:
> 1. Upon receiving an element, the operator retrieves some state from an external key-value store (would like to put an in-memory cache on top of this with a TTL)
> 2. The operator emits a new state (and updates its in-memory cache with the new state)
> 3. The sink batches up all the new states and upon checkpoint flushes them to the external store
> 
> Could anyone point me at the work that's already been done on this? Has it already been merged into Flink?
> 
> Thanks,
> Josh
> 
>> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <al...@apache.org> wrote:
>> Hi,
>> regarding windows and incremental aggregation. This is already happening in Flink as of now. When you give a ReduceFunction on a window, which "sum" internally does, the result for a window is incrementally updated whenever a new element comes in. This incremental aggregation only happens when you specify a ReduceFunction or a FoldFunction, not for the general case of a WindowFunction, where all elements in the window are required.
>> 
>> You are right about incremental snapshots. We mainly want to introduce them to reduce latency incurred by snapshotting. Right now, processing stalls when a checkpoint happens.
>> 
>> Cheers,
>> Aljoscha
>> 
>>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sc...@expedia.com> wrote:
>>> Thanks very kindly for your response, Stephan!
>>> 
>>> We will definitely use a custom sink for persistence of idempotent mutations whenever possible. Exposing state as read-only to external systems is a complication we will try to avoid. Also, we will definitely only write to the DB upon checkpoint, and the write will be synchronous and transactional (no possibility of partial success/failure).
>>> 
>>> However, we do want Flink state to be durable, we want it to be in memory when possible, and we want to avoid running out of memory due to the size of the state. For example, if you have a wide window that hasn't gotten an event for a long time, we want to evict that window state from memory. We're now thinking of using Redis (via AWS Elasticache) which also conveniently has TTL, instead of DynamoDB.
>>> 
>>> I just wanted to check whether eviction of (inactive/quiet) state from memory is something that I should consider implementing, or whether Flink already had some built-in way of doing it.
>>> 
>>> Along the same lines, I am also wondering whether Flink already has means of compacting the state of a window by applying an aggregation function to the elements so-far (eg. every time window is triggered)? For example, if you are only executing a sum on the contents of the window, the window state doesn't need to store all the individual items in the window, it only needs to store the sum. Aggregations other than "sum" might have that characteristic too. I don't know if Flink is already that intelligent or whether I should figure out how to aggregate window contents myself when possible with something like a window fold? Another poster (Aljoscha) was talking about adding incremental snapshots, but it sounds like that would only improve the write throughput not the memory usage.
>>> 
>>> Thanks again!
>>> Shannon Carey
>>> 
>>> 
>>> From: Stephan Ewen <se...@apache.org>
>>> Date: Wednesday, April 6, 2016 at 10:37 PM
>>> To: <us...@flink.apache.org>
>>> Subject: Re: State in external db (dynamodb)
>>> 
>>> Hi Shannon!
>>> 
>>> Welcome to the Flink community!
>>> 
>>> You are right, sinks need in general to be idempotent if you want "exactly-once" semantics, because there can be a replay of elements that were already written.
>>> 
>>> However, what you describe later, overwriting of a key with a new value (or the same value again) is pretty much sufficient. That means that when a duplicate write happens during replay, the value for the key is simply overwritten with the same value again.
>>> As long as all computation is purely in Flink and you only write to the key/value store (rather than read from k/v, modify in Flink, write to k/v), you get the consistency that for example counts/aggregates never have duplicates.
>>> 
>>> If Flink needs to look up state from the database (because it is no longer in Flink), it is a bit more tricky. I assume that is where you are going with "Subsequently, when an event is processed, we must be able to quickly load up any evicted state".  In that case, there are two things you can do:
>>> 
>>> (1)  Only write to the DB upon a checkpoint, at which point it is known that no replay of that data will occur any more. Values from partially successful writes will be overwritten with correct value. I assume that is what you thought of when referring to the State Backend, because in some sense, that is what that state backend would do.
>>> 
>>> I think it is simpler to realize that in a custom sink, than developing a new state backend.  Another Flink committer (Chesnay) has developed some nice tooling for that, to be merged into Flink soon. 
>>> 
>>> (2) You could attach version numbers to every write, and increment the versions upon each checkpoint. That allows you to always refer to a consistent previous value, if some writes were made, but a failure occurred before the checkpoint completed.
>>> 
>>> I hope these answers apply to your case. Let us know if some things are still unclear, or if I misunderstood your question!
>>> 
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> 
>>> 
>>>> On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever <sa...@gmail.com> wrote:
>>>> FYI Cassandra has a TTL on data: https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html
>>>> 
>>>>> On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey <sc...@expedia.com> wrote:
>>>>> Hi, new Flink user here!
>>>>> 
>>>>> I found a discussion on user@flink.apache.org about using DynamoDB as a sink. However, as noted, sinks have an at-least-once guarantee so your operations must be idempotent.
>>>>> 
>>>>> However, another way to go about this (and correct me if I'm wrong) is to write the state to the external store via a custom State Backend. Since the state participates in checkpointing, you don't have to worry about idempotency: every time state is checkpointed, overwrite the value of that key.
>>>>> 
>>>>> We are starting a project with Flink, and we are interested in evicting the state from memory once a TTL is reached during which no events have come in for that state. Subsequently, when an event is processed, we must be able to quickly load up any evicted state. Does this sound reasonable? We are considering using DynamoDB for our state backend because it seems like all we will need is a key-value store. The only weakness of this is that if state gets older than, say, 2 years we would like to get rid of it which might not be easy in DynamoDB. I don't suppose Flink has any behind-the-scenes features that deal with getting rid of old state (either evicting from memory or TTL/aging out entirely)?
>>>>> 
>>>>> Thanks for your time!
>>>>> Shannon Carey
>>>> 
>>> 
> 

Re: State in external db (dynamodb)

Posted by Josh <jo...@gmail.com>.
Hi all,

> (1)  Only write to the DB upon a checkpoint, at which point it is known
> that no replay of that data will occur any more. Values from partially
> successful writes will be overwritten with correct value. I assume that is
> what you thought of when referring to the State Backend, because in some sense,
> that is what that state backend would do.
>
> I think it is simpler to realize that in a custom sink, than developing a
> new state backend.  Another Flink committer (Chesnay) has developed some
> nice tooling for that, to be merged into Flink soon.

I am planning to implement something like this:

Say I have a topology which looks like this: [source => operator => sink],
I would like it to work like this:
1. Upon receiving an element, the operator retrieves some state from an
external key-value store (would like to put an in-memory cache on top of
this with a TTL)
2. The operator emits a new state (and updates its in-memory cache with the
new state)
3. The sink batches up all the new states and upon checkpoint flushes them
to the external store
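
A minimal sketch of the three steps above, assuming a plain dict stands in for the external key-value store and that something calls on_checkpoint() when a checkpoint is taken. The names TtlCache, Operator and CheckpointedSink are hypothetical and are not Flink classes; the "state" here is just a running total per key.

```python
import time


class TtlCache:
    """In-memory cache whose entries expire ttl_seconds after they were written."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.entries = {}  # key -> (value, expires_at)

    def get(self, key):
        hit = self.entries.get(key)
        if hit is None:
            return None
        value, expires_at = hit
        if self.clock() >= expires_at:
            del self.entries[key]  # expired: evict and report a miss
            return None
        return value

    def put(self, key, value):
        self.entries[key] = (value, self.clock() + self.ttl)


class Operator:
    """Step 1 and 2: read state (cache first, then store), emit the new state."""

    def __init__(self, store, cache):
        self.store = store  # hypothetical external key-value store (a dict here)
        self.cache = cache

    def process(self, key, amount):
        state = self.cache.get(key)
        if state is None:
            state = self.store.get(key, 0)  # cache miss: load from the store
        new_state = state + amount
        self.cache.put(key, new_state)
        return key, new_state


class CheckpointedSink:
    """Step 3: buffer new states and write them out only on checkpoint."""

    def __init__(self, store):
        self.store = store
        self.pending = {}

    def invoke(self, key, new_state):
        self.pending[key] = new_state  # a later state for a key replaces the earlier one

    def on_checkpoint(self):
        self.store.update(self.pending)
        self.pending.clear()
```

One caveat with this shape: if a cache entry expires before the sink has flushed it, the operator falls back to an older value in the store, so the TTL would need to be comfortably longer than the checkpoint interval.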

Could anyone point me at the work that's already been done on this? Has it
already been merged into Flink?

Thanks,
Josh

Re: State in external db (dynamodb)

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Regarding windows and incremental aggregation: this already happens in
Flink as of now. When you give a ReduceFunction on a window, which "sum"
internally does, the result for a window is incrementally updated whenever
a new element comes in. This incremental aggregation only happens when you
specify a ReduceFunction or a FoldFunction, not for the general case of a
WindowFunction, where all elements in the window are required.
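
To illustrate the difference in window state size, here is a rough sketch that does not use Flink at all. ReducingWindow and BufferingWindow are hypothetical names: the first folds each element into a single running value on arrival, the second has to keep every element until the window fires.

```python
class ReducingWindow:
    """Keeps one running value; each element is folded in when it arrives."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.value = None
        self.count = 0

    def add(self, element):
        if self.count == 0:
            self.value = element
        else:
            self.value = self.reduce_fn(self.value, element)
        self.count += 1

    def state_size(self):
        return 0 if self.count == 0 else 1

    def fire(self):
        return self.value


class BufferingWindow:
    """Keeps every element; the function only sees them when the window fires."""

    def __init__(self, window_fn):
        self.window_fn = window_fn
        self.elements = []

    def add(self, element):
        self.elements.append(element)

    def state_size(self):
        return len(self.elements)

    def fire(self):
        return self.window_fn(self.elements)
```

A sum fits the first shape because it only needs the previous total and the new element. Something like a median needs all elements and so falls into the second.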

You are right about incremental snapshots. We mainly want to introduce them
to reduce latency incurred by snapshotting. Right now, processing stalls
when a checkpoint happens.

Cheers,
Aljoscha

Re: State in external db (dynamodb)

Posted by Shannon Carey <sc...@expedia.com>.
Thanks very kindly for your response, Stephan!

We will definitely use a custom sink for persistence of idempotent mutations whenever possible. Exposing state as read-only to external systems is a complication we will try to avoid. Also, we will definitely only write to the DB upon checkpoint, and the write will be synchronous and transactional (no possibility of partial success/failure).

However, we do want Flink state to be durable, we want it to be in memory when possible, and we want to avoid running out of memory due to the size of the state. For example, if you have a wide window that hasn't gotten an event for a long time, we want to evict that window state from memory. We're now thinking of using Redis (via AWS ElastiCache), which also conveniently has TTL, instead of DynamoDB.

I just wanted to check whether eviction of (inactive/quiet) state from memory is something that I should consider implementing, or whether Flink already had some built-in way of doing it.

Along the same lines, I am also wondering whether Flink already has means of compacting the state of a window by applying an aggregation function to the elements so far (e.g. every time the window is triggered)? For example, if you are only executing a sum on the contents of the window, the window state doesn't need to store all the individual items in the window, it only needs to store the sum. Aggregations other than "sum" might have that characteristic too. I don't know if Flink is already that intelligent or whether I should figure out how to aggregate window contents myself when possible with something like a window fold? Another poster (Aljoscha) was talking about adding incremental snapshots, but it sounds like that would only improve the write throughput, not the memory usage.

Thanks again!
Shannon Carey


Re: State in external db (dynamodb)

Posted by Stephan Ewen <se...@apache.org>.
Hi Shannon!

Welcome to the Flink community!

You are right, sinks need in general to be idempotent if you want
"exactly-once" semantics, because there can be a replay of elements that
were already written.

However, what you describe later, overwriting of a key with a new value (or
the same value again) is pretty much sufficient. That means that when a
duplicate write happens during replay, the value for the key is simply
overwritten with the same value again.
As long as all computation is purely in Flink and you only write to the
key/value store (rather than read from k/v, modify in Flink, write to k/v),
you get the consistency that for example counts/aggregates never have
duplicates.
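
As a rough illustration of why overwriting is safe under replay, the sketch below simulates one failure and recovery. It assumes the count lives in Flink-side state that is restored from the checkpoint, and that events from the checkpoint position onward are replayed. The function names are hypothetical and the store is a plain dict.

```python
def run(events, checkpoint_at, fail_at, write):
    """Process events, fail once at fail_at, restore the checkpoint, then replay.

    Requires 0 <= checkpoint_at < fail_at <= len(events).
    """
    assert 0 <= checkpoint_at < fail_at <= len(events)
    store = {}     # external key-value store
    counts = {}    # state held inside the job
    snapshot = {}
    for i, key in enumerate(events[:fail_at]):
        if i == checkpoint_at:
            snapshot = dict(counts)  # checkpoint taken before event i
        counts[key] = counts.get(key, 0) + 1
        write(store, key, counts[key])
    # Failure: restore job state and replay everything after the checkpoint.
    counts = dict(snapshot)
    for key in events[checkpoint_at:]:
        counts[key] = counts.get(key, 0) + 1
        write(store, key, counts[key])
    return store


def overwrite(store, key, count):
    """Idempotent: the store just takes the value computed by the job."""
    store[key] = count


def increment(store, key, count):
    """Not idempotent: the store is modified relative to its old value."""
    store[key] = store.get(key, 0) + 1
```

With overwrite, the replayed events rewrite the same values and the store ends up with the true counts. With increment, every replayed event is counted a second time.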

If Flink needs to look up state from the database (because it is no longer
in Flink), it is a bit more tricky. I assume that is where you are going
with "Subsequently, when an event is processed, we must be able to quickly
load up any evicted state".  In that case, there are two things you can do:

(1)  Only write to the DB upon a checkpoint, at which point it is known
that no replay of that data will occur any more. Values from partially
successful writes will be overwritten with the correct value. I assume that
is what you thought of when referring to the State Backend, because in some
sense, that is what that state backend would do.

I think it is simpler to realize that in a custom sink than to develop a
new state backend.  Another Flink committer (Chesnay) has developed some
nice tooling for that, to be merged into Flink soon.

(2) You could attach version numbers to every write, and increment the
versions upon each checkpoint. That allows you to always refer to a
consistent previous value, if some writes were made, but a failure occurred
before the checkpoint completed.
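
One way option (2) could look, as a sketch only: every write is tagged with the version of the checkpoint currently in progress, and reads only see versions at or below the last completed checkpoint. VersionedStore and its methods are hypothetical, the store is an in-memory dict, and pruning of old versions is left out.

```python
class VersionedStore:
    """Key-value store where each write carries a checkpoint version."""

    def __init__(self):
        self.rows = {}       # key -> {version: value}
        self.completed = 0   # version of the last completed checkpoint
        self.current = 1     # version attached to writes made right now

    def write(self, key, value):
        self.rows.setdefault(key, {})[self.current] = value

    def checkpoint_completed(self):
        # Writes made so far become visible; later writes get a new version.
        self.completed = self.current
        self.current += 1

    def read(self, key):
        versions = self.rows.get(key, {})
        visible = [v for v in versions if v <= self.completed]
        return versions[max(visible)] if visible else None

    def recover(self):
        """After a failure, drop writes newer than the last completed checkpoint."""
        for versions in self.rows.values():
            for v in [v for v in versions if v > self.completed]:
                del versions[v]
```

A write made after the last completed checkpoint stays invisible to readers until its own checkpoint completes, so a failure in between leaves the previous consistent value in place.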

I hope these answers apply to your case. Let us know if some things are
still unclear, or if I misunderstood your question!


Greetings,
Stephan



Re: State in external db (dynamodb)

Posted by Sanne de Roever <sa...@gmail.com>.
FYI Cassandra has a TTL on data:
https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html
