You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lian Jiang <ji...@gmail.com> on 2020/09/30 00:22:19 UTC

Flink 1.12 snapshot throws ClassNotFoundException

Hi,

I use Flink source master to build a snapshot and use the jars in my
project. The goal is to avoid hacky deserialization code caused by avro 1.8
in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the
code throws below ClassNotFoundException. I have verified that the
akka-actor jar 2.5.12 is available and specified in -classpath. I can even
create an object using akka/serialization/NullSerializer class in my
application, indicating there is no problem for this app to use any class
under namespace akka/serialization.

Caused by: java.lang.NoClassDefFoundError:
akka/serialization/BaseSerializer$class
at
akka.remote.serialization.MiscMessageSerializer.<init>(MiscMessageSerializer.scala:25)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:33)
at scala.util.Try$.apply(Try.scala:213)
at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:28)
at
akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$4(ReflectiveDynamicAccess.scala:39)
at scala.util.Success.flatMap(Try.scala:251)
at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:39)
at akka.serialization.Serialization.serializerOf(Serialization.scala:320)
at
akka.serialization.Serialization.$anonfun$serializers$2(Serialization.scala:346)
at
scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:874)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:394)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:721)
at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:873)
at akka.serialization.Serialization.<init>(Serialization.scala:346)
at
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:16)
at
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:13)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
at
akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:946)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
at
org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
at
org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
... 11 more


This is my gradle:

implementation files('lib/flink-avro-confluent-registry-1.12-SNAPSHOT.jar')
implementation files('lib/flink-clients_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-connector-kafka_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-connector-wikiedits_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-core-1.12-SNAPSHOT.jar')
implementation files('lib/flink-java-1.12-SNAPSHOT.jar')
implementation files('lib/flink-metrics-dropwizard-1.12-SNAPSHOT.jar')
implementation files('lib/flink-streaming-java_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-connector-kafka-base_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-avro-1.12-SNAPSHOT.jar')
implementation files('lib/flink-annotations-1.12-SNAPSHOT.jar')
implementation files('lib/flink-runtime_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-shaded-asm-7-7.1-11.0.jar')
implementation files('lib/flink-metrics-core-1.12-SNAPSHOT.jar')
implementation files('lib/flink-optimizer_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-shaded-guava-18.0-11.0.jar')

implementation group: 'org.scala-lang', name: 'scala-library',
version: '2.12.12'
implementation group: 'org.apache.commons', name: 'commons-lang3',
version: '3.11'
implementation group: 'commons-collections', name:
'commons-collections', version: '3.2.2'
implementation group: 'com.esotericsoftware.kryo', name: 'kryo', version: '2.16'
implementation group: 'com.typesafe', name: 'config', version: '1.3.2'

implementation group: 'com.typesafe.akka', name: 'akka-remote_2.12',
version: '2.5.12'
implementation group: 'com.typesafe.akka', name: 'akka-slf4j_2.12',
version: '2.5.12'
implementation group: 'com.typesafe.akka', name: 'akka-stream_2.12',
version: '2.5.12'
implementation group: 'com.typesafe.akka', name: 'akka-protobuf_2.12',
version: '2.5.12'

Note: the row of "akka-slf4j" will automatically include "akka-actor" so I
don't need to add "akka-actor" explicitly.

Since Flink 1.12 will be released in Oct 2020, it makes sense for me to use
it for development to save a lot of workaround code dealing with the avro
1.8 serialization issue [1] in older Flink. However, this exception is
blocking me from doing so. Any idea is highly appreciated!

Thanks
Lian


[1] https://issues.apache.org/jira/browse/FLINK-19339

Re: Flink 1.12 snapshot throws ClassNotFoundException

Posted by Till Rohrmann <tr...@apache.org>.
Great to hear that it works now :-)

On Fri, Oct 2, 2020 at 2:17 AM Lian Jiang <ji...@gmail.com> wrote:

> Thanks Till.  Making the scala version consistent using 2.11 solved the
> ClassNotFoundException.
>
> On Tue, Sep 29, 2020 at 11:58 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Lian,
>>
>> I suspect that it is caused by an incompatible Akka version. Flink uses
>> Akka 2.5.21 instead of 2.5.12. Moreover, you are mixing Flink jars which
>> use Scala 2.11 with Akka dependencies which are built against Scala 2.12.
>>
>> I am not an Gradle expert but can't Gradle simply pull in the transitive
>> dependencies of flink-runtime?
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 2:22 AM Lian Jiang <ji...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I use Flink source master to build a snapshot and use the jars in my
>>> project. The goal is to avoid hacky deserialization code caused by avro 1.8
>>> in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the
>>> code throws below ClassNotFoundException. I have verified that the
>>> akka-actor jar 2.5.12 is available and specified in -classpath. I can even
>>> create an object using akka/serialization/NullSerializer class in my
>>> application, indicating there is no problem for this app to use any class
>>> under namespace akka/serialization.
>>>
>>> Caused by: java.lang.NoClassDefFoundError:
>>> akka/serialization/BaseSerializer$class
>>> at
>>> akka.remote.serialization.MiscMessageSerializer.<init>(MiscMessageSerializer.scala:25)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:33)
>>> at scala.util.Try$.apply(Try.scala:213)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:28)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$4(ReflectiveDynamicAccess.scala:39)
>>> at scala.util.Success.flatMap(Try.scala:251)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:39)
>>> at akka.serialization.Serialization.serializerOf(Serialization.scala:320)
>>> at
>>> akka.serialization.Serialization.$anonfun$serializers$2(Serialization.scala:346)
>>> at
>>> scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:874)
>>> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:394)
>>> at
>>> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:721)
>>> at
>>> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:873)
>>> at akka.serialization.Serialization.<init>(Serialization.scala:346)
>>> at
>>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:16)
>>> at
>>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:13)
>>> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
>>> at
>>> akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:946)
>>> at scala.collection.Iterator.foreach(Iterator.scala:943)
>>> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>>> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>>> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>>> at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
>>> at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
>>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
>>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
>>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
>>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
>>> at
>>> akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
>>> at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
>>> at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
>>> at
>>> org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
>>> at
>>> org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
>>> ... 11 more
>>>
>>>
>>> This is my gradle:
>>>
>>> implementation files('lib/flink-avro-confluent-registry-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-clients_2.11-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-connector-kafka_2.11-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-connector-wikiedits_2.11-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-core-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-java-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-metrics-dropwizard-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-streaming-java_2.11-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-connector-kafka-base_2.11-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-avro-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-annotations-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-runtime_2.11-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-shaded-asm-7-7.1-11.0.jar')
>>> implementation files('lib/flink-metrics-core-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-optimizer_2.11-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-shaded-guava-18.0-11.0.jar')
>>>
>>> implementation group: 'org.scala-lang', name: 'scala-library', version: '2.12.12'
>>> implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
>>> implementation group: 'commons-collections', name: 'commons-collections', version: '3.2.2'
>>> implementation group: 'com.esotericsoftware.kryo', name: 'kryo', version: '2.16'
>>> implementation group: 'com.typesafe', name: 'config', version: '1.3.2'
>>>
>>> implementation group: 'com.typesafe.akka', name: 'akka-remote_2.12', version: '2.5.12'
>>> implementation group: 'com.typesafe.akka', name: 'akka-slf4j_2.12', version: '2.5.12'
>>> implementation group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.12'
>>> implementation group: 'com.typesafe.akka', name: 'akka-protobuf_2.12', version: '2.5.12'
>>>
>>> Note: the row of "akka-slf4j" will automatically include "akka-actor" so
>>> I don't need to add "akka-actor" explicitly.
>>>
>>> Since Flink 1.12 will be released in Oct 2020, it makes sense for me to
>>> use it for development to save a lot of workaround code dealing with the
>>> avro 1.8 serialization issue [1] in older Flink. However, this exception is
>>> blocking me from doing so. Any idea is highly appreciated!
>>>
>>> Thanks
>>> Lian
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-19339
>>>
>>>
>>>
>
> --
>
> Create your own email signature
> <https://www.wisestamp.com/signature-in-email/?utm_source=promotion&utm_medium=signature&utm_campaign=create_your_own&srcid=5234462839406592>
>

Re: Flink 1.12 snapshot throws ClassNotFoundException

Posted by Lian Jiang <ji...@gmail.com>.
Thanks Till.  Making the scala version consistent using 2.11 solved the
ClassNotFoundException.

On Tue, Sep 29, 2020 at 11:58 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Lian,
>
> I suspect that it is caused by an incompatible Akka version. Flink uses
> Akka 2.5.21 instead of 2.5.12. Moreover, you are mixing Flink jars which
> use Scala 2.11 with Akka dependencies which are built against Scala 2.12.
>
> I am not an Gradle expert but can't Gradle simply pull in the transitive
> dependencies of flink-runtime?
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 2:22 AM Lian Jiang <ji...@gmail.com> wrote:
>
>> Hi,
>>
>> I use Flink source master to build a snapshot and use the jars in my
>> project. The goal is to avoid hacky deserialization code caused by avro 1.8
>> in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the
>> code throws below ClassNotFoundException. I have verified that the
>> akka-actor jar 2.5.12 is available and specified in -classpath. I can even
>> create an object using akka/serialization/NullSerializer class in my
>> application, indicating there is no problem for this app to use any class
>> under namespace akka/serialization.
>>
>> Caused by: java.lang.NoClassDefFoundError:
>> akka/serialization/BaseSerializer$class
>> at
>> akka.remote.serialization.MiscMessageSerializer.<init>(MiscMessageSerializer.scala:25)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at
>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:33)
>> at scala.util.Try$.apply(Try.scala:213)
>> at
>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:28)
>> at
>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$4(ReflectiveDynamicAccess.scala:39)
>> at scala.util.Success.flatMap(Try.scala:251)
>> at
>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:39)
>> at akka.serialization.Serialization.serializerOf(Serialization.scala:320)
>> at
>> akka.serialization.Serialization.$anonfun$serializers$2(Serialization.scala:346)
>> at
>> scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:874)
>> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:394)
>> at
>> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:721)
>> at
>> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:873)
>> at akka.serialization.Serialization.<init>(Serialization.scala:346)
>> at
>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:16)
>> at
>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:13)
>> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
>> at
>> akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:946)
>> at scala.collection.Iterator.foreach(Iterator.scala:943)
>> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>> at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
>> at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
>> at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
>> at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
>> at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
>> at
>> org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
>> at
>> org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
>> ... 11 more
>>
>>
>> This is my gradle:
>>
>> implementation files('lib/flink-avro-confluent-registry-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-clients_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-connector-kafka_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-connector-wikiedits_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-core-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-java-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-metrics-dropwizard-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-streaming-java_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-connector-kafka-base_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-avro-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-annotations-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-runtime_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-shaded-asm-7-7.1-11.0.jar')
>> implementation files('lib/flink-metrics-core-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-optimizer_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-shaded-guava-18.0-11.0.jar')
>>
>> implementation group: 'org.scala-lang', name: 'scala-library', version: '2.12.12'
>> implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
>> implementation group: 'commons-collections', name: 'commons-collections', version: '3.2.2'
>> implementation group: 'com.esotericsoftware.kryo', name: 'kryo', version: '2.16'
>> implementation group: 'com.typesafe', name: 'config', version: '1.3.2'
>>
>> implementation group: 'com.typesafe.akka', name: 'akka-remote_2.12', version: '2.5.12'
>> implementation group: 'com.typesafe.akka', name: 'akka-slf4j_2.12', version: '2.5.12'
>> implementation group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.12'
>> implementation group: 'com.typesafe.akka', name: 'akka-protobuf_2.12', version: '2.5.12'
>>
>> Note: the row of "akka-slf4j" will automatically include "akka-actor" so
>> I don't need to add "akka-actor" explicitly.
>>
>> Since Flink 1.12 will be released in Oct 2020, it makes sense for me to
>> use it for development to save a lot of workaround code dealing with the
>> avro 1.8 serialization issue [1] in older Flink. However, this exception is
>> blocking me from doing so. Any idea is highly appreciated!
>>
>> Thanks
>> Lian
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19339
>>
>>
>>

-- 

Create your own email signature
<https://www.wisestamp.com/signature-in-email/?utm_source=promotion&utm_medium=signature&utm_campaign=create_your_own&srcid=5234462839406592>

Re: Flink 1.12 snapshot throws ClassNotFoundException

Posted by Till Rohrmann <tr...@apache.org>.
Hi Lian,

I suspect that it is caused by an incompatible Akka version. Flink uses
Akka 2.5.21 instead of 2.5.12. Moreover, you are mixing Flink jars which
use Scala 2.11 with Akka dependencies which are built against Scala 2.12.

I am not an Gradle expert but can't Gradle simply pull in the transitive
dependencies of flink-runtime?

Cheers,
Till

On Wed, Sep 30, 2020 at 2:22 AM Lian Jiang <ji...@gmail.com> wrote:

> Hi,
>
> I use Flink source master to build a snapshot and use the jars in my
> project. The goal is to avoid hacky deserialization code caused by avro 1.8
> in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the
> code throws below ClassNotFoundException. I have verified that the
> akka-actor jar 2.5.12 is available and specified in -classpath. I can even
> create an object using akka/serialization/NullSerializer class in my
> application, indicating there is no problem for this app to use any class
> under namespace akka/serialization.
>
> Caused by: java.lang.NoClassDefFoundError:
> akka/serialization/BaseSerializer$class
> at
> akka.remote.serialization.MiscMessageSerializer.<init>(MiscMessageSerializer.scala:25)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:33)
> at scala.util.Try$.apply(Try.scala:213)
> at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:28)
> at
> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$4(ReflectiveDynamicAccess.scala:39)
> at scala.util.Success.flatMap(Try.scala:251)
> at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:39)
> at akka.serialization.Serialization.serializerOf(Serialization.scala:320)
> at
> akka.serialization.Serialization.$anonfun$serializers$2(Serialization.scala:346)
> at
> scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:874)
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:394)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:721)
> at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:873)
> at akka.serialization.Serialization.<init>(Serialization.scala:346)
> at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:16)
> at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:13)
> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
> at
> akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:946)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
> at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
> at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
> at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
> at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
> at
> org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
> at
> org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
> ... 11 more
>
>
> This is my gradle:
>
> implementation files('lib/flink-avro-confluent-registry-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-clients_2.11-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-connector-kafka_2.11-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-connector-wikiedits_2.11-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-core-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-java-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-metrics-dropwizard-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-streaming-java_2.11-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-connector-kafka-base_2.11-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-avro-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-annotations-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-runtime_2.11-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-shaded-asm-7-7.1-11.0.jar')
> implementation files('lib/flink-metrics-core-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-optimizer_2.11-1.12-SNAPSHOT.jar')
> implementation files('lib/flink-shaded-guava-18.0-11.0.jar')
>
> implementation group: 'org.scala-lang', name: 'scala-library', version: '2.12.12'
> implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
> implementation group: 'commons-collections', name: 'commons-collections', version: '3.2.2'
> implementation group: 'com.esotericsoftware.kryo', name: 'kryo', version: '2.16'
> implementation group: 'com.typesafe', name: 'config', version: '1.3.2'
>
> implementation group: 'com.typesafe.akka', name: 'akka-remote_2.12', version: '2.5.12'
> implementation group: 'com.typesafe.akka', name: 'akka-slf4j_2.12', version: '2.5.12'
> implementation group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.12'
> implementation group: 'com.typesafe.akka', name: 'akka-protobuf_2.12', version: '2.5.12'
>
> Note: the row of "akka-slf4j" will automatically include "akka-actor" so I
> don't need to add "akka-actor" explicitly.
>
> Since Flink 1.12 will be released in Oct 2020, it makes sense for me to
> use it for development to save a lot of workaround code dealing with the
> avro 1.8 serialization issue [1] in older Flink. However, this exception is
> blocking me from doing so. Any idea is highly appreciated!
>
> Thanks
> Lian
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-19339
>
>
>