Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/08 09:58:27 UTC

[GitHub] [flink] zhuzhurk commented on a diff in pull request #19860: [FLINK-27658][runtime] Introduce MutableURLClassLoader allow to register and remove user jar dynamically

zhuzhurk commented on code in PR #19860:
URL: https://github.com/apache/flink/pull/19860#discussion_r892124115


##########
flink-core/src/main/java/org/apache/flink/core/classloading/MutableURLClassLoader.java:
##########
@@ -0,0 +1,152 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.core.classloading;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+
+/**
+ * This classloader delegates to the actual user classloader {@link FlinkUserCodeClassLoader}. On
+ * top of the existing functionality of {@link FlinkUserCodeClassLoader}, it also provides {@link
+ * #addURL(URL)} and {@link #removeURL(URL)} methods, which allow users to add and remove jars
+ * dynamically.
+ *
+ * <p>Users can specify the class loading order by setting the parameter
+ * (classloader.resolve-order) in {@link Configuration}; the default order is child-first.
+ */
+public class MutableURLClassLoader extends URLClassLoader {

Review Comment:
   When will users directly create this classloader?



##########
flink-core/src/main/java/org/apache/flink/core/classloading/MutableURLClassLoader.java:
##########
@@ -0,0 +1,152 @@
+public class MutableURLClassLoader extends URLClassLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MutableURLClassLoader.class);
+
+    private final Set<URL> registeredUrls;
+    private final Configuration config;
+    private final ClassLoader parentClassLoader;
+    private volatile FlinkUserCodeClassLoader ownerClassLoader;
+
+    public MutableURLClassLoader(URL[] urls, ClassLoader parent, Configuration config) {
+        super(new URL[0], parent);
+        this.registeredUrls = new HashSet<>(Arrays.asList(urls));
+        this.config = config;
+        this.parentClassLoader = parent;
+        this.ownerClassLoader = buildUserCodeClassLoader();
+    }
+
+    private FlinkUserCodeClassLoader buildUserCodeClassLoader() {
+        final String[] alwaysParentFirstLoaderPatterns =
+                CoreOptions.getParentFirstLoaderPatterns(config);
+
+        // Resolve classes in the configured order, either child-first or parent-first:
+        // child-first: load from this classloader first; if not found, load from the parent
+        // parent-first: load from the parent first; if not found, load from this classloader
+        final String classLoaderResolveOrder =
+                config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+        FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
+                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
+        return (FlinkUserCodeClassLoader)
+                FlinkUserCodeClassLoaders.create(
+                        resolveOrder,
+                        registeredUrls.toArray(new URL[0]),
+                        parentClassLoader,
+                        alwaysParentFirstLoaderPatterns,
+                        NOOP_EXCEPTION_HANDLER,
+                        false);
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        try {
+            return ownerClassLoader.loadClass(name, resolve);
+        } catch (ClassNotFoundException cnf) {

Review Comment:
   The try-catch is not needed: the catch block only rethrows the same `ClassNotFoundException`.
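
   A minimal sketch of the simplified delegation, under stated assumptions: `DelegatingClassLoaderSketch` and its `delegate` field are hypothetical stand-ins for `MutableURLClassLoader` and `ownerClassLoader`, and a plain `ClassLoader` replaces `FlinkUserCodeClassLoader` so the snippet runs without Flink on the classpath. Because of that substitution it calls the public one-argument `loadClass`, not the two-argument form used in the PR.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical stand-in for MutableURLClassLoader; only the delegation is shown. */
class DelegatingClassLoaderSketch extends URLClassLoader {

    // Stand-in for ownerClassLoader (assumption: any ClassLoader works for the sketch).
    private volatile ClassLoader delegate;

    DelegatingClassLoaderSketch(ClassLoader delegate) {
        super(new URL[0], delegate);
        this.delegate = delegate;
    }

    @Override
    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // No try-catch: a ClassNotFoundException thrown by the delegate propagates
        // to the caller unchanged, which is all that catch-and-rethrow achieved.
        return delegate.loadClass(name);
    }

    public static void main(String[] args) throws Exception {
        DelegatingClassLoaderSketch loader =
                new DelegatingClassLoaderSketch(ClassLoader.getSystemClassLoader());
        System.out.println(loader.loadClass("java.lang.String"));
        try {
            loader.loadClass("no.such.Clazz");
        } catch (ClassNotFoundException e) {
            System.out.println("not found: " + e.getMessage());
        }
    }
}
```

   The observable behavior is the same as with the try-catch; only the redundant block goes away.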



##########
flink-core/src/main/java/org/apache/flink/core/classloading/MutableURLClassLoader.java:
##########
@@ -0,0 +1,152 @@
+public class MutableURLClassLoader extends URLClassLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MutableURLClassLoader.class);
+
+    private final Set<URL> registeredUrls;
+    private final Configuration config;
+    private final ClassLoader parentClassLoader;
+    private volatile FlinkUserCodeClassLoader ownerClassLoader;
+
+    public MutableURLClassLoader(URL[] urls, ClassLoader parent, Configuration config) {
+        super(new URL[0], parent);
+        this.registeredUrls = new HashSet<>(Arrays.asList(urls));
+        this.config = config;
+        this.parentClassLoader = parent;
+        this.ownerClassLoader = buildUserCodeClassLoader();
+    }
+
+    private FlinkUserCodeClassLoader buildUserCodeClassLoader() {
+        final String[] alwaysParentFirstLoaderPatterns =
+                CoreOptions.getParentFirstLoaderPatterns(config);
+
+        // Resolve classes in the configured order, either child-first or parent-first:
+        // child-first: load from this classloader first; if not found, load from the parent
+        // parent-first: load from the parent first; if not found, load from this classloader
+        final String classLoaderResolveOrder =
+                config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+        FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
+                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
+        return (FlinkUserCodeClassLoader)
+                FlinkUserCodeClassLoaders.create(
+                        resolveOrder,
+                        registeredUrls.toArray(new URL[0]),
+                        parentClassLoader,
+                        alwaysParentFirstLoaderPatterns,
+                        NOOP_EXCEPTION_HANDLER,
+                        false);
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        try {
+            return ownerClassLoader.loadClass(name, resolve);
+        } catch (ClassNotFoundException cnf) {
+            throw cnf;
+        }
+    }
+
+    @Override
+    public void addURL(URL url) {
+        // add url to registeredUrls
+        registeredUrls.add(url);
+        // add it to ownerClassLoader
+        ownerClassLoader.addURL(url);
+    }
+
+    /**
+     * The old ownerClassLoader can't be closed directly here because of Java's class loader
+     * delegation mechanism: when a ClassLoader loads a class, the classes that class depends on
+     * and references are also loaded by the same ClassLoader, unless another ClassLoader is
+     * specified explicitly. Hence, we can't close the old ownerClassLoader when removing a URL,
+     * as doing so may cause a {@link ClassNotFoundException}.
+     *
+     * <p>Calling this method frequently may result in a classloader leak. Please use this

Review Comment:
   I'm a bit concerned about the side effect, and I do not fully understand the benefit of the removal.
   What problem would occur if URLs were not removed from a userClassLoader? If removal is only needed for the table environment to support submitting multiple different jobs from the SQL client/gateway, I would suggest moving the class to the table module.
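
   To make the concern concrete, here is a rough model of the side effect. It assumes that removal works by rebuilding the delegate without the removed URL and leaving the old delegate open, as the Javadoc suggests; the body of `removeURL` is not visible in this hunk, so that is an assumption. `RebuildOnRemoveSketch`, `OpenLoader`, the `retired` list, and the jar paths are all hypothetical names used only for illustration.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of remove-by-rebuild; not the code in this PR. */
class RebuildOnRemoveSketch {

    /** URLClassLoader with a public addURL, standing in for FlinkUserCodeClassLoader. */
    static final class OpenLoader extends URLClassLoader {
        OpenLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        public void addURL(URL url) {
            super.addURL(url);
        }
    }

    private final Set<URL> registeredUrls = new LinkedHashSet<>();
    // Replaced loaders. The list exists only to make retention countable; in a real
    // job they would stay reachable through the classes and objects they loaded.
    private final List<OpenLoader> retired = new ArrayList<>();
    private final ClassLoader parent;
    private OpenLoader current;

    RebuildOnRemoveSketch(ClassLoader parent) {
        this.parent = parent;
        this.current = new OpenLoader(new URL[0], parent);
    }

    synchronized void addUrl(URL url) {
        if (registeredUrls.add(url)) {
            current.addURL(url); // adding mutates the current loader in place
        }
    }

    synchronized void removeUrl(URL url) {
        if (!registeredUrls.remove(url)) {
            return;
        }
        // Assumption: the old loader is deliberately not closed, because classes it
        // already loaded may still need it to resolve their dependencies lazily.
        retired.add(current);
        current = new OpenLoader(registeredUrls.toArray(new URL[0]), parent);
    }

    synchronized OpenLoader currentLoader() {
        return current;
    }

    synchronized int retiredCount() {
        return retired.size();
    }

    static URL url(String spec) {
        try {
            return URI.create(spec).toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        RebuildOnRemoveSketch sketch =
                new RebuildOnRemoveSketch(ClassLoader.getSystemClassLoader());
        URL a = url("file:///tmp/a.jar"); // placeholder paths; the jars need not exist
        URL b = url("file:///tmp/b.jar");
        sketch.addUrl(a);
        sketch.addUrl(b);
        sketch.removeUrl(a);
        sketch.removeUrl(b);
        System.out.println("retired loaders: " + sketch.retiredCount());
        System.out.println("current URLs: " + sketch.currentLoader().getURLs().length);
    }
}
```

   In this model every successful removal leaves one more unclosed loader behind, which is the leak the Javadoc warns about. If URLs were simply never removed, no loader would be retired at all.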


