Posted to dev@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2018/08/20 14:03:18 UTC

Beam application upgrade on Flink crashes

Hello,

I am attempting to upgrade a Beam app from 2.5.0 running on Flink 1.4.0 to
Beam 2.6.0 running on Flink 1.5.0. I am not aware of any state migration
changes needed for Flink 1.4.0 -> 1.5.0, so I am just starting a new app
with updated libs from a Flink savepoint captured by the previous version of
the app.

There is no change in topology. The job is accepted without error by the new
cluster, which suggests that all operators are matched with state based on
IDs. However, the app runs for only a few seconds and then crashes with:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
	... 5 more
Caused by: java.io.IOException: Unable to restore operator state [bundle-buffer-tag]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
	... 7 more


Does this mean anything to anyone? Am I doing anything wrong, or did
FlinkRunner change in some way? The mentioned "bundle-buffer-tag" seems to
be too deep in the runner internals for me to reach.

Any help is much appreciated.

Best,
Jozo

Re: Beam application upgrade on Flink crashes

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Unfortunately, there are currently no compatibility guarantees between different Beam versions. Beam itself doesn't have the required interfaces or procedures in place for supporting backwards compatibility of state, and there have been quite a few changes in the internals between Flink 1.4 and Flink 1.5 that made larger changes necessary in how the Beam Flink runner handles operator state.

Best,
Aljoscha

> On 22. Aug 2018, at 12:14, Jozef Vilcek <jo...@gmail.com> wrote:
> 
> Hm, I am sorry to hear this. I must have missed in the docs that Beam version upgrades can break Flink state. It is important information for anyone wanting to use Beam on Flink in production.
> 
> So, I guess there is no guarantee that another bump of the Flink version will not break things until it reaches 1.7.
> Even then, things can still break, maybe? Is there a plan to make the Flink runner more robust and catch compatibility issues early with tests?
> 
> Just trying to figure out my options with upgrades. Do other runners suffer from the same weak guarantees?
> 

Re: Beam application upgrade on Flink crashes

Posted by Jozef Vilcek <jo...@gmail.com>.
Hm, I am sorry to hear this. I must have missed in the docs that Beam version
upgrades can break Flink state. It is important information for anyone
wanting to use Beam on Flink in production.

So, I guess there is no guarantee that another bump of the Flink version will
not break things until it reaches 1.7.
Even then, things can still break, maybe? Is there a plan to make the Flink
runner more robust and catch compatibility issues early with tests?

Just trying to figure out my options with upgrades. Do other runners
suffer from the same weak guarantees?


On Tue, Aug 21, 2018 at 9:25 PM Stephan Ewen <se...@apache.org> wrote:

> Flink 1.7 will change the way the "restore serializer" is handled, which
> should make it much easier to handle such cases.
> In particular, breaking the Java class version format will no longer be an
> issue.
>
> That should make it easier to give the Beam-on-Flink runner cross-version
> compatibility.

Re: Beam application upgrade on Flink crashes

Posted by Stephan Ewen <se...@apache.org>.
Flink 1.7 will change the way the "restore serializer" is handled, which
should make it much easier to handle such cases.
In particular, breaking the Java class version format will no longer be an
issue.

That should make it easier to give the Beam-on-Flink runner cross-version
compatibility.
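
One way to picture the idea, as a sketch only and not Flink's actual API:
instead of Java-serializing the serializer object itself into the savepoint,
store a small, explicitly versioned description of its configuration and
rebuild the serializer from that description on restore. All names below
(TextSerializer, snapshot, restoreSerializer) are hypothetical, and the code
uses only the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class SerializerSnapshotSketch {

  // Version of the snapshot layout itself (hypothetical).
  static final int SNAPSHOT_VERSION = 1;

  /** Hypothetical serializer whose only configuration is a charset. */
  static final class TextSerializer {
    final Charset charset;

    TextSerializer(Charset charset) {
      this.charset = charset;
    }

    byte[] serialize(String value) {
      return value.getBytes(charset);
    }

    String deserialize(byte[] bytes) {
      return new String(bytes, charset);
    }
  }

  /** Writes a versioned description of the serializer, not the object itself. */
  static byte[] snapshot(TextSerializer serializer) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(SNAPSHOT_VERSION);
      out.writeUTF(serializer.charset.name());
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Rebuilds a serializer from the description written by snapshot(). */
  static TextSerializer restoreSerializer(byte[] snapshot) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));
      int version = in.readInt();
      if (version != SNAPSHOT_VERSION) {
        throw new IllegalStateException("Unknown snapshot version: " + version);
      }
      return new TextSerializer(Charset.forName(in.readUTF()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    TextSerializer original = new TextSerializer(StandardCharsets.ISO_8859_1);
    byte[] state = original.serialize("buffered element");
    TextSerializer restored = restoreSerializer(snapshot(original));
    System.out.println(restored.deserialize(state));
  }
}
```

Because only a version number and the charset name are persisted here, the
restore does not depend on the binary class format of TextSerializer.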


On Mon, Aug 20, 2018 at 6:46 PM, Maximilian Michels <mx...@apache.org> wrote:

> AFAIK the serializer used here is the CoderTypeSerializer which may not
> be recoverable because of changes to the contained Coder
> (TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
> small changes could break serialization backwards-compatibility.
>
> As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
> This should be improved for the next release.
>
> Thanks,
> Max
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table

Re: Beam application upgrade on Flink crashes

Posted by Maximilian Michels <mx...@apache.org>.
AFAIK the serializer used here is the CoderTypeSerializer which may not
be recoverable because of changes to the contained Coder
(TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
small changes could break serialization backwards-compatibility.
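
To illustrate the serialVersionUID point, here is a minimal sketch that uses
only the JDK. The classes UnpinnedCoder and PinnedCoder are hypothetical
stand-ins, not Beam classes. When a Serializable class does not declare a
serialVersionUID, Java serialization derives one from the class structure
(name, fields, methods), so a small change to the class can change the UID and
make previously written instances unreadable. Declaring the field explicitly
pins the value.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

  // Hypothetical stand-in for a class WITHOUT an explicit UID.
  // Java serialization computes a default UID from the class structure.
  static class UnpinnedCoder implements Serializable {
    String tag = "bundle-buffer-tag";
  }

  // Same shape, but the UID is pinned and survives compatible class changes.
  static class PinnedCoder implements Serializable {
    private static final long serialVersionUID = 1L;
    String tag = "bundle-buffer-tag";
  }

  /** Returns the serialVersionUID that Java serialization uses for the class. */
  static long uidOf(Class<?> clazz) {
    return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
  }

  public static void main(String[] args) {
    // The unpinned value is a hash of the class structure.
    System.out.println("unpinned: " + uidOf(UnpinnedCoder.class));
    // The pinned value is whatever the class declares.
    System.out.println("pinned:   " + uidOf(PinnedCoder.class));
  }
}
```

Pinning the UID only avoids spurious mismatches; it does not by itself make
an incompatible change to the serialized fields readable.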

As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
This should be improved for the next release.

Thanks,
Max

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table

On 20.08.18 17:46, Stephan Ewen wrote:
> Hi Jozef!
> 
> When restoring state, the serializer that created the state must still
> be available, so the state can be read.
> It looks like some serializer classes were removed between Beam versions
> (or changed in an incompatible manner).
> 
> Backwards compatibility of an operator implementation needs cooperation
> from the operator. Within Flink itself, when we change the way an
> operator uses state, we keep the old codepath and classes in a
> "backwards compatibility restore" that takes the old state and brings it
> into the shape of the new state.
> 
> I am not deeply familiar with how Beam and the Flink runner implement
> their use of state, but it looks like this part is not present, which could
> mean that savepoints taken from Beam applications are not backwards
> compatible.

Re: Beam application upgrade on Flink crashes

Posted by Stephan Ewen <se...@apache.org>.
Hi Jozef!

When restoring state, the serializer that created the state must still be
available, so the state can be read.
It looks like some serializer classes were removed between Beam versions
(or changed in an incompatible manner).

Backwards compatibility of an operator implementation needs cooperation
from the operator. Within Flink itself, when we change the way an operator
uses state, we keep the old codepath and classes in a "backwards
compatibility restore" that takes the old state and brings it into the
shape of the new state.
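
A minimal sketch of such a "backwards compatibility restore", assuming a
hypothetical state layout that is tagged with a version number. It is not
taken from Flink or Beam and uses only the JDK. The old read path (V1) is kept
next to the new one (V2), and old state is converted into the new shape when
it is restored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedStateRestore {

  static final int V1 = 1; // old layout (hypothetical): count as int
  static final int V2 = 2; // new layout (hypothetical): count as long, plus a label

  /** The current in-memory shape of the state. */
  static final class BufferState {
    final long count;
    final String label;

    BufferState(long count, String label) {
      this.count = count;
      this.label = label;
    }
  }

  /** Writes state in the old layout, as an older version of the operator would. */
  static byte[] writeV1(int count) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(V1);
      out.writeInt(count);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Writes state in the current layout. */
  static byte[] writeV2(BufferState state) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(V2);
      out.writeLong(state.count);
      out.writeUTF(state.label);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads either layout and always returns the current shape. */
  static BufferState restore(byte[] bytes) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
      int version = in.readInt();
      switch (version) {
        case V1:
          // Old code path kept only for restore: widen the count, default the label.
          return new BufferState(in.readInt(), "");
        case V2:
          return new BufferState(in.readLong(), in.readUTF());
        default:
          throw new IllegalStateException("Unknown state version: " + version);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    BufferState migrated = restore(writeV1(7));
    System.out.println(migrated.count + " '" + migrated.label + "'");
  }
}
```

The key design choice is that the version tag is written before the payload,
so the reader can pick the matching code path before touching any state bytes.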

I am not deeply familiar with how Beam and the Flink runner implement their
use of state, but it looks like this part is not present, which could mean that
savepoints taken from Beam applications are not backwards compatible.
