You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text export.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/03 11:38:23 UTC

[pulsar] branch master updated: [Functions] Fix classloader leaks (#12973)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cab946b  [Functions] Fix classloader leaks (#12973)
cab946b is described below

commit cab946b4ca68e1ffc6dee3932bc4c0fc7e7da66e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Dec 3 13:37:21 2021 +0200

    [Functions] Fix classloader leaks (#12973)
    
    * Fix classloader leak in FunctionCommon.getClassLoaderFromPackage
    
    * Fix classloader leak in SinksImpl and SourcesImpl
    
    * Fix logic for shouldCloseClassLoader
---
 .../pulsar/common/util/ClassLoaderUtils.java       |  14 ++
 .../pulsar/functions/utils/FunctionCommon.java     | 151 ++++++++++++---------
 .../functions/worker/rest/api/SinksImpl.java       |  32 +++--
 .../functions/worker/rest/api/SourcesImpl.java     |  33 +++--
 4 files changed, 140 insertions(+), 90 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
index 0e1e188..69e4c63 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -18,16 +18,20 @@
  */
 package org.apache.pulsar.common.util;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Helper methods wrt Classloading.
  */
+@Slf4j
 public class ClassLoaderUtils {
     /**
      * Load a jar.
@@ -76,4 +80,14 @@ public class ClassLoaderUtils {
                     String.format("%s does not implement %s", className, klass.getName()));
         }
     }
+
+    public static void closeClassLoader(ClassLoader classLoader) {
+        if (classLoader instanceof Closeable) {
+            try {
+                ((Closeable) classLoader).close();
+            } catch (IOException e) {
+                log.error("Error closing classloader {}", classLoader, e);
+            }
+        }
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index a13695e..f72814d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -382,97 +382,114 @@ public class FunctionCommon {
             String narExtractionDirectory) {
         String connectorClassName = className;
         ClassLoader jarClassLoader = null;
+        boolean keepJarClassLoader = false;
         ClassLoader narClassLoader = null;
+        boolean keepNarClassLoader = false;
 
         Exception jarClassLoaderException = null;
         Exception narClassLoaderException = null;
 
         try {
-            jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
-        } catch (Exception e) {
-            jarClassLoaderException = e;
-        }
-        try {
-            narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
-        } catch (Exception e) {
-            narClassLoaderException = e;
-        }
-
-        // if connector class name is not provided, we can only try to load archive as a NAR
-        if (isEmpty(connectorClassName)) {
-            if (narClassLoader == null) {
-                throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
-                                "Pulsar cannot determine if the package is a NAR package or JAR package. " +
-                                "%s classname is not provided and attempts to load it as a NAR package produced the following error.",
-                        capFirstLetter(componentType), capFirstLetter(componentType)),
-                        narClassLoaderException);
-            }
             try {
-                if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
-                    connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
-                } else {
-                    connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
-                        componentType.toString().toLowerCase()), e);
+                jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+            } catch (Exception e) {
+                jarClassLoaderException = e;
             }
-
             try {
-                narClassLoader.loadClass(connectorClassName);
-                return narClassLoader;
-            } catch (ClassNotFoundException | NoClassDefFoundError e) {
-                throw new IllegalArgumentException(
-                        String.format("%s class %s must be in class path", capFirstLetter(componentType), connectorClassName), e);
+                narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
+            } catch (Exception e) {
+                narClassLoaderException = e;
             }
 
-        } else {
-            // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
-            if (jarClassLoader != null) {
+            // if connector class name is not provided, we can only try to load archive as a NAR
+            if (isEmpty(connectorClassName)) {
+                if (narClassLoader == null) {
+                    throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
+                                    "Pulsar cannot determine if the package is a NAR package or JAR package. " +
+                                    "%s classname is not provided and attempts to load it as a NAR package produced " +
+                                    "the following error.",
+                            capFirstLetter(componentType), capFirstLetter(componentType)),
+                            narClassLoaderException);
+                }
                 try {
-                    jarClassLoader.loadClass(connectorClassName);
-                    return jarClassLoader;
-                } catch (ClassNotFoundException | NoClassDefFoundError e) {
-                    // class not found in JAR try loading as a NAR and searching for the class
-                    if (narClassLoader != null) {
-
-                        try {
-                            narClassLoader.loadClass(connectorClassName);
-                            return narClassLoader;
-                        } catch (ClassNotFoundException | NoClassDefFoundError e1) {
-                            throw new IllegalArgumentException(
-                                    String.format("%s class %s must be in class path",
-                                            capFirstLetter(componentType), connectorClassName), e1);
-                        }
+                    if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+                        connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
                     } else {
-                        throw new IllegalArgumentException(
-                                String.format("%s class %s must be in class path", capFirstLetter(componentType),
-                                        connectorClassName), e);
+                        connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
                     }
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
+                            componentType.toString().toLowerCase()), e);
                 }
-            } else if (narClassLoader != null) {
+
                 try {
                     narClassLoader.loadClass(connectorClassName);
+                    keepNarClassLoader = true;
                     return narClassLoader;
-                } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+                } catch (ClassNotFoundException | NoClassDefFoundError e) {
                     throw new IllegalArgumentException(
-                            String.format("%s class %s must be in class path",
-                                    capFirstLetter(componentType), connectorClassName), e1);
+                            String.format("%s class %s must be in class path", capFirstLetter(componentType),
+                                    connectorClassName), e);
                 }
+
             } else {
-                StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
-                        + " package does not have the correct format."
-                        + " Pulsar cannot determine if the package is a NAR package or JAR package.");
+                // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
+                if (jarClassLoader != null) {
+                    try {
+                        jarClassLoader.loadClass(connectorClassName);
+                        keepJarClassLoader = true;
+                        return jarClassLoader;
+                    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+                        // class not found in JAR try loading as a NAR and searching for the class
+                        if (narClassLoader != null) {
+
+                            try {
+                                narClassLoader.loadClass(connectorClassName);
+                                keepNarClassLoader = true;
+                                return narClassLoader;
+                            } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+                                throw new IllegalArgumentException(
+                                        String.format("%s class %s must be in class path",
+                                                capFirstLetter(componentType), connectorClassName), e1);
+                            }
+                        } else {
+                            throw new IllegalArgumentException(
+                                    String.format("%s class %s must be in class path", capFirstLetter(componentType),
+                                            connectorClassName), e);
+                        }
+                    }
+                } else if (narClassLoader != null) {
+                    try {
+                        narClassLoader.loadClass(connectorClassName);
+                        keepNarClassLoader = true;
+                        return narClassLoader;
+                    } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+                        throw new IllegalArgumentException(
+                                String.format("%s class %s must be in class path",
+                                        capFirstLetter(componentType), connectorClassName), e1);
+                    }
+                } else {
+                    StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
+                            + " package does not have the correct format."
+                            + " Pulsar cannot determine if the package is a NAR package or JAR package.");
 
-                if (jarClassLoaderException != null) {
-                    errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
-                }
+                    if (jarClassLoaderException != null) {
+                        errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
+                    }
 
-                if (narClassLoaderException != null) {
-                    errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
-                }
+                    if (narClassLoaderException != null) {
+                        errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
+                    }
 
-                throw new IllegalArgumentException(errorMsg.toString());
+                    throw new IllegalArgumentException(errorMsg.toString());
+                }
+            }
+        } finally {
+            if (!keepJarClassLoader) {
+                ClassLoaderUtils.closeClassLoader(jarClassLoader);
+            }
+            if (!keepNarClassLoader) {
+                ClassLoaderUtils.closeClassLoader(narClassLoader);
             }
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 5e055c9..ab69ab9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -732,19 +733,28 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             }
         }
 
-        // if sink is not builtin, attempt to extract classloader from package file if it exists
-        if (classLoader == null && sinkPackageFile != null) {
-            classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
-                    sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
-        }
+        boolean shouldCloseClassLoader = false;
+        try {
 
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Sink package is not provided");
-        }
+            // if sink is not builtin, attempt to extract classloader from package file if it exists
+            if (classLoader == null && sinkPackageFile != null) {
+                classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
+                        sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                shouldCloseClassLoader = true;
+            }
 
-        SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
-                sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
-        return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Sink package is not provided");
+            }
+
+            SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
+                    sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+            return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+        } finally {
+            if (shouldCloseClassLoader) {
+                ClassLoaderUtils.closeClassLoader(classLoader);
+            }
+        }
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 22c53d0..df2dca8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -728,20 +729,28 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             }
         }
 
-        // if source is not builtin, attempt to extract classloader from package file if it exists
-        if (classLoader == null && sourcePackageFile != null) {
-            classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
-                    sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
-        }
+        boolean shouldCloseClassLoader = false;
+        try {
+            // if source is not builtin, attempt to extract classloader from package file if it exists
+            if (classLoader == null && sourcePackageFile != null) {
+                classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
+                        sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                shouldCloseClassLoader = true;
+            }
 
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Source package is not provided");
-        }
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Source package is not provided");
+            }
 
-        SourceConfigUtils.ExtractedSourceDetails sourceDetails
-                = SourceConfigUtils.validateAndExtractDetails(
-                        sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
-        return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+            SourceConfigUtils.ExtractedSourceDetails sourceDetails
+                    = SourceConfigUtils.validateAndExtractDetails(
+                            sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+            return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+        } finally {
+            if (shouldCloseClassLoader) {
+                ClassLoaderUtils.closeClassLoader(classLoader);
+            }
+        }
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {