Posted to dev@beam.apache.org by Antonio Si <an...@gmail.com> on 2020/12/19 18:33:48 UTC

Compatibility between Beam v2.23 and Beam v2.26

Hi,

We have been using Beam v2.23 and are now testing an upgrade to Beam v2.26. With Beam v2.26, we pass --experiments=use_deprecated_read and --fasterCopy=true.

We run into this exception when we resume our pipeline:

Caused by: java.io.InvalidClassException: org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local class incompatible: stream classdesc serialVersionUID = 5241803328188007316, local class serialVersionUID = 7247319138941746449
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
	at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)

It looks like Beam v2.26 cannot deserialize objects from our existing checkpoints. Is there any way to resume from our v2.23 checkpoints with v2.26?
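
For readers who want to reproduce this kind of failure in isolation, here is a minimal sketch in plain Java. It does not touch Beam or Flink: DemoSerializer and UidMismatchDemo are hypothetical names, and flipping a bit in the recorded UID is only a trick to simulate a stream written by a different version of the class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class UidMismatchDemo {
  /** Serializes a DemoSerializer, alters the UID recorded in the stream, and reads it back. */
  static String restoreWithForeignUid() {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(new DemoSerializer());
      }
      byte[] bytes = buffer.toByteArray();

      // Stream layout: 4-byte header, TC_OBJECT, TC_CLASSDESC, 2-byte name length,
      // the class name, then the 8-byte serialVersionUID.
      int uidOffset = 4 + 1 + 1 + 2 + DemoSerializer.class.getName().length();
      bytes[uidOffset + 7] ^= 0x01; // pretend another class version wrote the stream

      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        in.readObject();
        return "restored";
      } catch (InvalidClassException e) {
        return e.getMessage();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(restoreWithForeignUid());
  }
}

/** Hypothetical stand-in for a serializer object stored inside a checkpoint. */
final class DemoSerializer implements Serializable {
  // No serialVersionUID is declared, so the JVM derives one from the class shape.
  String coderName = "demo";
}
```

The message returned has the same "local class incompatible" shape as the one in the trace above, which suggests the checkpoint was written by a class whose UID differs from the one on the v2.26 classpath.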

Thanks for any suggestions.

Antonio.

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for mentioning me here @Boyuan.

Beam does not guarantee that checkpoints work across Beam releases.
Checkpoint compatibility can break for many reasons (primarily DAG
changes and serializer changes). Even though in this case a fixed
serialization ID (serialVersionUID) might have preserved compatibility,
we make internal changes to Beam all the time, and there is currently no
process that we follow to ensure compatibility.

I do want to note that Flink has a serializer migration strategy which 
we currently do not leverage: 
https://github.com/apache/beam/blob/d8966d640549932d7551461ff59fa1085730f768/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L182

However, this requires that in addition to the new serializer, the old 
serializer is kept around. Flink will then migrate the state by reading 
first with the old serializer and then subsequently writing with the new 
one.
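
The read-with-old, write-with-new idea can be sketched in plain Java as follows. This is only an illustration of the principle: StateSerializer, OldSerializer, NewSerializer and migrate are hypothetical names and are not Flink's TypeSerializer or TypeSerializerSnapshot API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class StateMigrationSketch {
  /** Hypothetical serializer contract, much simpler than Flink's. */
  interface StateSerializer {
    byte[] write(String value);

    String read(byte[] bytes);
  }

  /** Old on-disk format: the encoding produced by DataOutput.writeUTF. */
  static final class OldSerializer implements StateSerializer {
    @Override
    public byte[] write(String value) {
      try {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        new DataOutputStream(buffer).writeUTF(value);
        return buffer.toByteArray();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }

    @Override
    public String read(byte[] bytes) {
      try {
        return new DataInputStream(new ByteArrayInputStream(bytes)).readUTF();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  /** New on-disk format: a 4-byte length followed by UTF-8 bytes. */
  static final class NewSerializer implements StateSerializer {
    @Override
    public byte[] write(String value) {
      byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
      return ByteBuffer.allocate(4 + utf8.length).putInt(utf8.length).put(utf8).array();
    }

    @Override
    public String read(byte[] bytes) {
      int length = ByteBuffer.wrap(bytes).getInt();
      return new String(bytes, 4, length, StandardCharsets.UTF_8);
    }
  }

  /** Migration step: decode with the old serializer, re-encode with the new one. */
  static byte[] migrate(byte[] oldBytes, StateSerializer oldSerializer, StateSerializer newSerializer) {
    return newSerializer.write(oldSerializer.read(oldBytes));
  }

  public static void main(String[] args) {
    StateSerializer oldSerializer = new OldSerializer();
    StateSerializer newSerializer = new NewSerializer();
    byte[] migrated = migrate(oldSerializer.write("window-42"), oldSerializer, newSerializer);
    System.out.println(newSerializer.read(migrated));
  }
}
```

The point of the sketch is the constraint mentioned above: migrate only works while the old serializer is still available to decode the existing state.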

-Max

On 07.01.21 09:43, Jan Lukavský wrote:
> Hi Antonio,
> 
> can you please create one?
> 
> Thanks,
> 
>   Jan
> 
> On 1/6/21 10:31 PM, Antonio Si wrote:
>> Thanks for the information. Do we have a jira to track this issue or 
>> do you want me to create a jira for this?
>>
>> Thanks.
>>
>> Antonio.
>>
>> On 2021/01/06 17:59:47, Kenneth Knowles <ke...@apache.org> wrote:
>>> Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
>>> have adequate testing. We have a cultural understanding of not breaking
>>> encoded data forms but this is the encoded form of the 
>>> TypeSerializer, and
>>> actually there are two problems.
>>>
>>> 1. When you have a serialized object that does not have the
>>> serialVersionUid explicitly set, the UID is generated based on many 
>>> details
>>> that are irrelevant for binary compatibility. Any Java-serialized object
>>> that is intended for anything other than transient transmission 
>>> *must* have
>>> a serialVersionUid set and an explicit serialized form. Else it is
>>> completely normal for it to break due to irrelevant changes. The
>>> serialVersionUid has no mechanism for upgrade/downgrade so you *must* 
>>> keep
>>> it the same forever, and any versioning or compat scheme exists 
>>> within the
>>> single serialVersionUid.
>>> 2. In this case there was an actual change to the fields of the object
>>> stored, so you need to explicitly add the serialized form and also the
>>> ability to read from prior serialized forms.
>>>
>>> I believe explicitly setting the serialVersionUid to the original (and
>>> keeping it that way forever) and adding the ability to decode prior 
>>> forms
>>> will regain the ability to read the snapshot. But also this seems like
>>> something that would be part of Flink best practice documentation since
>>> naive use of Java serialization often hits this problem.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> This raises a few related questions from me:
>>>>
>>>> 1. Do we claim to support resuming Flink checkpoints made with previous
>>>> Beam versions?
>>>> 2. Does 1. require full binary compatibility between different 
>>>> versions of
>>>> runner internals like CoderTypeSerializer?
>>>>
>>> 3. Do we have tests for 1.?
>>> Kenn
>>>
>>>
>>>> On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <bo...@google.com> wrote:
>>>>
>>>>> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>>>>>
>>>>>   +Maximilian Michels <ma...@maximilianmichels.com> Any insights here?
>>>>>
>>>>> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <an...@gmail.com> 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I would like to followup with this question to see if there is a
>>>>>> solution/workaround for this issue.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Antonio.
>>>>>>

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Antonio Si <an...@gmail.com>.
Hi Jan,

I created this Jira: https://issues.apache.org/jira/browse/BEAM-11583

Thanks.

Antonio.

On 2021/01/07 08:43:34, Jan Lukavský <je...@seznam.cz> wrote: 
> Hi Antonio,
> 
> can you please create one?
> 
> Thanks,
> 
>   Jan
> 
> On 1/6/21 10:31 PM, Antonio Si wrote:
> > Thanks for the information. Do we have a jira to track this issue or do you want me to create a jira for this?
> >
> > Thanks.
> >
> > Antonio.
> >
> > On 2021/01/06 17:59:47, Kenneth Knowles <ke...@apache.org> wrote:
> >> Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
> >> have adequate testing. We have a cultural understanding of not breaking
> >> encoded data forms but this is the encoded form of the TypeSerializer, and
> >> actually there are two problems.
> >>
> >> 1. When you have a serialized object that does not have the
> >> serialVersionUid explicitly set, the UID is generated based on many details
> >> that are irrelevant for binary compatibility. Any Java-serialized object
> >> that is intended for anything other than transient transmission *must* have
> >> a serialVersionUid set and an explicit serialized form. Else it is
> >> completely normal for it to break due to irrelevant changes. The
> >> serialVersionUid has no mechanism for upgrade/downgrade so you *must* keep
> >> it the same forever, and any versioning or compat scheme exists within the
> >> single serialVersionUid.
> >> 2. In this case there was an actual change to the fields of the object
> >> stored, so you need to explicitly add the serialized form and also the
> >> ability to read from prior serialized forms.
> >>
> >> I believe explicitly setting the serialVersionUid to the original (and
> >> keeping it that way forever) and adding the ability to decode prior forms
> >> will regain the ability to read the snapshot. But also this seems like
> >> something that would be part of Flink best practice documentation since
> >> naive use of Java serialization often hits this problem.
> >>
> >> Kenn
> >>
> >> On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <kc...@google.com> wrote:
> >>
> >>> This raises a few related questions from me:
> >>>
> >>> 1. Do we claim to support resuming Flink checkpoints made with previous
> >>> Beam versions?
> >>> 2. Does 1. require full binary compatibility between different versions of
> >>> runner internals like CoderTypeSerializer?
> >>>
> >> 3. Do we have tests for 1.?
> >> Kenn
> >>
> >>
> >>> On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <bo...@google.com> wrote:
> >>>
> >>>> https://github.com/apache/beam/pull/13240 seems suspicious to me.
> >>>>
> >>>>   +Maximilian Michels <ma...@maximilianmichels.com> Any insights here?
> >>>>
> >>>> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <an...@gmail.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I would like to followup with this question to see if there is a
> >>>>> solution/workaround for this issue.
> >>>>>
> >>>>> Thanks.
> >>>>>
> >>>>> Antonio.
> >>>>>
> 

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Antonio,

can you please create one?

Thanks,

  Jan

On 1/6/21 10:31 PM, Antonio Si wrote:
> Thanks for the information. Do we have a jira to track this issue or do you want me to create a jira for this?
>
> Thanks.
>
> Antonio.
>
> On 2021/01/06 17:59:47, Kenneth Knowles <ke...@apache.org> wrote:
>> Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
>> have adequate testing. We have a cultural understanding of not breaking
>> encoded data forms but this is the encoded form of the TypeSerializer, and
>> actually there are two problems.
>>
>> 1. When you have a serialized object that does not have the
>> serialVersionUid explicitly set, the UID is generated based on many details
>> that are irrelevant for binary compatibility. Any Java-serialized object
>> that is intended for anything other than transient transmission *must* have
>> a serialVersionUid set and an explicit serialized form. Else it is
>> completely normal for it to break due to irrelevant changes. The
>> serialVersionUid has no mechanism for upgrade/downgrade so you *must* keep
>> it the same forever, and any versioning or compat scheme exists within the
>> single serialVersionUid.
>> 2. In this case there was an actual change to the fields of the object
>> stored, so you need to explicitly add the serialized form and also the
>> ability to read from prior serialized forms.
>>
>> I believe explicitly setting the serialVersionUid to the original (and
>> keeping it that way forever) and adding the ability to decode prior forms
>> will regain the ability to read the snapshot. But also this seems like
>> something that would be part of Flink best practice documentation since
>> naive use of Java serialization often hits this problem.
>>
>> Kenn
>>
>> On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <kc...@google.com> wrote:
>>
>>> This raises a few related questions from me:
>>>
>>> 1. Do we claim to support resuming Flink checkpoints made with previous
>>> Beam versions?
>>> 2. Does 1. require full binary compatibility between different versions of
>>> runner internals like CoderTypeSerializer?
>>>
>> 3. Do we have tests for 1.?
>> Kenn
>>
>>
>>> On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <bo...@google.com> wrote:
>>>
>>>> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>>>>
>>>>   +Maximilian Michels <ma...@maximilianmichels.com> Any insights here?
>>>>
>>>> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <an...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I would like to followup with this question to see if there is a
>>>>> solution/workaround for this issue.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Antonio.
>>>>>

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Antonio Si <an...@gmail.com>.
Thanks for the information. Do we have a jira to track this issue or do you want me to create a jira for this?

Thanks.

Antonio.

On 2021/01/06 17:59:47, Kenneth Knowles <ke...@apache.org> wrote: 
> Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
> have adequate testing. We have a cultural understanding of not breaking
> encoded data forms but this is the encoded form of the TypeSerializer, and
> actually there are two problems.
> 
> 1. When you have a serialized object that does not have the
> serialVersionUid explicitly set, the UID is generated based on many details
> that are irrelevant for binary compatibility. Any Java-serialized object
> that is intended for anything other than transient transmission *must* have
> a serialVersionUid set and an explicit serialized form. Else it is
> completely normal for it to break due to irrelevant changes. The
> serialVersionUid has no mechanism for upgrade/downgrade so you *must* keep
> it the same forever, and any versioning or compat scheme exists within the
> single serialVersionUid.
> 2. In this case there was an actual change to the fields of the object
> stored, so you need to explicitly add the serialized form and also the
> ability to read from prior serialized forms.
> 
> I believe explicitly setting the serialVersionUid to the original (and
> keeping it that way forever) and adding the ability to decode prior forms
> will regain the ability to read the snapshot. But also this seems like
> something that would be part of Flink best practice documentation since
> naive use of Java serialization often hits this problem.
> 
> Kenn
> 
> On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <kc...@google.com> wrote:
> 
> > This raises a few related questions from me:
> >
> > 1. Do we claim to support resuming Flink checkpoints made with previous
> > Beam versions?
> > 2. Does 1. require full binary compatibility between different versions of
> > runner internals like CoderTypeSerializer?
> >
> 3. Do we have tests for 1.?
> >
> 
> Kenn
> 
> 
> > On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <bo...@google.com> wrote:
> >
> >> https://github.com/apache/beam/pull/13240 seems suspicious to me.
> >>
> >>  +Maximilian Michels <ma...@maximilianmichels.com> Any insights here?
> >>
> >> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <an...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I would like to followup with this question to see if there is a
> >>> solution/workaround for this issue.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>>
> >>
> 

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Kenneth Knowles <ke...@apache.org>.
Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
have adequate testing. We have a cultural understanding of not breaking
encoded data forms but this is the encoded form of the TypeSerializer, and
actually there are two problems.

1. When you have a serialized object that does not have the
serialVersionUID explicitly set, the UID is generated based on many details
that are irrelevant for binary compatibility. Any Java-serialized object
that is intended for anything other than transient transmission *must* have
a serialVersionUID set and an explicit serialized form. Otherwise it is
completely normal for it to break due to irrelevant changes. The
serialVersionUID has no mechanism for upgrade/downgrade, so you *must* keep
it the same forever, and any versioning or compat scheme must live within
the single serialVersionUID.
2. In this case there was an actual change to the fields of the object
stored, so you need to explicitly add the serialized form and also the
ability to read from prior serialized forms.

I believe explicitly setting the serialVersionUID to the original (and
keeping it that way forever) and adding the ability to decode prior forms
will regain the ability to read the snapshot. But this also seems like
something that would be part of Flink best practice documentation, since
naive use of Java serialization often hits this problem.
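
As a rough illustration of both points, the sketch below pins the serialVersionUID and keeps a format tag inside the stream so that an older layout can still be decoded. VersionedState, its fields, and the FORMAT_V1/FORMAT_V2 tags are hypothetical and are not taken from CoderTypeSerializer. The formatToWrite field exists only so the demo can emit the old layout within a single JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class VersionedFormDemo {
  /** Serializes and deserializes in one step, standing in for a snapshot write and restore. */
  static VersionedState roundTrip(VersionedState state) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(state);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
        return (VersionedState) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    VersionedState restored =
        roundTrip(new VersionedState("coder", true, VersionedState.FORMAT_V1));
    System.out.println(restored.name + " " + restored.newOption);
  }
}

/** Hypothetical class; the field names are illustrative only. */
final class VersionedState implements Serializable {
  // Pinned once and never changed; format evolution happens inside the stream.
  private static final long serialVersionUID = 1L;

  static final int FORMAT_V1 = 1; // name only
  static final int FORMAT_V2 = 2; // name plus newOption

  transient String name;
  transient boolean newOption;
  // Demo-only switch; a real class would always write the newest layout.
  private transient int formatToWrite;

  VersionedState(String name, boolean newOption, int formatToWrite) {
    this.name = name;
    this.newOption = newOption;
    this.formatToWrite = formatToWrite;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeInt(formatToWrite);
    out.writeUTF(name);
    if (formatToWrite >= FORMAT_V2) {
      out.writeBoolean(newOption);
    }
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    int format = in.readInt();
    name = in.readUTF();
    // Streams written before newOption existed fall back to false.
    newOption = format >= FORMAT_V2 && in.readBoolean();
    formatToWrite = FORMAT_V2;
  }
}
```

Reading a FORMAT_V1 stream yields the default for the field that did not exist yet, while FORMAT_V2 streams restore it. The UID never changes between the two.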

Kenn

On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <kc...@google.com> wrote:

> This raises a few related questions from me:
>
> 1. Do we claim to support resuming Flink checkpoints made with previous
> Beam versions?
> 2. Does 1. require full binary compatibility between different versions of
> runner internals like CoderTypeSerializer?
>
3. Do we have tests for 1.?
>

Kenn


> On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>>
>>  +Maximilian Michels <ma...@maximilianmichels.com> Any insights here?
>>
>> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <an...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I would like to followup with this question to see if there is a
>>> solution/workaround for this issue.
>>>
>>> Thanks.
>>>
>>> Antonio.
>>>
>>>
>>

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Kyle Weaver <kc...@google.com>.
This raises a few related questions for me:

1. Do we claim to support resuming Flink checkpoints made with previous
Beam versions?
2. Does 1. require full binary compatibility between different versions of
runner internals like CoderTypeSerializer?
3. Do we have tests for 1.?
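
For context on question 2: the exception reported in this thread is a Java serialization serialVersionUID mismatch. The following is a minimal sketch of that mechanism in plain Java, not Beam or Flink code. PinnedSerializer, UnpinnedSerializer and SerialUidDemo are hypothetical names made up for illustration, and the "other class version" is simulated by flipping one bit of the UID recorded in the stream. Whether pinning serialVersionUID alone would be enough to restore real checkpoints is not established here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

class SerialUidDemo {
  // The UID that Java serialization writes into the stream and checks on read.
  static long uidOf(Class<?> cls) {
    return ObjectStreamClass.lookup(cls).getSerialVersionUID();
  }

  static byte[] serialize(Object value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  // Simulates a stream written by a different class version (illustration only).
  // Layout for a top-level object: 4-byte stream header, TC_OBJECT, TC_CLASSDESC,
  // 2-byte name length, class name, then the 8-byte serialVersionUID.
  static byte[] withDifferentUid(byte[] data, Class<?> cls) {
    byte[] copy = data.clone();
    int uidOffset = 4 + 1 + 1 + 2 + cls.getName().length(); // assumes an ASCII class name
    copy[uidOffset + 7] ^= 1; // flip the lowest bit of the recorded UID
    return copy;
  }

  public static void main(String[] args) throws Exception {
    System.out.println("pinned UID:   " + uidOf(PinnedSerializer.class));
    System.out.println("unpinned UID: " + uidOf(UnpinnedSerializer.class));

    byte[] saved = serialize(new PinnedSerializer("example-coder"));
    PinnedSerializer restored = (PinnedSerializer) deserialize(saved);
    System.out.println("restored: " + restored.coderName);

    try {
      deserialize(withDifferentUid(saved, PinnedSerializer.class));
    } catch (InvalidClassException e) {
      // Same failure mode as in the reported stack trace: "local class incompatible".
      System.out.println("restore failed: " + e.getMessage());
    }
  }
}

// Hypothetical stand-in for a runner-internal serializer; NOT Beam's CoderTypeSerializer.
class PinnedSerializer implements Serializable {
  // Declared explicitly, so the value stays fixed while the class evolves.
  private static final long serialVersionUID = 1L;
  final String coderName;

  PinnedSerializer(String coderName) {
    this.coderName = coderName;
  }
}

// Same shape, but with no declared UID: the JVM derives one from the class structure,
// so adding or changing members can change it between releases.
class UnpinnedSerializer implements Serializable {
  final String coderName;

  UnpinnedSerializer(String coderName) {
    this.coderName = coderName;
  }
}
```

A round trip of this kind (serialize with one version of the class, deserialize with the next) is also roughly the shape a compatibility test for question 3 could take, assuming such a test is wanted.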

On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <bo...@google.com> wrote:

> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>
>  +Maximilian Michels <ma...@maximilianmichels.com> Any insights here?

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Boyuan Zhang <bo...@google.com>.
https://github.com/apache/beam/pull/13240 seems suspicious to me.

 +Maximilian Michels <ma...@maximilianmichels.com> Any insights here?

On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <an...@gmail.com> wrote:

> Hi,
>
> I would like to follow up on this question to see if there is a
> solution/workaround for this issue.
>
> Thanks.
>
> Antonio.

Re: Compatibility between Beam v2.23 and Beam v2.26

Posted by Antonio Si <an...@gmail.com>.
Hi,

I would like to follow up on this question to see if there is a solution/workaround for this issue.

Thanks.

Antonio.

On 2020/12/19 18:33:48, Antonio Si <an...@gmail.com> wrote: 
> Hi,
> 
> We were using Beam v2.23 and recently, we are testing upgrade to Beam v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read and --fasterCopy=true.
> 
> We run into this exception when we resume our pipeline:
> 
> Caused by: java.io.InvalidClassException: org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local class incompatible: stream classdesc serialVersionUID = 5241803328188007316, local class serialVersionUID = 7247319138941746449
> 	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> 	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
> 	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
> 	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> 
> It looks like it is not able to deserialize objects from our existing checkpoints. Is there any way we could resume our v2.23 checkpoints by v2.26?
> 
> Thanks for any suggestions.
> 
> Antonio.
>