Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2024/02/21 15:31:04 UTC

Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

Hi,

while implementing FlinkRunner for Flink 1.17, I tried to verify that a
running Pipeline can be successfully upgraded from Flink 1.16 to
Flink 1.17. Flink 1.17 requires some changes regarding serialization,
so this was a concern. Unfortunately, we recently merged
core-construction-java into the SDK, which resulted in some classes being
repackaged, and we serialize some of these classes into Flink's
checkpoints/savepoints. Because of the rename, restoring from the
savepoint fails with the following exception:

Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.construction.SerializablePipelineOptions
     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
     at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
     at java.base/java.lang.Class.forName0(Native Method)
     at java.base/java.lang.Class.forName(Class.java:398)
     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
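
A minimal, self-contained illustration of why the rename breaks restores (plain Java, no Beam or Flink dependencies; the names RenamedClassDemo, Options and RenamedClassInputStream are hypothetical). Java serialization writes the fully qualified class name into the stream, so a reader whose class path no longer has a class under that name fails with ClassNotFoundException. Here the moved class is simulated by overriding ObjectInputStream.resolveClass, the same hook that Flink's ClassLoaderObjectInputStream overrides in the trace above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class RenamedClassDemo {

  // Hypothetical stand-in for a class such as SerializablePipelineOptions
  // that gets Java-serialized into a savepoint.
  static class Options implements Serializable {
    private static final long serialVersionUID = 1L;
    final String json;

    Options(String json) {
      this.json = json;
    }
  }

  // Simulates restoring on a class path where Options no longer exists
  // under the name recorded in the stream (e.g. after a package move).
  static class RenamedClassInputStream extends ObjectInputStream {
    RenamedClassInputStream(InputStream in) throws IOException {
      super(in);
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
        throws IOException, ClassNotFoundException {
      if (desc.getName().equals(Options.class.getName())) {
        throw new ClassNotFoundException(desc.getName());
      }
      return super.resolveClass(desc);
    }
  }

  static byte[] serialize(Object o) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] savepointBytes = serialize(new Options("{}"));

    // The fully qualified class name is part of the serialized bytes.
    String raw = new String(savepointBytes, StandardCharsets.ISO_8859_1);
    System.out.println("stream contains class name: " + raw.contains(Options.class.getName()));

    try (ObjectInputStream in =
        new RenamedClassInputStream(new ByteArrayInputStream(savepointBytes))) {
      in.readObject();
      System.out.println("restored");
    } catch (ClassNotFoundException e) {
      System.out.println("restore failed: " + e);
    }
  }
}
```

The same bytes restore fine through a plain ObjectInputStream; only the changed class name makes them unreadable, which is why moving a Serializable class to another package is enough to break existing savepoints.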


This means that no Pipeline will be able to upgrade from a version
prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be
restarted from scratch). I wanted to know how the community feels
about that; this consequence was probably not clear when we merged the
artifacts. The only option would be to revert the merge and then try to
figure out how to avoid Java serialization in Flink's savepoints. That
would definitely be costly to implement, and even more so to provide
ways to transfer old savepoints to the new format (which might be
possible using the State Processor API). I'm aware that Beam provides no
general guarantees about upgrade compatibility, so it might be fine
to just ignore this; I just wanted to shout this out loud so that we can
make a deliberate decision.

Best,

  Jan


Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

Posted by Kenneth Knowles <ke...@apache.org>.
Great. Let me know if I can help. I broke it after all :-)

Kenn

On Thu, Feb 22, 2024 at 2:58 AM Jan Lukavský <je...@seznam.cz> wrote:

> The reasons we use Java serialization are not fundamental, probably only
> historical. Thinking about it, yes, there is a lucky coincidence that we
> currently have to change the serialization because of Flink 1.17 support.
> Flink 1.17 actually removes the legacy Java serialization from Flink and
> enforces custom serialization. Therefore, we need to introduce an
> upgrade-compatible change of serialization to support Flink 1.17. This is
> already implemented in [1]. The PR can go further, though: we can replace
> the Java serialization of the Coder in the TypeSerializerSnapshot with the
> portable representation of the Coder (which will still use Java
> serialization in some cases, but might avoid it at least for well-known
> coders; moreover, Coders should be more upgrade-stable classes).
>
> I'll try to restore SerializablePipelineOptions (by copy & paste) in
> FlinkRunner only, and rework the serialization in a more stable way (at
> least avoid serializing the CoderTypeSerializer, which references
> SerializablePipelineOptions).
>
> I created [2] and marked it as a blocker for the 2.55.0 release, because
> otherwise we would break upgrades.
>
> Thanks for the discussion, it helped a lot.
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/30197
>
> [2] https://github.com/apache/beam/issues/30385

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

Posted by Robert Bradshaw via dev <de...@beam.apache.org>.
On Wed, Feb 21, 2024 at 11:58 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> The reasons we use Java serialization are not fundamental, probably only historical. Thinking about it, yes, there is a lucky coincidence that we currently have to change the serialization because of Flink 1.17 support. Flink 1.17 actually removes the legacy Java serialization from Flink and enforces custom serialization. Therefore, we need to introduce an upgrade-compatible change of serialization to support Flink 1.17. This is already implemented in [1]. The PR can go further, though: we can replace the Java serialization of the Coder in the TypeSerializerSnapshot with the portable representation of the Coder (which will still use Java serialization in some cases, but might avoid it at least for well-known coders; moreover, Coders should be more upgrade-stable classes).
>
> I'll try to restore SerializablePipelineOptions (by copy & paste) in FlinkRunner only, and rework the serialization in a more stable way (at least avoid serializing the CoderTypeSerializer, which references SerializablePipelineOptions).

Thanks!

If other runners use this (SerializablePipelineOptions seems like it was
explicitly created for this purpose), we could consider putting the
copy into core rather than just the Flink packages.

> I created [2] and marked it as a blocker for the 2.55.0 release, because otherwise we would break upgrades.
>
> Thanks for the discussion, it helped a lot.
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/30197
>
> [2] https://github.com/apache/beam/issues/30385

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

Posted by Jan Lukavský <je...@seznam.cz>.
The reasons we use Java serialization are not fundamental, probably only
historical. Thinking about it, yes, there is a lucky coincidence that we
currently have to change the serialization because of Flink 1.17
support. Flink 1.17 actually removes the legacy Java serialization from
Flink and enforces custom serialization. Therefore, we need to introduce
an upgrade-compatible change of serialization to support Flink 1.17.
This is already implemented in [1]. The PR can go further, though: we
can replace the Java serialization of the Coder in the
TypeSerializerSnapshot with the portable representation of the Coder
(which will still use Java serialization in some cases, but might avoid
it at least for well-known coders; moreover, Coders should be more
upgrade-stable classes).
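
A rough sketch of this direction, under stated assumptions: instead of Java-serializing a Coder object, the snapshot persists a format version and a stable string identifier, and a registry maps that identifier back to an implementation. Beam's portable (Runner API) representation identifies well-known coders by URN in a similar spirit, but the identifiers, the Codec interface and the registry below are hypothetical and are not Beam's or Flink's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Supplier;

public class StableIdSnapshotDemo {

  // Hypothetical codec abstraction: only stableId() is ever persisted.
  interface Codec {
    String stableId();
  }

  // These classes can be renamed or moved freely; the persisted bytes never mention them.
  static final class Utf8StringCodec implements Codec {
    @Override
    public String stableId() {
      return "example:codec:string_utf8:v1";
    }
  }

  static final class VarLongCodec implements Codec {
    @Override
    public String stableId() {
      return "example:codec:varlong:v1";
    }
  }

  // The only place that ties persisted identifiers to Java classes.
  static final Map<String, Supplier<Codec>> REGISTRY =
      Map.of(
          "example:codec:string_utf8:v1", Utf8StringCodec::new,
          "example:codec:varlong:v1", VarLongCodec::new);

  static final int FORMAT_VERSION = 1;

  static byte[] writeSnapshot(Codec codec) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(FORMAT_VERSION); // lets a future reader recognize older layouts
      out.writeUTF(codec.stableId()); // a stable id, not codec.getClass().getName()
    }
    return bytes.toByteArray();
  }

  static Codec readSnapshot(byte[] snapshot) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
      int version = in.readInt();
      if (version != FORMAT_VERSION) {
        throw new IOException("Unsupported snapshot version: " + version);
      }
      String id = in.readUTF();
      Supplier<Codec> factory = REGISTRY.get(id);
      if (factory == null) {
        throw new IOException("Unknown codec id: " + id);
      }
      return factory.get();
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] snapshot = writeSnapshot(new VarLongCodec());
    Codec restored = readSnapshot(snapshot);
    System.out.println("restored: " + restored.stableId());

    String raw = new String(snapshot, StandardCharsets.ISO_8859_1);
    System.out.println("contains Java class name: " + raw.contains(VarLongCodec.class.getName()));
  }
}
```

Because the persisted bytes never contain a Java class name, moving or renaming an implementation only requires updating its registry entry, and the leading format version gives a reader a place to branch if the layout ever changes.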

I'll try to restore SerializablePipelineOptions (by copy & paste) in
FlinkRunner only, and rework the serialization in a more stable way (at
least avoid serializing the CoderTypeSerializer, which references
SerializablePipelineOptions).

I created [2] and marked it as a blocker for the 2.55.0 release, because
otherwise we would break upgrades.

Thanks for the discussion, it helped a lot.

  Jan

[1] https://github.com/apache/beam/pull/30197

[2] https://github.com/apache/beam/issues/30385


Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

Posted by Kenneth Knowles <ke...@apache.org>.
Yeah, I think we should restore the necessary classes, but also fix the
FlinkRunner. Java serialization is inherently self-update-incompatible.
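
To illustrate the point about Java serialization, a small plain-Java sketch (the class names are hypothetical). Besides the class name, the stream records a serialVersionUID, which the JVM derives from details of the class such as its name, modifiers, interfaces, fields and methods unless the class declares one. Declaring it keeps compatible edits readable, but no serialVersionUID survives a rename or a move to another package.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

  // No explicit serialVersionUID: the JVM computes one from the class's
  // structure, so editing the class can silently change it.
  static class Implicit implements Serializable {
    int count;
  }

  // Explicit serialVersionUID: stays fixed across compatible edits to the class body.
  static class Explicit implements Serializable {
    private static final long serialVersionUID = 1L;
    int count;
  }

  public static void main(String[] args) {
    ObjectStreamClass implicitDesc = ObjectStreamClass.lookup(Implicit.class);
    ObjectStreamClass explicitDesc = ObjectStreamClass.lookup(Explicit.class);
    System.out.println("implicit UID: " + implicitDesc.getSerialVersionUID());
    System.out.println("explicit UID: " + explicitDesc.getSerialVersionUID());
    // The descriptor written to the stream also carries the class name,
    // which no serialVersionUID can compensate for after a rename.
    System.out.println("recorded name: " + explicitDesc.getName());
  }
}
```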

On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev <de...@beam.apache.org>
wrote:

> Is there a fundamental reason we serialize Java classes into Flink
> savepoints?

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

Posted by Reuven Lax via dev <de...@beam.apache.org>.
Is there a fundamental reason we serialize Java classes into Flink
savepoints?

On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev <de...@beam.apache.org>
wrote:

> We could consider merging the Gradle targets without renaming the
> classpaths as an intermediate step.
>
> Optimistically, perhaps there's a small number of classes that we need
> to preserve (e.g. SerializablePipelineOptions looks like it was
> something specifically intended to be serialized); maybe that and a
> handful of others (that implement Serializable) could be left in their
> original packages for backwards compatibility reasons?

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

Posted by Robert Bradshaw via dev <de...@beam.apache.org>.
We could consider merging the Gradle targets without renaming the
classpaths as an intermediate step.

Optimistically, perhaps there's a small number of classes that we need
to preserve (e.g. SerializablePipelineOptions looks like it was
something specifically intended to be serialized); maybe that and a
handful of others (that implement Serializable) could be left in their
original packages for backwards compatibility reasons?
