Posted to user@flink.apache.org by Pouria Pirzadeh <po...@gmail.com> on 2021/12/16 19:42:45 UTC

Flink fails to load class from configured classpath using PipelineOptions

I am developing a Java application that uses UDFs on Flink 1.14.
In its main method it sets the PipelineOptions.JARS config option to add the
jar files containing the UDF classes to the user classpath dynamically.
However, the application fails to load the UDF class from the configured jar
files at job launch time and crashes with a ClassNotFoundException.

Is PipelineOptions.JARS the correct option for adding files to the classpath
on the JobManager and all TaskManagers?

Sample code snippet:

final Configuration configuration = new Configuration();
configuration.set(PipelineOptions.JARS, Collections.singletonList("file:///path/to/udf.jar"));
StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
...
Class udfClass = Class.forName("demo.MyUDF", ...);
tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
...

Error stack trace:
Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
    at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
    at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
    at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
    at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
    ...

Re: Flink fails to load class from configured classpath using PipelineOptions

Posted by Yang Wang <da...@gmail.com>.
Yes. You need to set "pipeline.classpaths" via flink-conf.yaml or the CLI
option -C/--classpath.
I do not think setting it in your main class can work. As you said, the
user classloader is not updated after the user main class is executed.
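
For anyone who still has to pick the jar inside main, here is a plain-JDK
sketch of one client-side experiment: build a URLClassLoader over the jar
URLs and install it as the thread context class loader for the duration of
a single call. ContextClassLoaderScope, loaderFor and callWith are
hypothetical names, not Flink API. Whether the table planner in Flink 1.14
resolves classes through the context class loader is an assumption that
this thread does not confirm, and the jar would still have to reach the
cluster, for example via -C.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper; nothing here is part of Flink. */
final class ContextClassLoaderScope {

    private ContextClassLoaderScope() {}

    /** Builds a class loader over the given URL strings, delegating to parent first. */
    static URLClassLoader loaderFor(List<String> urls, ClassLoader parent) {
        URL[] parsed = new URL[urls.size()];
        for (int i = 0; i < parsed.length; i++) {
            try {
                // URI.create rejects malformed strings, toURL rejects unknown protocols.
                parsed[i] = URI.create(urls.get(i)).toURL();
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Not a usable URL: " + urls.get(i), e);
            }
        }
        return new URLClassLoader(parsed, parent);
    }

    /** Runs the action with loader as the context class loader, then restores the old one. */
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        URLClassLoader udfLoader = loaderFor(List.of("file:///path/to/udf.jar"), parent);
        // In the application from this thread, the action would be the call that
        // triggers planning (for example the toDataStream call seen in the trace),
        // and the UDF class would be loaded with
        // Class.forName("demo.MyUDF", true, udfLoader). Both are untested assumptions.
        boolean installed =
                callWith(udfLoader, () -> Thread.currentThread().getContextClassLoader() == udfLoader);
        System.out.println("loader installed during call: " + installed);
    }
}
```

The try/finally restores the previous loader even if the action throws, so
the experiment does not leak the UDF loader into unrelated code on the same
thread.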

Best,
Yang

On Sat, Dec 18, 2021 at 1:23 AM Pouria Pirzadeh <po...@gmail.com> wrote:

> I have tried 'PipelineOptions.CLASSPATHS'; it also fails with a
> ClassNotFoundException, with exactly the same stack trace as with
> PipelineOptions.JARS.
>
> FYI, the same application jar works fine if submitted via the Flink CLI
> using 'flink run' with the "-C" option to update the classpath:
> <FLINK_HOME>/bin/flink run --detached -C file:///path/to/udf.jar ....
>
> The problem seems to be that the classpath for the ClassLoader which
> codegen in table planner uses is not updated according to Configuration
> passed to the StreamExecutionEnvironment, and I am not sure how that can
> be done.
>
> Pouria

Re: Flink fails to load class from configured classpath using PipelineOptions

Posted by Pouria Pirzadeh <po...@gmail.com>.
I have tried 'PipelineOptions.CLASSPATHS'; it also fails with a
ClassNotFoundException, with exactly the same stack trace as with
PipelineOptions.JARS.

FYI, the same application jar works fine if submitted via the Flink CLI
using 'flink run' with the "-C" option to update the classpath:
<FLINK_HOME>/bin/flink run --detached -C file:///path/to/udf.jar ....

The problem seems to be that the classpath of the ClassLoader used by
codegen in the table planner is not updated according to the Configuration
passed to the StreamExecutionEnvironment, and I am not sure how that can be
done.
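
The failure mode in the trace (ObjectInputStream resolving a class through
a loader that cannot see it) can be reproduced without Flink. The sketch
below is plain JDK code and only illustrates the mechanism. FakeUdf and
LoaderAwareInputStream are hypothetical stand-ins, not Flink classes, and
the platform class loader merely plays the role of "a loader whose
classpath lacks the UDF jar".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Plain-JDK illustration; no Flink classes are involved. */
final class ClassLoaderResolutionDemo {

    /** Hypothetical stand-in for a serialized UDF instance. */
    static class FakeUdf implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** ObjectInputStream that resolves every class through one explicit loader. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Serializes a FakeUdf and reports whether the given loader can read it back. */
    static boolean canDeserializeWith(ClassLoader loader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new FakeUdf());
            }
            try (ObjectInputStream in =
                    new LoaderAwareInputStream(new ByteArrayInputStream(bytes.toByteArray()), loader)) {
                in.readObject();
                return true;
            } catch (ClassNotFoundException e) {
                // The loader's classpath does not contain FakeUdf.
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The loader that defined FakeUdf can resolve it.
        System.out.println(canDeserializeWith(ClassLoaderResolutionDemo.class.getClassLoader()));
        // The platform loader only sees JDK classes, so resolution fails.
        System.out.println(canDeserializeWith(ClassLoader.getPlatformClassLoader()));
    }
}
```

The bytes are identical in both calls. Only the loader handed to the stream
changes, which is why adding a jar to a Configuration has no effect unless
the loader that performs the deserialization is also built from it.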

Pouria


On Thu, Dec 16, 2021 at 8:46 PM Yang Wang <da...@gmail.com> wrote:

> The config option "pipeline.jars" is used to specify the user jar, which
> contains the main class.
> I think what you need is "pipeline.classpaths".
>
> /**
>  * A list of URLs that are added to the classpath of each user code classloader of the program.
>  * Paths must specify a protocol (e.g. file://) and be accessible on all nodes
>  */
> public static final ConfigOption<List<String>> CLASSPATHS =
>         key("pipeline.classpaths")
>                 .stringType()
>                 .asList()
>                 .noDefaultValue()
>                 .withDescription(
>                         "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
>                                 + " the cluster. These have to be valid URLs.");
>
>
> Best,
> Yang

Re: Flink fails to load class from configured classpath using PipelineOptions

Posted by Yang Wang <da...@gmail.com>.
The config option "pipeline.jars" is used to specify the user jar, which
contains the main class.
I think what you need is "pipeline.classpaths".

/**
 * A list of URLs that are added to the classpath of each user code classloader of the program.
 * Paths must specify a protocol (e.g. file://) and be accessible on all nodes
 */
public static final ConfigOption<List<String>> CLASSPATHS =
        key("pipeline.classpaths")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
                                + " the cluster. These have to be valid URLs.");
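
Since the Javadoc above requires each entry to carry a protocol and to be a
valid URL, a quick pre-flight check on the entries can catch bare file
system paths before they reach the configuration. This is a minimal sketch
using only the JDK. ClasspathEntryCheck and invalidEntries are hypothetical
names, and the check mirrors the wording of the Javadoc rather than Flink's
own validation code.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; it only mirrors the "protocol + valid URL" rule quoted above. */
final class ClasspathEntryCheck {

    private ClasspathEntryCheck() {}

    /** Returns the entries that have no protocol or cannot be turned into a URL. */
    static List<String> invalidEntries(List<String> entries) {
        List<String> invalid = new ArrayList<>();
        for (String entry : entries) {
            try {
                URI uri = new URI(entry);
                if (!uri.isAbsolute()) {
                    // No scheme, e.g. a bare path such as /path/to/udf.jar.
                    invalid.add(entry);
                    continue;
                }
                // Throws MalformedURLException when no handler exists for the protocol.
                uri.toURL();
            } catch (URISyntaxException | MalformedURLException e) {
                invalid.add(entry);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        List<String> entries = List.of("file:///path/to/udf.jar", "/path/to/udf.jar");
        System.out.println("invalid entries: " + invalidEntries(entries));
    }
}
```

The check cannot tell whether a URL is reachable on all nodes, which the
Javadoc also requires; that still has to be verified on the cluster itself.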


Best,
Yang
