You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/03 11:38:23 UTC
[pulsar] branch master updated: [Functions] Fix classloader leaks (#12973)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cab946b [Functions] Fix classloader leaks (#12973)
cab946b is described below
commit cab946b4ca68e1ffc6dee3932bc4c0fc7e7da66e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Dec 3 13:37:21 2021 +0200
[Functions] Fix classloader leaks (#12973)
* Fix classloader leak in FunctionCommon.getClassLoaderFromPackage
* Fix classloader leak in SinksImpl and SourcesImpl
* Fix logic for shouldCloseClassLoader
---
.../pulsar/common/util/ClassLoaderUtils.java | 14 ++
.../pulsar/functions/utils/FunctionCommon.java | 151 ++++++++++++---------
.../functions/worker/rest/api/SinksImpl.java | 32 +++--
.../functions/worker/rest/api/SourcesImpl.java | 33 +++--
4 files changed, 140 insertions(+), 90 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
index 0e1e188..69e4c63 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -18,16 +18,20 @@
*/
package org.apache.pulsar.common.util;
+import java.io.Closeable;
import java.io.File;
+import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import lombok.extern.slf4j.Slf4j;
/**
* Helper methods wrt Classloading.
*/
+@Slf4j
public class ClassLoaderUtils {
/**
* Load a jar.
@@ -76,4 +80,14 @@ public class ClassLoaderUtils {
String.format("%s does not implement %s", className, klass.getName()));
}
}
+
+ public static void closeClassLoader(ClassLoader classLoader) {
+ if (classLoader instanceof Closeable) {
+ try {
+ ((Closeable) classLoader).close();
+ } catch (IOException e) {
+ log.error("Error closing classloader {}", classLoader, e);
+ }
+ }
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index a13695e..f72814d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -382,97 +382,114 @@ public class FunctionCommon {
String narExtractionDirectory) {
String connectorClassName = className;
ClassLoader jarClassLoader = null;
+ boolean keepJarClassLoader = false;
ClassLoader narClassLoader = null;
+ boolean keepNarClassLoader = false;
Exception jarClassLoaderException = null;
Exception narClassLoaderException = null;
try {
- jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
- } catch (Exception e) {
- jarClassLoaderException = e;
- }
- try {
- narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
- } catch (Exception e) {
- narClassLoaderException = e;
- }
-
- // if connector class name is not provided, we can only try to load archive as a NAR
- if (isEmpty(connectorClassName)) {
- if (narClassLoader == null) {
- throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
- "Pulsar cannot determine if the package is a NAR package or JAR package. " +
- "%s classname is not provided and attempts to load it as a NAR package produced the following error.",
- capFirstLetter(componentType), capFirstLetter(componentType)),
- narClassLoaderException);
- }
try {
- if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
- connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
- } else {
- connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
- componentType.toString().toLowerCase()), e);
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
}
-
try {
- narClassLoader.loadClass(connectorClassName);
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType), connectorClassName), e);
+ narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
+ } catch (Exception e) {
+ narClassLoaderException = e;
}
- } else {
- // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
- if (jarClassLoader != null) {
+ // if connector class name is not provided, we can only try to load archive as a NAR
+ if (isEmpty(connectorClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
+ "Pulsar cannot determine if the package is a NAR package or JAR package. " +
+ "%s classname is not provided and attempts to load it as a NAR package produced " +
+ "the following error.",
+ capFirstLetter(componentType), capFirstLetter(componentType)),
+ narClassLoaderException);
+ }
try {
- jarClassLoader.loadClass(connectorClassName);
- return jarClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- // class not found in JAR try loading as a NAR and searching for the class
- if (narClassLoader != null) {
-
- try {
- narClassLoader.loadClass(connectorClassName);
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
- }
+ if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+ connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
} else {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType),
- connectorClassName), e);
+ connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
}
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
+ componentType.toString().toLowerCase()), e);
}
- } else if (narClassLoader != null) {
+
try {
narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
+ String.format("%s class %s must be in class path", capFirstLetter(componentType),
+ connectorClassName), e);
}
+
} else {
- StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
- + " package does not have the correct format."
- + " Pulsar cannot determine if the package is a NAR package or JAR package.");
+ // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
+ if (jarClassLoader != null) {
+ try {
+ jarClassLoader.loadClass(connectorClassName);
+ keepJarClassLoader = true;
+ return jarClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ // class not found in JAR try loading as a NAR and searching for the class
+ if (narClassLoader != null) {
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path", capFirstLetter(componentType),
+ connectorClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
+ + " package does not have the correct format."
+ + " Pulsar cannot determine if the package is a NAR package or JAR package.");
- if (jarClassLoaderException != null) {
- errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
- }
+ if (jarClassLoaderException != null) {
+ errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
+ }
- if (narClassLoaderException != null) {
- errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
- }
+ if (narClassLoaderException != null) {
+ errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
+ }
- throw new IllegalArgumentException(errorMsg.toString());
+ throw new IllegalArgumentException(errorMsg.toString());
+ }
+ }
+ } finally {
+ if (!keepJarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(jarClassLoader);
+ }
+ if (!keepNarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(narClassLoader);
}
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 5e055c9..ab69ab9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -732,19 +733,28 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
}
}
- // if sink is not builtin, attempt to extract classloader from package file if it exists
- if (classLoader == null && sinkPackageFile != null) {
- classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
- sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
- }
+ boolean shouldCloseClassLoader = false;
+ try {
- if (classLoader == null) {
- throw new IllegalArgumentException("Sink package is not provided");
- }
+ // if sink is not builtin, attempt to extract classloader from package file if it exists
+ if (classLoader == null && sinkPackageFile != null) {
+ classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
+ sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+ shouldCloseClassLoader = true;
+ }
- SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
- sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
- return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+ if (classLoader == null) {
+ throw new IllegalArgumentException("Sink package is not provided");
+ }
+
+ SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
+ sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+ return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+ } finally {
+ if (shouldCloseClassLoader) {
+ ClassLoaderUtils.closeClassLoader(classLoader);
+ }
+ }
}
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 22c53d0..df2dca8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -728,20 +729,28 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
}
}
- // if source is not builtin, attempt to extract classloader from package file if it exists
- if (classLoader == null && sourcePackageFile != null) {
- classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
- sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
- }
+ boolean shouldCloseClassLoader = false;
+ try {
+ // if source is not builtin, attempt to extract classloader from package file if it exists
+ if (classLoader == null && sourcePackageFile != null) {
+ classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
+ sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+ shouldCloseClassLoader = true;
+ }
- if (classLoader == null) {
- throw new IllegalArgumentException("Source package is not provided");
- }
+ if (classLoader == null) {
+ throw new IllegalArgumentException("Source package is not provided");
+ }
- SourceConfigUtils.ExtractedSourceDetails sourceDetails
- = SourceConfigUtils.validateAndExtractDetails(
- sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
- return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+ SourceConfigUtils.ExtractedSourceDetails sourceDetails
+ = SourceConfigUtils.validateAndExtractDetails(
+ sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+ return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+ } finally {
+ if (shouldCloseClassLoader) {
+ ClassLoaderUtils.closeClassLoader(classLoader);
+ }
+ }
}
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {