Posted to dev@flink.apache.org by 百岁 <ba...@dingtalk.com.INVALID> on 2021/10/16 05:56:18 UTC

dataStream can not use multiple classloaders

To: everyone

I have created a DataStream demo, shown below. It is a very simple example: it reads stream data from a SourceFunction and sends it to a SinkFunction without any processing.
The point is that the SourceFunction and SinkFunction instances are created with two separate URLClassLoaders that have different dependencies, to avoid conflicts between those dependencies.
The problem is that when the Flink client submits the job to the server, the server side throws a ClassNotFoundException for classes that are defined only in the dependencies of those classloaders. Obviously the server side does not use the same classloaders as the client side.
How can I solve this problem? Can anyone give me some advice? Thanks a lot.



public class FlinkStreamDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SourceFunction<DTO> sourceFunc = createSourceFunction();

        DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc);

        SinkFunction<DTO> sinkFunction = createSink();

        dtoDataStreamSource.addSink(sinkFunction);

        env.execute("flink-example");
    }

    private static SinkFunction<DTO> createSink() {
        URL[] urls = new URL[]{...};
        ClassLoader classLoader = new URLClassLoader(urls);
        ServiceLoader<ISinkFunctionFactory> loaders = ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
        Iterator<ISinkFunctionFactory> it = loaders.iterator();
        if (it.hasNext()) {
            return it.next().create();
        }
        throw new IllegalStateException();
    }

    private static SourceFunction<DTO> createSourceFunction() {
        URL[] urls = new URL[]{...};
        ClassLoader classLoader = new URLClassLoader(urls);
        ServiceLoader<ISourceFunctionFactory> loaders = ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
        Iterator<ISourceFunctionFactory> it = loaders.iterator();
        if (it.hasNext()) {
            return it.next().create();
        }
        throw new IllegalStateException();
    }

    public interface ISinkFunctionFactory {
        SinkFunction<DTO> create();
    }

    public interface ISourceFunctionFactory {
        SourceFunction<DTO> create();
    }
}


from: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-24558?filter=allissues

Re: dataStream can not use multiple classloaders

Posted by 百岁 <ba...@dingtalk.com.INVALID>.
To: @Arvid Heise @Caizhi Weng

Thanks for your replies. As our stream API code becomes more and more complicated, we will add more and more third-party dependencies.

This kind of problem will be inevitable. If we rely only on tricks such as shading the dependencies to solve it, I think that is far from enough.
There should be a once-and-for-all solution. I propose a plan for discussion, as shown in the figure below:

Make the server-side classloader, which is created by BlobLibraryCacheManager.DefaultClassLoaderFactory, pluggable, so that the parent classloader of ChildFirstClassLoader can be varied.
The parent classloader then acts as a facade over the multiple client-side classloaders and finds classes by polling them.
What do you think? Thanks.
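
To make the proposal above more concrete, here is a minimal sketch of a parent classloader that finds classes by polling several delegates in order. It uses only the JDK. The names PollingClassLoader and PollingClassLoaderDemo are hypothetical and are not part of Flink, and nothing here is wired into BlobLibraryCacheManager.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical facade: asks each delegate classloader in turn. Not a Flink class. */
class PollingClassLoader extends ClassLoader {
    private final List<ClassLoader> delegates;

    PollingClassLoader(List<ClassLoader> delegates) {
        // No parent of its own, so only JDK bootstrap classes are resolved before polling.
        super((ClassLoader) null);
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        for (ClassLoader delegate : delegates) {
            try {
                return delegate.loadClass(name); // first delegate that knows the class wins
            } catch (ClassNotFoundException ignored) {
                // fall through and ask the next delegate
            }
        }
        throw new ClassNotFoundException(name);
    }
}

class PollingClassLoaderDemo {
    /** Loads the class through the facade and returns the loader that defined it. */
    static ClassLoader definingLoaderOf(String className) throws ClassNotFoundException {
        // First delegate has no URLs and no parent, so it cannot see application classes.
        ClassLoader empty = new URLClassLoader(new URL[0], null);
        ClassLoader app = PollingClassLoaderDemo.class.getClassLoader();
        ClassLoader polling = new PollingClassLoader(Arrays.asList(empty, app));
        return polling.loadClass(className).getClassLoader();
    }

    public static void main(String[] args) throws Exception {
        ClassLoader found = definingLoaderOf(PollingClassLoaderDemo.class.getName());
        System.out.println(found == PollingClassLoaderDemo.class.getClassLoader());
    }
}
```

Two caveats apply to this sketch. When the same class name exists in more than one delegate, the first delegate in the list wins, so polling alone does not isolate conflicting versions. The jars behind each delegate would also still have to be available on the server side.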


------------------------------------------------------------------
From: Arvid Heise <ar...@apache.org>
Sent: Monday, October 18, 2021, 17:28
To: Caizhi Weng <ts...@gmail.com>
Cc: dev <de...@flink.apache.org>; 百岁 <ba...@dingtalk.com>; user <us...@flink.apache.org>
Subject: Re: dataStream can not use multiple classloaders

You also must ensure that your SourceFunction is serializable, so it's not enough to just refer to some classloader, you must ensure that you have access to it also after deserialization on the task managers.

On Mon, Oct 18, 2021 at 4:24 AM Caizhi Weng <ts...@gmail.com> wrote:

Hi!

There is only one classloader for user code by default in runtime. The main method of your code is only executed on the client side. It generates a job graph and sends it to the cluster.

To avoid class loading conflict it is recommended to shade the dependencies of your source and sink function jars. If you really have to load some dependencies with different class loaders, you can load them in the open method of a RichSourceFunction or RichSinkFunction.


Re: dataStream can not use multiple classloaders

Posted by Arvid Heise <ar...@apache.org>.
You also must ensure that your SourceFunction is serializable, so it's not
enough to just refer to some classloader, you must ensure that you have
access to it also after deserialization on the task managers.
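
The failure mode described here can be reproduced with plain JDK serialization. The sketch below is only an illustration under assumptions: Payload, LoaderAwareObjectInputStream and DeserializationDemo are hypothetical names, and Flink's own deserialization code is not shown. The same bytes deserialize when the reading side uses a classloader that can see the class, and fail with a ClassNotFoundException when it cannot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical stand-in for a user function that gets shipped to the cluster. */
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    final String value;

    Payload(String value) {
        this.value = value;
    }
}

/** Resolves classes against one explicit classloader while reading objects. */
class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        return Class.forName(desc.getName(), false, loader);
    }
}

class DeserializationDemo {
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Returns true if the given loader can resolve every class in the stream. */
    static boolean canDeserialize(byte[] data, ClassLoader loader) throws IOException {
        try (ObjectInputStream in =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new Payload("hello"));
        ClassLoader sameAsWriter = Payload.class.getClassLoader();
        // No URLs and no parent: this loader only sees JDK bootstrap classes.
        ClassLoader blind = new URLClassLoader(new URL[0], null);
        System.out.println(canDeserialize(data, sameAsWriter));
        System.out.println(canDeserialize(data, blind));
    }
}
```

In the demo from the original post, the client-side URLClassLoaders play the role of sameAsWriter, while the task managers are in the position of the blind loader unless the same jars are made available to them.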

On Mon, Oct 18, 2021 at 4:24 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> There is only one classloader for user code by default in runtime. The
> main method of your code is only executed on the client side. It generates
> a job graph and sends it to the cluster.
>
> To avoid class loading conflict it is recommended to shade the
> dependencies of your source and sink function jars. If you really have to
> load some dependencies with different class loaders, you can load them in
> the open method of a RichSourceFunction or RichSinkFunction.

Re: dataStream can not use multiple classloaders

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

By default there is only one classloader for user code at runtime. The main
method of your code is executed only on the client side. It generates a job
graph and sends it to the cluster.

To avoid class-loading conflicts, it is recommended to shade the dependencies
of your source and sink function jars. If you really have to load some
dependencies with different classloaders, you can load them in the open
method of a RichSourceFunction or RichSinkFunction.
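
The second suggestion can be sketched without any Flink classes. In the example below, LazyLoaderFunction is a hypothetical stand-in for a rich function, and its open and close methods only play the role of the corresponding lifecycle methods. The jar location is also made up. The idea is that the function carries only serializable strings, while the URLClassLoader is a transient field that is rebuilt in open after the function has been shipped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical stand-in for a rich function; deliberately uses no Flink classes. */
class LazyLoaderFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String[] jarLocations;            // plain strings survive serialization
    private transient URLClassLoader pluginLoader;  // skipped by serialization

    LazyLoaderFunction(String... jarLocations) {
        this.jarLocations = jarLocations.clone();
    }

    /** Plays the role of the open method: runs on the worker after deserialization. */
    void open() throws Exception {
        URL[] urls = new URL[jarLocations.length];
        for (int i = 0; i < jarLocations.length; i++) {
            urls[i] = URI.create(jarLocations[i]).toURL();
        }
        pluginLoader = new URLClassLoader(urls, getClass().getClassLoader());
    }

    /** Plays the role of the close method: releases the loader's jar handles. */
    void close() throws Exception {
        if (pluginLoader != null) {
            pluginLoader.close();
            pluginLoader = null;
        }
    }

    boolean hasLoader() {
        return pluginLoader != null;
    }
}

class LazyLoaderDemo {
    /** Simulates shipping the function to another process via Java serialization. */
    static LazyLoaderFunction roundTrip(LazyLoaderFunction fn) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyLoaderFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical jar location; the loader does not open it until a class is requested.
        LazyLoaderFunction fn = new LazyLoaderFunction("file:/tmp/hypothetical-sink-deps.jar");
        LazyLoaderFunction shipped = roundTrip(fn);
        System.out.println(shipped.hasLoader()); // loader is not part of the serialized form
        shipped.open();
        System.out.println(shipped.hasLoader()); // rebuilt on the receiving side
        shipped.close();
    }
}
```

With this shape, the jar locations must be reachable from every task manager, since each one builds its own loader.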

