Posted to user@flink.apache.org by shkob1 <sh...@gmail.com> on 2018/10/22 21:47:25 UTC

Dynamically Generated Classes - Cannot load user class

Hey,

I'm trying to run a job which uses a dynamically generated class (through
Byte Buddy).
Think of me having a complex schema as YAML text and generating a class from
it. Throughout the job I am using an artificial superclass (MySuperClass)
of the generated class (for example, I need to specify the generic type
when extending RichMapFunction).



MyRichMapFunction<Y extends MySuperClass> extends RichMapFunction<Row, Y> is
what introduces the dynamic class. It takes the YAML in its constructor and:
1. open - takes the schema and converts it into a POJO class which extends
MySuperClass
2. getProducedType - does the same thing in order to correctly emit the POJO
with all the right fields

So basically my job is something like

env.addSource([stream of pojos])
.filter(...)
... (register table, running a query which generates Rows)
.map(myRichMapFunction)
.returns(myRichMapFunction.getProducedType())
.addSink(...)

My trouble now is that, when running on a cluster, the classloader fails to
load my generated class.
I tried to use getRuntimeContext().getUserCodeClassLoader() as the loader
for Byte Buddy, but that doesn't seem to be enough.

I was reading about it here:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
Is there maybe a hook that gets called when a job is loaded, so I can load
the class there?


Stacktrace:

org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class: com....model.MyGeneratedClass
ClassLoader info: URL ClassLoader:
    file:
'/var/folders/f7/c4pvjrf902b6c73_tbzkxnjw0000gn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0'
(valid JAR)
Class not resolvable through given classloader.
	at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
	at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
	at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at com.....MainClass.main(MainClass.java:46)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
	at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
	at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
	at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
	at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
	at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
	at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class: com.....model.DynamicSchema
ClassLoader info: URL ClassLoader:
    file:
'/var/folders/f7/c4pvjrf902b6c73_tbzkxnjw0000gn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0'
(valid JAR)
Class not resolvable through given classloader.
	at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:99)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:273)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Dynamically Generated Classes - Cannot load user class

Posted by shkob1 <sh...@gmail.com>.
OK, I think I figured it out, though I'm not sure of the exact reason:

It seems that the stream type needs to be a generic type of the superclass
rather than a POJO of the concrete generated class. Otherwise the operator
definition cannot load the POJO class when the task is created.
So I no longer declare the map's produced type as the concrete generated
class, and I work around keyBy (which then cannot use a field name) by
using a key selector instead.
Doing all of that seems to work. I'd be happy to hear a more in-depth
explanation of the reason if anyone knows.
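
A minimal, JDK-only sketch of that workaround: the stream is typed by the
superclass, and the key is extracted through a function rather than by field
name. SuperRecord, MapBackedRecord and KeySelectorSketch are hypothetical
stand-ins, and java.util.function.Function stands in here for Flink's
KeySelector interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical superclass known at compile time (plays the role of MySuperClass). */
abstract class SuperRecord {
    /** Generic accessor, so callers never need the concrete class or its fields. */
    abstract Object getField(String name);
}

/** Hand-written stand-in for a class that would be generated at runtime. */
final class MapBackedRecord extends SuperRecord {
    private final Map<String, Object> fields = new LinkedHashMap<>();

    MapBackedRecord with(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    @Override
    Object getField(String name) {
        return fields.get(name);
    }
}

final class KeySelectorSketch {
    /**
     * Builds a key extractor that only depends on the superclass.
     * Assumption: in a Flink job this logic would live in a KeySelector.
     */
    static Function<SuperRecord, String> keyByField(String fieldName) {
        return record -> String.valueOf(record.getField(fieldName));
    }
}
```

Because the extractor is written against the superclass, nothing in the
job definition has to name the concrete generated class.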




Re: Dynamically Generated Classes - Cannot load user class

Posted by DDreyfus <da...@databasepub.com>.
Thinking out loud here:

I can't tell where the class load is failing.
The general model I've used with ByteBuddy in this scenario is very similar
to yours.
I subclass my superclass using ByteBuddy.
I inject the new class into a JAR that will be shared by the task managers.
I subclass the Flink classes such as RichMapFunction so that the template
parameters are accepted.
I would have to be careful in my subclassed functions to not instantiate
members that could not serialize.
This, too, gets injected into the JAR.
I would then instantiate all the classes and send them to the task managers.
The trick to getting the classes to load on the task manager was to make
sure all the JARs that they needed to reference were available. This would
include my temporary JAR containing ByteBuddy classes as well as any JARs
that weren't automatically distributed by Flink to the task managers, but
which were otherwise needed.

It's not clear from your note whether in dropping SQL your ByteBuddy
functions are getting called.
The last stack trace you produced indicates a problem other than finding or
loading ByteBuddy objects.
Can you get the job to run without using Streams (i.e., use Batch)?
Can you get the job to run without using ByteBuddy (i.e., just execute the
SQL, then use a hard-coded POJO, then try adding ByteBuddy)?





Re: Dynamically Generated Classes - Cannot load user class

Posted by shkob1 <sh...@gmail.com>.
Update on this: if I just do an empty mapping and drop the SQL part, it works
just fine. I wonder if there's any class loading that needs to be done when
using SQL; I'm not sure how I would do that.




Re: Dynamically Generated Classes - Cannot load user class

Posted by shkob1 <sh...@gmail.com>.
After removing some operators (which I still need, but I wanted to understand
where my issues are) I get a slightly different stack trace (though still the
same issue).

My current operators are:
1. a SQL select with group by (returns a retract stream <Boolean, Row>)
2. filter (take only non-retracted records)
3. map (tuple to Row)
4. map (Row to MyGeneratedClass -> this implements the classloader load of
the generated class in open())


org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
instantiate outputs in order.
	at
org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.createStreamRecordWriters(StreamTask.java:1165)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:214)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:193)
	at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at
org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1445)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:680)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.....model.MyGeneratedClass
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
	at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.readObject(PojoSerializer.java:1038)
	at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at java.util.ArrayList.readObject(ArrayList.java:797)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at java.util.ArrayList.readObject(ArrayList.java:797)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at java.util.ArrayList.readObject(ArrayList.java:797)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at java.util.ArrayList.readObject(ArrayList.java:797)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at java.util.ArrayList.readObject(ArrayList.java:797)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at java.util.ArrayList.readObject(ArrayList.java:797)
	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
	at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
	at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
	at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
	at
org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
	... 11 more


According to the logs, the Task itself contains all of those operators:

2018-10-23 11:15:19,234 INFO  org.apache.flink.runtime.taskmanager.Task                    
- Registering task at network: groupBy: (...), select: (...) -> to: Tuple2
-> Filter -> Map -> my-query (1/1) (06b471ad99d77f0449c30641bed7cc88)
[DEPLOYING].
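
The trace above fails inside ObjectInputStream.resolveClass while
PojoSerializer.readObject is being read from the task configuration in the
StreamTask constructor, that is, before any open() method has had a chance
to run. Below is a minimal, JDK-only sketch of that failure mode. All names
in it (ResolveClassSketch, SerializerStandIn, GeneratedStandIn, hiding) are
hypothetical, and the "hiding" loader only simulates a class loader that
cannot see a class generated elsewhere; it is not Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class ResolveClassSketch {

    /** Hypothetical stand-in for a serializer that holds the record class. */
    static final class SerializerStandIn implements Serializable {
        private static final long serialVersionUID = 1L;
        final Class<?> recordClass;

        SerializerStandIn(Class<?> recordClass) {
            this.recordClass = recordClass;
        }
    }

    /** Hypothetical stand-in for the class generated at runtime. */
    static final class GeneratedStandIn { }

    /** Resolves every class in the stream through one explicit class loader. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Simulates a loader that knows everything except one class name. */
    static ClassLoader hiding(String hiddenName, ClassLoader parent) {
        return new ClassLoader(parent) {
            @Override
            protected Class<?> loadClass(String name, boolean resolve)
                    throws ClassNotFoundException {
                if (name.equals(hiddenName)) {
                    throw new ClassNotFoundException(name);
                }
                return super.loadClass(name, resolve);
            }
        };
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    /** Returns true when reading the bytes fails because a class is missing. */
    static boolean failsWithClassNotFound(byte[] data, ClassLoader loader) throws IOException {
        try {
            deserialize(data, loader);
            return false;
        } catch (ClassNotFoundException e) {
            return true;
        }
    }
}
```

If this is what happens in the job, loading the class in open() would come
too late for any serialized object that already references the concrete
class by name.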




Re: Dynamically Generated Classes - Cannot load user class

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
I have, with no luck. I wonder though: do I need to load it only in the map function? I tried to add it in the open method of the sink function and of the process function I have there too, because they still use the type. Still no good. Is there a way of knowing which operator fails?
________________________________
From: Hequn Cheng <ch...@gmail.com>
Sent: Monday, October 22, 2018 6:33:52 PM
To: Shahar Cizer Kobrinsky
Cc: user
Subject: Re: Dynamically Generated Classes - Cannot load user class

Hi shkob

> i tried to use getRuntimeContext().getUserCodeClassLoader() as the loader to use for Byte Buddy - but doesnt seem to be enough.
From the log, it seems that the user class cannot be found in the classloader:
> Cannot load user class: com....model.MyGeneratedClass
Have you tried Thread.currentThread().getContextClassLoader(), which should be the user-code ClassLoader?

Best, Hequn


Re: Dynamically Generated Classes - Cannot load user class

Posted by Hequn Cheng <ch...@gmail.com>.
Hi shkob

> i tried to use getRuntimeContext().getUserCodeClassLoader() as the loader
> to use for Byte Buddy - but doesnt seem to be enough.

From the log, it seems that the user class cannot be found in the
classloader.

> Cannot load user class: com....model.MyGeneratedClass

Have you tried Thread.currentThread().getContextClassLoader(),
which should be the user-code ClassLoader?

Best, Hequn
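
For illustration, the suggestion could be tried with a small helper like the
one below. It is only a sketch: ContextLoaderSketch, pickLoader and
callWithContextLoader are hypothetical names, and whether the context class
loader is the right target for Byte Buddy in this job is an assumption that
would need testing.

```java
import java.util.function.Supplier;

final class ContextLoaderSketch {

    /**
     * Prefers the thread's context class loader and falls back to the
     * loader of a known class when no context loader is set.
     */
    static ClassLoader pickLoader(Class<?> fallbackOwner) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        return contextLoader != null ? contextLoader : fallbackOwner.getClassLoader();
    }

    /**
     * Runs an action with the given context class loader installed and
     * restores the previous one afterwards, even if the action throws.
     */
    static <T> T callWithContextLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}
```

The loader returned by pickLoader would then be handed to whatever defines
the generated class, instead of a loader captured on the client side.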
