Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/28 08:30:32 UTC

[GitHub] [flink] baisui1981 edited a comment on pull request #17521: [FLINK-24558][API/DataStream]make parent ClassLoader variable which c…

baisui1981 edited a comment on pull request #17521:
URL: https://github.com/apache/flink/pull/17521#issuecomment-953619514


   > I don't think it is a good idea to give users direct access to this part of the code. It just yet again increases the API surface, and for some _very_ important internal thing that we need to be able to change at a whim.
   > 
   > Furthermore, I don't see yet how this would solve the issue at hand. The proposed interface provides no differentiating factor that could be used to create different classloaders for each task (like the Task ID). Even then, the classloader is shared across different tasks running on the same TM, so it must behave the same way? Given that they all have access to the same jars, I'm curious how the behavior is supposed to be different in the first place.
   > 
   > All in all, I think this needs way more discussion.
   
   Hi @zentol, thanks for your reply. We are building a data center product based on Flink and expect to integrate various third-party components that are provided as Flink `SourceFunction`s (such as the [various CDC source connectors](https://github.com/ververica/flink-cdc-connectors)) and `SinkFunction`s (such as [elasticsearch7](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch7)), so we cannot avoid the problem described in [FLINK-24558](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-24558?filter=allissues).
   To solve this problem, I intend to add an extension point for a customized `org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.ClassLoaderFactory`. The main motivation is **making the parent classloader variable, so that I can designate it myself**. The designated parent classloader can then be delegated to for loading client-side classes that are packaged in multiple plugin bundles.
   
   In my project I have written a class, [TISFlinClassLoaderFactory](https://github.com/qlangtech/plugins/blob/v3.1.0/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISFlinClassLoaderFactory.java), that implements the newly introduced `ClassLoaderFactoryBuilder` interface, together with a [service meta configuration](https://github.com/qlangtech/plugins/blob/v3.1.0/tis-incr/tis-flink-extends/src/main/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder). I then package the artifact and put it in the **$FLINK_HOME/lib** directory.
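
For readers unfamiliar with the `META-INF/services` mechanism mentioned above, here is a minimal sketch of how such a builder could be discovered through `java.util.ServiceLoader`. The `FactoryBuilder` interface, the `DefaultFactoryBuilder` fallback, and the `lookup` method are hypothetical stand-ins for illustration; they are not the code in the PR, and the actual lookup logic there may differ.

```java
import java.util.ServiceLoader;

public class FactoryBuilderLookup {

    /** Hypothetical stand-in for the ClassLoaderFactoryBuilder interface proposed in the PR. */
    public interface FactoryBuilder {
        String name();
    }

    /** Hypothetical fallback used when no provider is registered on the classpath. */
    static final class DefaultFactoryBuilder implements FactoryBuilder {
        @Override
        public String name() {
            return "default";
        }
    }

    /**
     * Returns the first provider listed in a META-INF/services file that is
     * visible to the given class loader, or the default when there is none.
     */
    public static FactoryBuilder lookup(ClassLoader loader) {
        for (FactoryBuilder builder : ServiceLoader.load(FactoryBuilder.class, loader)) {
            return builder;
        }
        return new DefaultFactoryBuilder();
    }
}
```

With this kind of lookup, a jar dropped into a directory on the classpath only needs a provider class and a matching services file to replace the default.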
   
   > The proposed interface provides no differentiating factor that could be used to create different classloaders for each task (like the Task ID)
   
   Thanks for the reminder, @zentol. Maybe I should leave the parent classloader unchanged because, as you say, it is shared across different tasks running on the same TM. Instead, extending directly from `FlinkUserCodeClassLoader` would be better, for example: [TISChildFirstClassLoader](https://github.com/qlangtech/plugins/blob/v3.1.0/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISChildFirstClassLoader.java)
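
To illustrate the idea of keeping the regular parent while consulting an extra plugin classloader, here is a rough, self-contained sketch built on plain `URLClassLoader` rather than `FlinkUserCodeClassLoader`. The class name and the lookup order (user jars, then plugin loader, then parent) are assumptions for illustration; the linked `TISChildFirstClassLoader` may resolve classes differently, and this sketch omits Flink's always-parent-first patterns.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical child-first loader that also consults a plugin class loader. */
public class PluginAwareChildFirstClassLoader extends URLClassLoader {

    private final ClassLoader pluginLoader;

    public PluginAwareChildFirstClassLoader(URL[] userJars, ClassLoader pluginLoader, ClassLoader parent) {
        super(userJars, parent);
        this.pluginLoader = pluginLoader;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                // JDK classes always go through the normal parent-first path.
                loaded = name.startsWith("java.") ? super.loadClass(name, false) : loadChildFirst(name);
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }

    private Class<?> loadChildFirst(String name) throws ClassNotFoundException {
        try {
            return findClass(name); // 1. the submitted user jars
        } catch (ClassNotFoundException notInUserJars) {
            try {
                return pluginLoader.loadClass(name); // 2. the plugin bundles
            } catch (ClassNotFoundException notInPlugins) {
                return super.loadClass(name, false); // 3. the unchanged parent
            }
        }
    }
}
```

Because the parent passed to the constructor is left untouched, the plugin loader is an additional lookup step rather than a replacement of the shared parent.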
   
   The plugin inventory (the **differentiating factor that could be used to create different classloaders**) is stored in the manifest of the jar that the Flink client side submits as the `libraryURLs` parameter. The server side extracts the plugin inventory by parsing the jar manifest and then uses it to initialize the PluginManager, which creates the uberClassLoader.
   [TISFlinClassLoaderFactory.java](https://github.com/qlangtech/plugins/blob/v3.1.0/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISFlinClassLoaderFactory.java)
   ``` java
       public BlobLibraryCacheManager.ClassLoaderFactory buildServerLoaderFactory(
               FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
               String[] alwaysParentFirstPatterns,
               @Nullable Consumer<Throwable> exceptionHandler,
               boolean checkClassLoaderLeak) {
   
           return new BlobLibraryCacheManager.DefaultClassLoaderFactory(
                   classLoaderResolveOrder, alwaysParentFirstPatterns, exceptionHandler, checkClassLoaderLeak) {
   
               @Override
               public URLClassLoader createClassLoader(URL[] libraryURLs) {
   
                   try {
                       PluginManager pluginManager = TIS.get().getPluginManager();
                       // Exactly one jar is expected: the one whose manifest carries the plugin inventory.
                       if (libraryURLs.length != 1) {
                           throw new IllegalStateException(
                                   "length of libraryURLs must be 1, but is: " + libraryURLs.length);
                       }
                       for (URL lib : libraryURLs) {
                           try (JarInputStream jarReader = new JarInputStream(lib.openStream())) {
                               // getManifest() returns null when the jar has no manifest.
                               Manifest manifest = jarReader.getManifest();
                               Attributes pluginInventory =
                                       manifest == null ? null : manifest.getAttributes("plugin_inventory");
                               if (pluginInventory == null) {
                                   throw new IllegalStateException("plugin inventory must not be empty in lib: " + lib);
                               }
                               // Each attribute key in the "plugin_inventory" section names one plugin to load.
                               for (Map.Entry<Object, Object> pluginDesc : pluginInventory.entrySet()) {
                                   pluginManager.dynamicLoadPlugin(String.valueOf(pluginDesc.getKey()));
                               }
                           }
                       }
                       // The uber classloader of the loaded plugins is passed in next to the regular parent.
                       return new TISChildFirstClassLoader(
                               pluginManager.uberClassLoader, libraryURLs, this.getParentClassLoader(),
                               this.alwaysParentFirstPatterns, this.classLoadingExceptionHandler);
                   } catch (IOException e) {
                       throw new RuntimeException(e);
                   }
               }
           };
       }
   ```
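
The factory above only shows the server-side read. As a rough sketch of the whole round trip, the following self-contained example writes a jar whose manifest has a `plugin_inventory` section and reads the plugin names back with `JarInputStream`. The class name, the method names, and the attribute value `"true"` are assumptions made for illustration; how the real client side fills the manifest is not shown in this comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class PluginInventoryManifest {

    static final String SECTION = "plugin_inventory";

    /** Client side (assumed): build a jar whose manifest lists one attribute per plugin. */
    public static byte[] writeJar(List<String> pluginNames) throws IOException {
        Manifest manifest = new Manifest();
        // Without Manifest-Version the manifest is not written out at all.
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        Attributes inventory = new Attributes();
        for (String plugin : pluginNames) {
            inventory.putValue(plugin, "true"); // the value is a placeholder, only the key is read
        }
        manifest.getEntries().put(SECTION, inventory);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (JarOutputStream jar = new JarOutputStream(bytes, manifest)) {
            // No further entries are needed; the manifest is written by the constructor.
        }
        return bytes.toByteArray();
    }

    /** Server side: read the plugin names back from the named manifest section. */
    public static List<String> readInventory(byte[] jarBytes) throws IOException {
        try (JarInputStream jar = new JarInputStream(new ByteArrayInputStream(jarBytes))) {
            Manifest manifest = jar.getManifest();
            Attributes inventory = manifest == null ? null : manifest.getAttributes(SECTION);
            if (inventory == null) {
                throw new IllegalStateException("no " + SECTION + " section in manifest");
            }
            List<String> names = new ArrayList<>();
            for (Object key : inventory.keySet()) {
                names.add(String.valueOf(key));
            }
            return names;
        }
    }
}
```

Note that manifest attribute names are restricted to letters, digits, `_` and `-`, so plugin names used as keys have to fit that pattern.
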
   @zentol what do you think? Any suggestions would be appreciated, thanks.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org