You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Praneeth Ramesh <sr...@gmail.com> on 2021/09/01 21:38:25 UTC

Seeing Exception ClassNotFoundException: __wrapper while running in Kubernetes Cluster

Hi All

I am trying to run a flink scala application which reads from kafka apply some lookup transformations and then writes to kafka. 

I am using Flink Version 1.12.1

I tested it in local and it works fine. But when I try to run it on cluster using native kubernetes integration I see weird errors like below.

The cluster also looks fine, because I tried to run a wordcount application on the cluster and it worked fine.

The exception is not clear and also the stacktrace shows the taskmanager stack trace and hence no idea where in the application the problem could be. Could this be a serialization issue? Is there a way to debug such issues and find the actual point in application code where there is a problem?

```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: unexpected exception type
        at java.io.ObjectStreamClass.throwMiscException(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
Caused by: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
        at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Unknown Source) ~[?:?]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more```


Re: Seeing Exception ClassNotFoundException: __wrapper while running in Kubernetes Cluster

Posted by Praneeth Ramesh <sr...@gmail.com>.
Hi Arvid and Nicolaus

Thanks for the reply.
I don't think this is an issue with the user code. I'm pretty sure that the
code is serializable.
I have not used any custom serializer. And the serialization configs are
not overridden.
My guess was the problem was with order of classloading.

I am deploying in application mode. Before I had my user code jar and
dependent libs in /opt/flink/usrlib.
I tried to follow this guide
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode

and used /usrlib to put it in the jars. Not sure why it was not working for
my case. I was seeing the error above.

Now I changed to /opt/flink/lib and user code and dependencies with flink
libs are in the same dir inside the docker image.

Changed
`COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/usrlib/poc-flink.jar`
to
`COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/lib/poc-flink.jar`
So the same classloader is used.
And it solved the problem.

Wondering if any of you have tried bundling user code in $FLINK_HOME/usrlib in
application mode?


Regards
Praneeth Ramesh

On Mon, Sep 6, 2021 at 9:28 AM Arvid Heise <ar...@apache.org> wrote:

> This looks like you hold a reference to some outer class or
> non-serializable class. Make sure that your user function is minimal (e.g.
> static or top-level class) and all fields really need to be serialized
> (that excludes all caches).
>
> On Thu, Sep 2, 2021 at 3:13 PM Nicolaus Weidner <
> nicolaus.weidner@ververica.com> wrote:
>
>> Hi Praneeth,
>>
>> It does look like a failure constructing the serializer. Can you share
>> the serialization config you use for the Kafka producer? In particular, are
>> you using a custom serializer?
>> Do you use any custom classloading configuration?
>>
>> Best regards,
>> Nico
>>
>> On Wed, Sep 1, 2021 at 11:38 PM Praneeth Ramesh <sr...@gmail.com>
>> wrote:
>>
>>> Hi All
>>>
>>> I am trying to run a flink scala application which reads from kafka
>>> apply some lookup transformations and then writes to kafka.
>>>
>>> I am using Flink Version 1.12.1
>>>
>>> I tested it in local and it works fine. But when I try to run it on
>>> cluster using native kubernetes integration I see weird errors like below.
>>>
>>> The cluster also looks fine, because I tried to run a wordcount
>>> application on the cluster and it worked fine.
>>>
>>> The exception is not clear and also the stacktrace shows the taskmanager
>>> stack trace and hence no idea where in the application the problem could
>>> be. Could this be a serialization issue? Is there a way to debug such
>>> issues and find the actual point in application code where there is a
>>> problem?
>>>
>>> ```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could
>>> not instantiate serializer.
>>>         at
>>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at java.lang.Thread.run(Unknown Source) [?:?]
>>> Caused by: java.io.IOException: unexpected exception type
>>>         at java.io.ObjectStreamClass.throwMiscException(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         ... 8 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.ClassNotFoundException:
>>> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
>>> Source) ~[?:?]
>>>         at
>>> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>> ~[?:?]
>>>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>>>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         ... 8 more
>>> Caused by: java.lang.ClassNotFoundException:
>>> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>>>         at
>>> scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>         at java.lang.Class.forName0(Native Method) ~[?:?]
>>>         at java.lang.Class.forName(Unknown Source) ~[?:?]
>>>         at
>>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
>>> Source) ~[?:?]
>>>         at
>>> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>> ~[?:?]
>>>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>>>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>> ~[?:?]
>>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         at
>>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
>>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>>         ... 8 more```
>>>
>>>

-- 
Regards
Praneeth Ramesh

Re: Seeing Exception ClassNotFoundException: __wrapper while running in Kubernetes Cluster

Posted by Arvid Heise <ar...@apache.org>.
This looks like you hold a reference to some outer class or
non-serializable class. Make sure that your user function is minimal (e.g.
static or top-level class) and all fields really need to be serialized
(that excludes all caches).

On Thu, Sep 2, 2021 at 3:13 PM Nicolaus Weidner <
nicolaus.weidner@ververica.com> wrote:

> Hi Praneeth,
>
> It does look like a failure constructing the serializer. Can you share the
> serialization config you use for the Kafka producer? In particular, are you
> using a custom serializer?
> Do you use any custom classloading configuration?
>
> Best regards,
> Nico
>
> On Wed, Sep 1, 2021 at 11:38 PM Praneeth Ramesh <sr...@gmail.com>
> wrote:
>
>> Hi All
>>
>> I am trying to run a flink scala application which reads from kafka apply
>> some lookup transformations and then writes to kafka.
>>
>> I am using Flink Version 1.12.1
>>
>> I tested it in local and it works fine. But when I try to run it on
>> cluster using native kubernetes integration I see weird errors like below.
>>
>> The cluster also looks fine, because I tried to run a wordcount
>> application on the cluster and it worked fine.
>>
>> The exception is not clear and also the stacktrace shows the taskmanager
>> stack trace and hence no idea where in the application the problem could
>> be. Could this be a serialization issue? Is there a way to debug such
>> issues and find the actual point in application code where there is a
>> problem?
>>
>> ```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could
>> not instantiate serializer.
>>         at
>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at java.lang.Thread.run(Unknown Source) [?:?]
>> Caused by: java.io.IOException: unexpected exception type
>>         at java.io.ObjectStreamClass.throwMiscException(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         ... 8 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.ClassNotFoundException:
>> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
>> Source) ~[?:?]
>>         at
>> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> ~[?:?]
>>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         ... 8 more
>> Caused by: java.lang.ClassNotFoundException:
>> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>>         at
>> scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>         at java.lang.Class.forName0(Native Method) ~[?:?]
>>         at java.lang.Class.forName(Unknown Source) ~[?:?]
>>         at
>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
>> Source) ~[?:?]
>>         at
>> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> ~[?:?]
>>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> ~[?:?]
>>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         at
>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>         ... 8 more```
>>
>>

Re: Seeing Exception ClassNotFoundException: __wrapper while running in Kubernetes Cluster

Posted by Nicolaus Weidner <ni...@ververica.com>.
Hi Praneeth,

It does look like a failure constructing the serializer. Can you share the
serialization config you use for the Kafka producer? In particular, are you
using a custom serializer?
Do you use any custom classloading configuration?

Best regards,
Nico

On Wed, Sep 1, 2021 at 11:38 PM Praneeth Ramesh <sr...@gmail.com>
wrote:

> Hi All
>
> I am trying to run a flink scala application which reads from kafka apply
> some lookup transformations and then writes to kafka.
>
> I am using Flink Version 1.12.1
>
> I tested it in local and it works fine. But when I try to run it on
> cluster using native kubernetes integration I see weird errors like below.
>
> The cluster also looks fine, because I tried to run a wordcount
> application on the cluster and it worked fine.
>
> The exception is not clear and also the stacktrace shows the taskmanager
> stack trace and hence no idea where in the application the problem could
> be. Could this be a serialization issue? Is there a way to debug such
> issues and find the actual point in application code where there is a
> problem?
>
> ```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
> instantiate serializer.
>         at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.io.IOException: unexpected exception type
>         at java.io.ObjectStreamClass.throwMiscException(Unknown Source)
> ~[?:?]
>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         ... 8 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.ClassNotFoundException:
> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>         at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
> Source) ~[?:?]
>         at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> ~[?:?]
>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         ... 8 more
> Caused by: java.lang.ClassNotFoundException:
> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>         at
> scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>         at java.lang.Class.forName0(Native Method) ~[?:?]
>         at java.lang.Class.forName(Unknown Source) ~[?:?]
>         at
> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
> Source) ~[?:?]
>         at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> ~[?:?]
>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>         at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
>         at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>         at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         ... 8 more```
>
>