Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2019/08/12 14:00:12 UTC

[FLINK-12653] and system state

Hi,

I have come across an issue that is very likely caused by [1]. The
issue is that Beam's state is (generally) created lazily, after an
element is received (as Max described in the Flink JIRA). Max also
created a workaround [2], but that seems to work for user state only
(i.e. state that has been created in user code by declaring @StateId -
please correct me if I'm wrong). In my work, however, I created a system
state (it holds elements before they are output, because of the
@RequiresTimeSortedInput annotation, but that's probably not important),
and this state is not covered by the workaround. This might be the case
for all system states in general, because they are not visible until an
element arrives and the state is actually created.

Is my analysis correct? If so, does anyone have suggestions on how to
fix that?

Jan

[1] https://jira.apache.org/jira/browse/FLINK-12653

[2] https://issues.apache.org/jira/browse/BEAM-7144
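
The lazy creation described in the message above can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyStateBackend, ToyOperator and the state name "sortBuffer" are hypothetical names invented for illustration and are not Beam or Flink APIs. It shows only that a state bound on first element is not visible on an instance that has not yet received any element.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model only: none of these classes are Beam or Flink APIs. */
public class LazyStateSketch {

  /** Hypothetical stand-in for a runner's keyed state backend. */
  static class ToyStateBackend {
    private final Map<String, List<String>> states = new LinkedHashMap<>();

    /** Registers the state on first access and returns its buffer. */
    List<String> bind(String stateName) {
      return states.computeIfAbsent(stateName, name -> new ArrayList<>());
    }

    /** A snapshot can only list states that have been registered so far. */
    Set<String> snapshotStateNames() {
      return new TreeSet<>(states.keySet());
    }
  }

  /** Hypothetical operator that buffers elements in an internal ("system") state. */
  static class ToyOperator {
    static final String SORT_BUFFER = "sortBuffer"; // assumed name, for illustration

    private final ToyStateBackend backend;

    ToyOperator(ToyStateBackend backend) {
      this.backend = backend;
    }

    void processElement(String element) {
      // Lazy binding: the state is registered only when an element arrives.
      backend.bind(SORT_BUFFER).add(element);
    }
  }

  public static void main(String[] args) {
    ToyStateBackend idle = new ToyStateBackend();
    new ToyOperator(idle); // this instance never receives an element

    ToyStateBackend busy = new ToyStateBackend();
    new ToyOperator(busy).processElement("a");

    System.out.println("idle instance sees: " + idle.snapshotStateNames());
    System.out.println("busy instance sees: " + busy.snapshotStateNames());
  }
}
```

In this model the two instances disagree about which states exist, which is the kind of mismatch the thread attributes to lazily created system state.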


Re: [FLINK-12653] and system state

Posted by Maximilian Michels <mx...@apache.org>.
Sounds good. Might be worth commenting on the JIRA to get this prioritized in case it has not been fixed.

-Max

On 13.08.19 12:18, Jan Lukavský wrote:
> Hi Max,
>
> comments inline.
>
> On 8/13/19 12:01 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> > Just checking, do you see the same rescaling problem as described in
> > https://jira.apache.org/jira/browse/FLINK-12653 ?
> Yes.
> >
> > If so, you are most likely correct that this is due to the system state
> > that you added in your code. When I did the fix, I ran some tests to
> > check if any system state is not bound. I did not find instances but you
> > are right that we could see this issue for internal state, e.g. in
> > ReduceFnContextFactory.
> I think there were no instances of internal state used in the Flink
> Runner before my patch introduced internal state for sorting inputs.
> But that seems a little fragile, because it might easily change.
> >
> > Given that this is a Flink-specific bug I'm not sure it warrants adding
> > a `getSystemStateTags()` method to the DoFnRunner. Also, this is
> > error-prone since we have to remember to add all state there. The better
> > solution would be to eagerly register state during StateSpec creation,
> > but this would require significant code refactoring.
> I'm also not happy with adding an additional generic method just because
> of one runner, but registering the state during creation of the StateSpec
> would be hard, as you said.
> >
> > Wouldn't it suffice to just perform an early binding in your code?
> > Additionally, we want to make sure to also revise any existing Beam code
> > paths.
> I think I could do that (although it would mean that the state would be
> registered early for all runners, not just Flink).
> >
> > The issue hopefully will be fixed with Flink 1.9. Would be interesting
> > to try with the Flink 1.9 RC2:
> > https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
>
> I was not sure from the comments in the Flink JIRA that this will be
> fixed soon. If so, I'm fine with registering just the single state I
> introduced. If this remains an issue in the long term, I think it would
> require some other solution.
>
> So - I will register the state(s) I have created and test that on Flink
> 1.9 when I have a little spare time. Then I will decide what to do
> next, ok?
>
> Jan
>
> >
> > Cheers,
> > Max
> >


Re: [FLINK-12653] and system state

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Max,

comments inline.

On 8/13/19 12:01 PM, Maximilian Michels wrote:
> Hi Jan,
>
> Just checking, do you see the same rescaling problem as described in
> https://jira.apache.org/jira/browse/FLINK-12653 ?
Yes.
>
> If so, you are most likely correct that this is due to the system state
> that you added in your code. When I did the fix, I ran some tests to
> check if any system state is not bound. I did not find instances but you
> are right that we could see this issue for internal state, e.g. in
> ReduceFnContextFactory.
I think there were no instances of internal state used in the Flink
Runner before my patch introduced internal state for sorting inputs.
But that seems a little fragile, because it might easily change.
>
> Given that this is a Flink-specific bug I'm not sure it warrants adding
> a `getSystemStateTags()` method to the DoFnRunner. Also, this is
> error-prone since we have to remember to add all state there. The better
> solution would be to eagerly register state during StateSpec creation,
> but this would require significant code refactoring.
I'm also not happy with adding an additional generic method just because
of one runner, but registering the state during creation of the StateSpec
would be hard, as you said.
>
> Wouldn't it suffice to just perform an early binding in your code?
> Additionally, we want to make sure to also revise any existing Beam code
> paths.
I think I could do that (although it would mean that the state would be
registered early for all runners, not just Flink).
>
> The issue hopefully will be fixed with Flink 1.9. Would be interesting
> to try with the Flink 1.9 RC2:
> https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/

I was not sure from the comments in the Flink JIRA that this will be
fixed soon. If so, I'm fine with registering just the single state I
introduced. If this remains an issue in the long term, I think it would
require some other solution.

So - I will register the state(s) I have created and test that on Flink
1.9 when I have a little spare time. Then I will decide what to do
next, ok?

Jan

>
> Cheers,
> Max
>

Re: [FLINK-12653] and system state

Posted by Maximilian Michels <mx...@apache.org>.
Hi Jan,

Just checking, do you see the same rescaling problem as described in
https://jira.apache.org/jira/browse/FLINK-12653 ?

If so, you are most likely correct that this is due to the system state
that you added in your code. When I did the fix, I ran some tests to
check if any system state is not bound. I did not find instances but you
are right that we could see this issue for internal state, e.g. in
ReduceFnContextFactory.

Given that this is a Flink-specific bug I'm not sure it warrants adding
a `getSystemStateTags()` method to the DoFnRunner. Also, this is
error-prone since we have to remember to add all state there. The better
solution would be to eagerly register state during StateSpec creation,
but this would require significant code refactoring.

Wouldn't it suffice to just perform an early binding in your code?
Additionally, we want to make sure to also revise any existing Beam code
paths.

The issue hopefully will be fixed with Flink 1.9. Would be interesting
to try with the Flink 1.9 RC2:
https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/

Cheers,
Max
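
The early binding suggested in the message above could look roughly like the following. This is a minimal sketch, assuming a toy state backend: ToyStateBackend, EagerSortingRunner and the state name "sortBuffer" are hypothetical and are not Beam or Flink APIs. The only point it makes is that the component owning the internal state binds it when it is constructed, instead of on the first element.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model only: these classes are illustrative and are not Beam or Flink APIs. */
public class EarlyBindingSketch {

  /** Hypothetical stand-in for a runner's keyed state backend. */
  static class ToyStateBackend {
    private final Map<String, List<String>> states = new LinkedHashMap<>();

    /** Registers the state on first access and returns its buffer. */
    List<String> bind(String stateName) {
      return states.computeIfAbsent(stateName, name -> new ArrayList<>());
    }

    Set<String> registeredStateNames() {
      return new TreeSet<>(states.keySet());
    }
  }

  /** Hypothetical component that owns an internal sort buffer. */
  static class EagerSortingRunner {
    static final String SORT_BUFFER = "sortBuffer"; // assumed name, for illustration

    private final List<String> buffer;

    EagerSortingRunner(ToyStateBackend backend) {
      // Early binding: register the state up front, before any element arrives.
      this.buffer = backend.bind(SORT_BUFFER);
    }

    void processElement(String element) {
      buffer.add(element);
    }

    int bufferedCount() {
      return buffer.size();
    }
  }

  public static void main(String[] args) {
    ToyStateBackend backend = new ToyStateBackend();
    EagerSortingRunner runner = new EagerSortingRunner(backend);
    System.out.println("registered before first element: " + backend.registeredStateNames());
    runner.processElement("a");
    System.out.println("buffered elements: " + runner.bufferedCount());
  }
}
```

Because the binding happens inside the component itself, it applies to every runner that uses it, not only to Flink.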

On 12.08.19 19:58, Jan Lukavský wrote:
> I've managed to fix that by introducing an (optional) method to DoFnRunner
> called getSystemStateTags() (the default implementation returns
> Collections.emptyList()), and then using that list to bind states early in
> Flink's DoFnOperator ([1]).
> 
> @Max, WDYT?
> 
> Jan
> 
> [1] 
> https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
> 

Re: [FLINK-12653] and system state

Posted by Jan Lukavský <je...@seznam.cz>.
I've managed to fix that by introducing an (optional) method to DoFnRunner
called getSystemStateTags() (the default implementation returns
Collections.emptyList()), and then using that list to bind states early in
Flink's DoFnOperator ([1]).

@Max, WDYT?

Jan

[1] 
https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
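
The shape of the approach described above can be sketched as follows. This is not the code from the linked commit; it is a simplified illustration under stated assumptions. ToyDoFnRunner, PlainRunner, SortingRunner and ToyOperator are hypothetical stand-ins, and state tags are modeled as plain strings. Only the method name getSystemStateTags() and its empty default come from the message.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Simplified illustration; the types here are hypothetical, not Beam's. */
public class SystemStateTagsSketch {

  /** Hypothetical, reduced stand-in for a DoFnRunner-like interface. */
  interface ToyDoFnRunner {
    void processElement(String element);

    /** Optional hook: runners without internal state keep the empty default. */
    default Collection<String> getSystemStateTags() {
      return Collections.emptyList();
    }
  }

  /** A runner with no internal state; it inherits the default. */
  static class PlainRunner implements ToyDoFnRunner {
    @Override
    public void processElement(String element) {
      // element processing omitted in this sketch
    }
  }

  /** A runner that keeps an internal buffer and therefore declares its tag. */
  static class SortingRunner implements ToyDoFnRunner {
    @Override
    public void processElement(String element) {
      // buffering and sorting omitted in this sketch
    }

    @Override
    public Collection<String> getSystemStateTags() {
      return Collections.singletonList("sortBuffer"); // assumed tag name
    }
  }

  /** Hypothetical operator that binds every declared tag before any element arrives. */
  static class ToyOperator {
    final List<String> boundStates = new ArrayList<>();

    void open(ToyDoFnRunner runner) {
      for (String tag : runner.getSystemStateTags()) {
        boundStates.add(tag); // stands in for early binding in the state backend
      }
    }
  }

  public static void main(String[] args) {
    ToyOperator operator = new ToyOperator();
    operator.open(new PlainRunner());
    operator.open(new SortingRunner());
    System.out.println("bound at open: " + operator.boundStates);
  }
}
```

One trade-off of this design is that the list has to be maintained by hand: any new internal state must also be added to the returned collection.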
