Posted to user@flink.apache.org by Aaron Levin <aa...@stripe.com> on 2019/11/26 19:54:45 UTC

What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

Hi,

Some context: after a refactoring, we were unable to restart our jobs.
They started fine and checkpointed fine, but once the job restarted
owing to a transient failure, the application was unable to recover.
The Job Manager was OOM'ing (even when I gave it 256GB of RAM!). The
`_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
the `_metadata` file we saw `- 1402496 offsets:
com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
to be the operator state we were no longer initializing or
snapshotting after the refactoring.

Before I dig further into this and try to find a smaller reproducible
test case, I thought I would ask whether someone knows what the
expected behaviour is in the following scenario:

Suppose you have an operator (in this case a Source) which has some
operator ListState. Suppose you run your Flink job for some time and
later refactor your job such that you no longer use that state (after
the refactoring you're no longer initializing this operator state in
initializeState, nor snapshotting it in snapshotState). If you launch
your new code from a recent savepoint, what do we expect to happen to
that state? Do we anticipate the behaviour I described above?
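
In code, the setup looked roughly like this (a minimal sketch; the
class, field, and descriptor names here are made up for illustration
and are not our actual code):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class ArchiveSource extends RichParallelSourceFunction<String>
            implements CheckpointedFunction {

        private transient ListState<Long> progressState;
        private volatile boolean running = true;
        private volatile long currentOffset;

        @Override
        public void initializeState(FunctionInitializationContext ctx) throws Exception {
            // Union-redistributed operator state: on restore, every
            // subtask receives the entries of ALL subtasks.
            progressState = ctx.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>("offsets", Long.class));
        }

        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
            // Clear-then-add so each checkpoint holds only the latest offset.
            progressState.clear();
            progressState.add(currentOffset);
        }

        @Override
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (running) {
                // Emit records and advance currentOffset here.
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

The refactoring removed both the getUnionListState call in
initializeState and the clear/add in snapshotState, while the
savepoint we restored from still contained the old state.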

My assumption would be that Flink would not read this state, and so it
would be dropped from the next checkpoint or savepoint. Alternatively,
it might not be read but linger in every future checkpoint or
savepoint. However, what appears to be happening is that it's not read
and is then replicated by every instance of the task each time a
checkpoint happens (hence the accidentally exponential behaviour).

Thoughts?

PS - in case someone asks: I was sure that we were calling `.clear()`
appropriately in `snapshotState` (we, uh, already learned that lesson
:D)

Best,

Aaron Levin

Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

Posted by Aaron Levin <aa...@stripe.com>.
Thanks for the clarification. I'll try to find some time to write a
reproducible test case and submit a ticket. Even if Flink can't safely
delete the non-referenced state, I'm surprised it's exponentially
replicating it, so that seems worth documenting in a ticket.


Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

Posted by Gyula Fóra <gy...@gmail.com>.
You are right, Aaron.

I would say this is by design: Flink doesn't require you to initialize
state in the open method, so it has no safe way to delete the
non-referenced state.

What you can do is restore the state, clear it on all operators, and
never reference it again. I know this feels like a workaround, but I
have no better idea at the moment.
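
Concretely, a one-time migration version might look like the following
sketch (the field and descriptor names are placeholders, not from
Aaron's actual job):

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        // Reference the old descriptor one last time so Flink restores it...
        ListState<Long> legacy = ctx.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("offsets", Long.class));
        // ...then drop its contents on every subtask. Since snapshotState
        // never writes to it again, the next savepoint contains the state
        // empty, and a later deploy can remove the descriptor entirely.
        legacy.clear();
    }

Run this version long enough to take a savepoint, then start the code
that no longer references the state from that savepoint.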

Cheers,
Gyula


Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

Posted by Aaron Levin <aa...@stripe.com>.
Hi,

Yes, we're using UNION state. I would assume, though, that if you are
not reading the UNION state it would either stick around as a constant
factor in your state size or get cleared.

Looks like I should try to recreate a small example and submit a bug
if this is true. Otherwise it's impossible to remove union state from
your operators.


Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

Posted by Congxian Qiu <qc...@gmail.com>.
Hi,

Are you using UNION state in your scenario? When using UNION state,
the JM may encounter an OOM, because each TDD
(TaskDeploymentDescriptor) will contain all the state of all
subtasks [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
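
For readers of the archive: the difference is in which
OperatorStateStore method registers the list state. A hypothetical
snippet (the descriptor names are made up) contrasting the two
redistribution modes:

    // Split redistribution: on restore, each subtask receives a
    // disjoint subset of the list, so total state stays roughly constant.
    ListState<Long> split = ctx.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("offsets-split", Long.class));

    // Union redistribution: on restore, EVERY subtask receives the
    // full list, and the JM ships the complete union state to each
    // subtask in its TaskDeploymentDescriptor.
    ListState<Long> union = ctx.getOperatorStateStore()
            .getUnionListState(new ListStateDescriptor<>("offsets-union", Long.class));

If each of p subtasks restores the full union list and then snapshots
its full copy, the state grows by roughly a factor of p per restore
cycle, which would be consistent with the "accidentally exponential"
growth Aaron describes.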
Best,
Congxian

