Posted to user@flink.apache.org by Joel Edwards <jo...@ed-craft.com> on 2021/09/07 14:31:21 UTC

User defined function (UDF) not sent when submitting job to session cluster

Good day,

I have been attempting to submit a job to a session cluster. This job
involves a pair of dynamic tables and a SQL query. The SQL query calls a
UDF, which I register via the Table API's createTemporarySystemFunction()
method. The job runs locally, but when I submit it to a remote session
cluster, it fails with the error:

`Cannot load user class: <fully-qualified-class-name>`

If I place a fat jar containing all of my local dependencies on the
JobManagers and TaskManagers, the UDF is loaded. However, I would expect
the UDF to be serialized and sent with the rest of the job. I have looked
over the UDF documentation, and I don't see a reason why it would not be.
Since there is no error related to serializing the UDF, though, my
assumptions about UDF serialization must be incorrect. Is there a hint I
can use to make the closure cleaner identify the UDF for serialization? I
suspect it is not being included because it is referenced only in the SQL
query, and not in the streams feeding the input table or the stream
consuming the output table.

Summary of questions:
- Will UDFs be serialized with the job, or are they never included?
- Is it possible to hint at what should be serialized and sent along with
the job?

Thank you,
Joel


-- 
Joel Edwards
Software Architect
Ed-Craft Software Solutions

Re: User defined function (UDF) not sent when submitting job to session cluster

Posted by Joel Edwards <jo...@ed-craft.com>.
Hi Dawid,

Thanks again for replying and for confirming the behavior.

Best regards,
Joel


On Tue, Sep 7, 2021 at 1:35 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Huh, of course. Actually, I was too quick with my answer. Even though the
> UDF is serialized with the JobGraph, its class must be present on the TMs
> for it to be deserialized. That's how Java serialization works, after all.
>
> So the actual answer: the UDF is serialized with the JobGraph, but its
> class is mandatory for deserialization.
>
> Best,
>
> Dawid

Re: User defined function (UDF) not sent when submitting job to session cluster

Posted by Dawid Wysakowicz <dw...@apache.org>.
Huh, of course. Actually, I was too quick with my answer. Even though the
UDF is serialized with the JobGraph, its class must be present on the TMs
for it to be deserialized. That's how Java serialization works, after all.

So the actual answer: the UDF is serialized with the JobGraph, but its
class is mandatory for deserialization.

Best,

Dawid
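Dawid's correction can be illustrated with plain Java serialization, independent of Flink. The minimal sketch below assumes only the JDK; `MyUdf` is a hypothetical stand-in, not a Flink `ScalarFunction`. It serializes an object and inspects the bytes: the class name and its serialVersionUID are recorded, while the class-file magic number 0xCAFEBABE (used here as a rough indicator of bytecode) is absent. Whichever JVM reads the bytes back therefore has to be able to load the class on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class SerializedFormDemo {

    // Hypothetical stand-in for a UDF: a plain Serializable class, not a Flink type.
    static class MyUdf implements Serializable {
        private static final long serialVersionUID = 1L;
        final String prefix;

        MyUdf(String prefix) {
            this.prefix = prefix;
        }
    }

    // Writes an object with standard Java serialization and returns the raw bytes.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Naive subsequence search over a byte array.
    static boolean contains(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return true;
        }
        return false;
    }

    // Class names are written in modified UTF-8, identical to ASCII for plain identifiers.
    static boolean containsString(byte[] haystack, String s) {
        return contains(haystack, s.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] data = serialize(new MyUdf("x"));
        // The stream names the class and records its serialVersionUID...
        System.out.println("class name present: " + containsString(data, MyUdf.class.getName()));
        System.out.println("serialVersionUID: "
                + ObjectStreamClass.lookup(MyUdf.class).getSerialVersionUID());
        // ...but carries no class file (class files start with 0xCAFEBABE).
        byte[] classFileMagic = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
        System.out.println("bytecode present: " + contains(data, classFileMagic));
    }
}
```

Serialization succeeds without complaint on the side that already has the class, which is consistent with Joel seeing no serialization error on the client; the problem only appears where the bytes are read back.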

Re: User defined function (UDF) not sent when submitting job to session cluster

Posted by Joel Edwards <jo...@ed-craft.com>.
Hi Dawid,

Thank you for the response.

I create a remote StreamExecutionEnvironment, wrap a new
StreamTableEnvironment around it, build the job, then execute it via
executeAsync().

It sounds like I cannot send the serialized job graph to the session
cluster's JobManager via the web API. The above approach works for
submitting a job, but only if my code is bundled in a jar on the
JobManager. I am building the job using the remote environment, and it
gets as far as initialization, but then fails on the TaskManager nodes
because the UDF class cannot be located.

It is surprising that it is the TaskManager that throws the error when
loading my UDF class. It seems the session cluster's JobManager must be
forwarding my job graph (submitted to its web API), since it has no
knowledge of the UDF class either.

I have included the stack trace at the bottom of this e-mail in case it
is helpful.

Is there a way I can capture and send the serialized job graph?

Thanks again!
Joel



Stack trace:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.buzuli.example.MyUDF
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: com.buzuli.example.MyUDF
	at java.base/java.net.URLClassLoader.findClass(Unknown Source)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Unknown Source)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readArray(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
	... 7 more
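The "Caused by" part of this trace shows Java deserialization calling `resolveClass`, which asks Flink's user-code classloader for `com.buzuli.example.MyUDF` and gets a `ClassNotFoundException`. The JDK-only sketch below reproduces that mechanism under stated assumptions: `MyUdf` is a hypothetical stand-in, and `LoaderObjectInputStream` is a hypothetical, much simpler analogue of Flink's `InstantiationUtil.ClassLoaderObjectInputStream`, not its actual code. The same bytes fail through a loader that cannot see the class and deserialize normally through one that can, roughly what the fat-jar workaround provides on the TaskManagers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class UserClassLoaderDemo {

    // Hypothetical user class standing in for com.buzuli.example.MyUDF.
    static class MyUdf implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Resolves classes through a caller-supplied loader instead of the default one.
    // Simplified: unlike the JDK default, it does not handle primitive type names.
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    // True if reading the bytes through `loader` fails because the class is not visible.
    static boolean failsWithClassNotFound(byte[] data, ClassLoader loader) {
        try {
            deserialize(data, loader);
            return false;
        } catch (ClassNotFoundException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(new MyUdf());

        // No URLs and the bootstrap loader as parent: JDK classes only, like a
        // user-code classloader built without the jar that contains the UDF.
        try (URLClassLoader withoutUserJar = new URLClassLoader(new URL[0], null)) {
            System.out.println("without the class: ClassNotFoundException = "
                    + failsWithClassNotFound(data, withoutUserJar));
        }

        // The loader that defined MyUdf can resolve it, so the same bytes deserialize.
        ClassLoader withUserClass = UserClassLoaderDemo.class.getClassLoader();
        System.out.println("with the class: ClassNotFoundException = "
                + failsWithClassNotFound(data, withUserClass));
    }
}
```

Overriding `resolveClass` is the standard JDK hook for choosing which classloader deserialization uses; the sketch only shows why the class must be visible to that loader, not how Flink populates its user-code classloader.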







On Tue, Sep 7, 2021 at 12:03 Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Joel,
>
> There are a few uncertainties in my reply, so I am including Timo, who
> should be able to confirm or deny what I wrote.
>
> Generally speaking, once a JobGraph is created it should contain the UDF.
> I might be wrong here, but the UDF must be available while the JobGraph
> is being created. I believe you're submitting the job via the `flink run`
> command. Is that right? If so, the UDF must be available on the classpath
> of that client, as that is the moment the JobGraph is created. If you use
> the web UI to submit the job, then the JobGraph is created on the
> JobManager, and the UDF must be available there.
>
> Best,
>
> Dawid

Re: User defined function (UDF) not sent when submitting job to session cluster

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Joel,

There are a few uncertainties in my reply, so I am including Timo, who
should be able to confirm or deny what I wrote.

Generally speaking, once a JobGraph is created it should contain the UDF.
I might be wrong here, but the UDF must be available while the JobGraph
is being created. I believe you're submitting the job via the `flink run`
command. Is that right? If so, the UDF must be available on the classpath
of that client, as that is the moment the JobGraph is created. If you use
the web UI to submit the job, then the JobGraph is created on the
JobManager, and the UDF must be available there.

Best,

Dawid
