Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2019/08/12 14:00:12 UTC
[FLINK-12653] and system state
Hi,
I have come across an issue that is very likely caused by [1]. The
issue is that Beam's state is (generally) created lazily, after an
element is received (as Max described in the Flink JIRA). Max also
created a workaround [2], but that seems to work for user state only
(i.e. state that has been created in user code by declaring @StateId;
please correct me if I'm wrong). In my work, however, I created a
system state (it holds elements before they are output, because of the
@RequiresTimeSortedInput annotation, but that's probably not important),
and this state is not covered by the workaround. This might be the case
for all system states in general, because they are not visible until an
element arrives and the state is actually created.
Is my analysis correct? If so, would anyone have any suggestions on how
to fix that?
Jan
[1] https://jira.apache.org/jira/browse/FLINK-12653
[2] https://issues.apache.org/jira/browse/BEAM-7144
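To make the lazy-creation point concrete, here is a minimal, self-contained
sketch. It is only a toy model and does not use Beam or Flink classes: the
names Backend, Operator and "sort-buffer" are made up for illustration. It
shows that a state created on first access is unknown to the backend until
the first element has been processed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class LazyStateSketch {

  /** Hypothetical stand-in for a state backend; it only knows states once they are created. */
  public static class Backend {
    private final Map<String, StringBuilder> states = new HashMap<>();

    // Creates the state on first access, mirroring lazy creation.
    public StringBuilder getOrCreate(String tag) {
      return states.computeIfAbsent(tag, t -> new StringBuilder());
    }

    public Set<String> knownStates() {
      return states.keySet();
    }
  }

  /** Hypothetical operator that touches its buffer state only when an element arrives. */
  public static class Operator {
    private final Backend backend;

    public Operator(Backend backend) {
      this.backend = backend;
    }

    public void processElement(String element) {
      backend.getOrCreate("sort-buffer").append(element);
    }
  }

  public static void main(String[] args) {
    Backend backend = new Backend();
    Operator operator = new Operator(backend);
    System.out.println(backend.knownStates()); // no state is known yet
    operator.processElement("a");
    System.out.println(backend.knownStates()); // the buffer state is known now
  }
}
```

In this model, anything that inspects the backend before the first element
arrives sees no state at all, which is the situation described above.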
Re: [FLINK-12653] and system state
Posted by Maximilian Michels <mx...@apache.org>.
Sounds good. Might be worth commenting on the JIRA to get this prioritized in case it has not been fixed.
-Max
Re: [FLINK-12653] and system state
Posted by Jan Lukavský <je...@seznam.cz>.
Hi Max,
comments inline.
On 8/13/19 12:01 PM, Maximilian Michels wrote:
> Hi Jan,
>
> Just checking, do you see the same rescaling problem as described in
> https://jira.apache.org/jira/browse/FLINK-12653 ?
Yes.
>
> If so, you are most likely correct that this is due to the system state
> that you added in your code. When I did the fix, I ran some tests to
> check if any system state is not bound. I did not find instances but you
> are right that we could see this issue for internal state, e.g. in
> ReduceFnContextFactory.
I think there were no instances of internal state used in the Flink
Runner before my patch introduced internal state for sorting inputs.
But relying on that seems a little fragile, because it might easily change.
>
> Given that this is a Flink specific bug I'm not sure it warrants adding
> a `getSystemStateTags()` method to the DoFnRunner. Also, this is error
> prone since we have to remember to add all state there. The better
> solution would be to eagerly register state during StateSpec creation,
> but this would require significant code refactoring.
I'm also not happy with adding an additional generic method just because
of one runner, but registering the state during creation of the StateSpec
would be hard, as you said.
>
> Wouldn't it suffice to just perform an early binding in your code?
> Additionally, we want to make sure to also revise any existing Beam code
> paths.
I think I could do that (although it would mean that the state is
registered early for all runners, not just Flink).
>
> The issue hopefully will be fixed with Flink 1.9. Would be interesting
> to try with the Flink 1.9 RC2:
> https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
I was not sure from the comments in the Flink JIRA that this will be
fixed soon. If so, I'm fine with registering just the single state I
introduced. If this turns out to be a long-term issue, I think it would
require some other solution.
So - I will register the state(s) I have created and test that on Flink
1.9 when I have a little spare time. I will decide what to do next after
that, ok?
Jan
>
> Cheers,
> Max
>
Re: [FLINK-12653] and system state
Posted by Maximilian Michels <mx...@apache.org>.
Hi Jan,
Just checking, do you see the same rescaling problem as described in
https://jira.apache.org/jira/browse/FLINK-12653 ?
If so, you are most likely correct that this is due to the system state
that you added in your code. When I did the fix, I ran some tests to
check if any system state is not bound. I did not find instances but you
are right that we could see this issue for internal state, e.g. in
ReduceFnContextFactory.
Given that this is a Flink-specific bug, I'm not sure it warrants adding
a `getSystemStateTags()` method to the DoFnRunner. Also, this is
error-prone since we have to remember to add all state there. The better
solution would be to eagerly register state during StateSpec creation,
but this would require significant code refactoring.
Wouldn't it suffice to just perform an early binding in your code?
Additionally, we want to make sure to also revise any existing Beam code
paths.
The issue hopefully will be fixed with Flink 1.9. Would be interesting
to try with the Flink 1.9 RC2:
https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
Cheers,
Max
Re: [FLINK-12653] and system state
Posted by Jan Lukavský <je...@seznam.cz>.
I've managed to fix that by introducing an (optional) method to DoFnRunner
called getSystemStateTags() (the default implementation returns
Collections.emptyList()), and then using that list to early bind states in
Flink's DoFnOperator ([1])
@Max, WDYT?
Jan
[1]
https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
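Roughly, the idea looks like the sketch below. This is a simplified,
self-contained model and not the code from the commit: only the method name
getSystemStateTags() comes from the description above. The Runner interface,
SortingRunner, StateRegistry, open() and the plain String tags are all
assumptions made for illustration, and they stand in for the real DoFnRunner,
DoFnOperator and state tags.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class EarlyBindingSketch {

  /** Hypothetical stand-in for a runner interface; not Beam's real DoFnRunner. */
  public interface Runner {
    // By default a runner announces no system state.
    default Collection<String> getSystemStateTags() {
      return Collections.emptyList();
    }
  }

  /** Hypothetical runner that keeps one system state for buffering sorted input. */
  public static class SortingRunner implements Runner {
    @Override
    public Collection<String> getSystemStateTags() {
      return Collections.singletonList("sorted-input-buffer");
    }
  }

  /** Hypothetical stand-in for the operator's view of the state backend. */
  public static class StateRegistry {
    private final Set<String> bound = new LinkedHashSet<>();

    public void bind(String tag) {
      bound.add(tag);
    }

    public Set<String> boundTags() {
      return Collections.unmodifiableSet(bound);
    }
  }

  /** Models operator start-up: bind user and system state before any element arrives. */
  public static StateRegistry open(Runner runner, Collection<String> userStateTags) {
    StateRegistry registry = new StateRegistry();
    userStateTags.forEach(registry::bind);
    runner.getSystemStateTags().forEach(registry::bind);
    return registry;
  }

  public static void main(String[] args) {
    StateRegistry registry =
        open(new SortingRunner(), Collections.singletonList("user-state"));
    System.out.println(registry.boundTags());
  }
}
```

A runner that does not override the default contributes nothing, so existing
runners are unaffected in this model. The cost is that every runner with
system state has to remember to list it.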