You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/24 11:46:15 UTC
[pulsar] branch master updated: [improve][function] Allow not providing the class name when loading a Function nar file (#16079)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8bedabc8d98 [improve][function] Allow not providing the class name when loading a Function nar file (#16079)
8bedabc8d98 is described below
commit 8bedabc8d98142e4ba7bbe79a4f20de9bd513ed2
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Jun 24 13:46:06 2022 +0200
[improve][function] Allow not providing the class name when loading a Function nar file (#16079)
* [improve][function] Allow not providing the class name when loading a nar file
Currently, not providing the class name works only for builtin Functions.
This allows it also for files loaded through the API or with a package URL
* Do not get package classloader if runtime is not Java
---
.../worker/PulsarFunctionLocalRunTest.java | 14 ++++++++-
.../apache/pulsar/io/AbstractPulsarE2ETest.java | 1 +
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 15 +++++++++-
.../org/apache/pulsar/functions/LocalRunner.java | 25 ++++++++++++----
.../pulsar/functions/utils/FunctionCommon.java | 4 +++
.../functions/utils/FunctionConfigUtils.java | 9 ++----
.../functions/worker/rest/api/FunctionsImpl.java | 35 +++++++++++++++++-----
.../rest/api/v2/FunctionApiV2ResourceTest.java | 6 ++--
.../rest/api/v3/FunctionApiV3ResourceTest.java | 8 +++--
9 files changed, 91 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 388cd687b90..a8344be8dc8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -301,6 +301,7 @@ public class PulsarFunctionLocalRunTest {
fileServer = new FileServer();
fileServer.serveFile("/pulsar-io-data-generator.nar", getPulsarIODataGeneratorNar());
fileServer.serveFile("/pulsar-functions-api-examples.jar", getPulsarApiExamplesJar());
+ fileServer.serveFile("/pulsar-functions-api-examples.nar", getPulsarApiExamplesNar());
fileServer.start();
}
@@ -443,7 +444,7 @@ public class PulsarFunctionLocalRunTest {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
- jarFilePathUrl != null && jarFilePathUrl.startsWith(Utils.BUILTIN), sourceTopic, sinkTopic, subscriptionName);
+ jarFilePathUrl != null && (jarFilePathUrl.startsWith(Utils.BUILTIN) || jarFilePathUrl.endsWith(".nar")), sourceTopic, sinkTopic, subscriptionName);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setJar(jarFilePathUrl);
@@ -722,11 +723,22 @@ public class PulsarFunctionLocalRunTest {
testE2EPulsarFunctionLocalRun(jarFilePathUrl);
}
+ @Test(timeOut = 20000)
+ public void testE2EPulsarFunctionLocalRunWithNar() throws Exception {
+ String jarFilePathUrl = getPulsarApiExamplesNar().toURI().toString();
+ testE2EPulsarFunctionLocalRun(jarFilePathUrl);
+ }
+
@Test(timeOut = 40000)
public void testE2EPulsarFunctionLocalRunURL() throws Exception {
testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
}
+ @Test(timeOut = 40000)
+ public void testE2EPulsarFunctionLocalRunNarURL() throws Exception {
+ testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.nar"));
+ }
+
@Test(timeOut = 20000, groups = "builtin")
public void testE2EPulsarFunctionLocalRunBuiltin() throws Exception {
String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index 1ac908c6ec2..da748bafea5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -219,6 +219,7 @@ public abstract class AbstractPulsarE2ETest {
fileServer.serveFile("/pulsar-io-data-generator.nar", getPulsarIODataGeneratorNar());
fileServer.serveFile("/pulsar-io-batch-data-generator.nar", getPulsarIOBatchDataGeneratorNar());
fileServer.serveFile("/pulsar-functions-api-examples.jar", getPulsarApiExamplesJar());
+ fileServer.serveFile("/pulsar-functions-api-examples.nar", getPulsarApiExamplesNar());
fileServer.start();
Awaitility.await().until(() -> functionsWorkerService.getLeaderService().isLeader());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index e68a556524c..32ea853f269 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesJar;
+import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesNar;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -124,7 +125,8 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
- jarFilePathUrl.startsWith(Utils.BUILTIN), "my.*", sinkTopic, subscriptionName);
+ jarFilePathUrl.startsWith(Utils.BUILTIN) || jarFilePathUrl.endsWith(".nar"), "my.*",
+ sinkTopic, subscriptionName);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
@@ -196,11 +198,22 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
testE2EPulsarFunction(jarFilePathUrl);
}
+ @Test(timeOut = 20000)
+ public void testE2EPulsarFunctionWithNarFile() throws Exception {
+ String jarFilePathUrl = getPulsarApiExamplesNar().toURI().toString();
+ testE2EPulsarFunction(jarFilePathUrl);
+ }
+
@Test(timeOut = 40000)
public void testE2EPulsarFunctionWithUrl() throws Exception {
testE2EPulsarFunction(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
}
+ @Test(timeOut = 40000)
+ public void testE2EPulsarFunctionWithNarUrl() throws Exception {
+ testE2EPulsarFunction(fileServer.getUrl("/pulsar-functions-api-examples.nar"));
+ }
+
@Test(timeOut = 20000, groups = "builtin")
public void testPulsarFunctionBuiltin() throws Exception {
String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index e83653100ad..2b9b25ed743 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -345,23 +345,38 @@ public class LocalRunner implements AutoCloseable {
functionDetails = FunctionConfigUtils.convert(
functionConfig,
FunctionConfigUtils.validateJavaFunction(functionConfig, builtInFunctionClassLoader));
- } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+ } else if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
+ ClassLoader functionClassLoader = FunctionCommon.getClassLoaderFromPackage(
+ Function.FunctionDetails.ComponentType.FUNCTION,
+ functionConfig.getClassName(), file, narExtractionDirectory);
+ functionDetails = FunctionConfigUtils.convert(
+ functionConfig,
+ FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
+ userCodeClassLoader = functionClassLoader;
userCodeClassLoaderCreated = true;
} else if (userCodeFile != null) {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("User jar does not exist");
}
- userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
+ ClassLoader functionClassLoader = FunctionCommon.getClassLoaderFromPackage(
+ Function.FunctionDetails.ComponentType.FUNCTION,
+ functionConfig.getClassName(), file, narExtractionDirectory);
+ functionDetails = FunctionConfigUtils.convert(
+ functionConfig,
+ FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
+ userCodeClassLoader = functionClassLoader;
userCodeClassLoaderCreated = true;
} else {
if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
throw new IllegalStateException("The jar property must be specified in FunctionConfig.");
}
- FunctionConfigUtils.validateJavaFunction(functionConfig, Thread.currentThread()
- .getContextClassLoader());
+ functionDetails = FunctionConfigUtils.convert(
+ functionConfig,
+ FunctionConfigUtils.validateJavaFunction(
+ functionConfig,
+ Thread.currentThread().getContextClassLoader()));
}
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index e4233acd4c4..8174b9722cf 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
@@ -438,6 +439,9 @@ public class FunctionCommon {
}
try {
if (componentType
+ == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.FUNCTION) {
+ connectorClassName = FunctionUtils.getFunctionClass((NarClassLoader) narClassLoader);
+ } else if (componentType
== org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
} else {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 08dddf3dbfa..69b4dcfd647 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -701,10 +701,6 @@ public class FunctionConfigUtils {
}
}
- private static boolean isBuiltin(FunctionConfig functionConfig) {
- return functionConfig.getJar() != null && functionConfig.getJar().startsWith("builtin://");
- }
-
private static void doCommonChecks(FunctionConfig functionConfig) {
if (isEmpty(functionConfig.getTenant())) {
throw new IllegalArgumentException("Function tenant cannot be null");
@@ -715,9 +711,8 @@ public class FunctionConfigUtils {
if (isEmpty(functionConfig.getName())) {
throw new IllegalArgumentException("Function name cannot be null");
}
- // go doesn't need className
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON
- || (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && !isBuiltin(functionConfig))) {
+ // go doesn't need className. Java className is done in doJavaChecks.
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
if (isEmpty(functionConfig.getClassName())) {
throw new IllegalArgumentException("Function classname cannot be null");
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 4a765cc4056..466071c490e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -788,6 +788,8 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
functionConfig, worker().getWorkerConfig().isForwardSourceMessageProperty());
String archive = functionConfig.getJar();
+ ClassLoader classLoader = null;
+ // check if function is builtin and extract classloader
if (!StringUtils.isEmpty(archive)) {
if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
archive = archive.replaceFirst("^builtin://", "");
@@ -799,17 +801,36 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
if (function == null) {
throw new IllegalArgumentException(String.format("No Function %s found", archive));
}
- return FunctionConfigUtils.convert(
- functionConfig,
- FunctionConfigUtils.validateJavaFunction(functionConfig, function.getClassLoader()));
+ classLoader = function.getClassLoader();
}
}
- ClassLoader clsLoader = null;
+ boolean shouldCloseClassLoader = false;
try {
- clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
- return FunctionConfigUtils.convert(functionConfig, clsLoader);
+
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+ // if function is not builtin, attempt to extract classloader from package file if it exists
+ if (classLoader == null && componentPackageFile != null) {
+ classLoader = getClassLoaderFromPackage(functionConfig.getClassName(),
+ componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+ shouldCloseClassLoader = true;
+ }
+
+ if (classLoader == null) {
+ throw new IllegalArgumentException("Function package is not provided");
+ }
+
+ FunctionConfigUtils.ExtractedFunctionDetails functionDetails = FunctionConfigUtils.validateJavaFunction(
+ functionConfig, classLoader);
+ return FunctionConfigUtils.convert(functionConfig, functionDetails);
+ } else {
+ classLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+ shouldCloseClassLoader = true;
+ return FunctionConfigUtils.convert(functionConfig, classLoader);
+ }
} finally {
- ClassLoaderUtils.closeClassLoader(clsLoader);
+ if (shouldCloseClassLoader) {
+ ClassLoaderUtils.closeClassLoader(classLoader);
+ }
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index ca405c5736a..306901d436e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -293,7 +293,7 @@ public class FunctionApiV2ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package is not provided")
public void testRegisterFunctionMissingPackage() {
try {
testRegisterFunctionMissingArguments(
@@ -356,7 +356,9 @@ public class FunctionApiV2ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function classname cannot be null")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package does not have"
+ + " the correct format. Pulsar cannot determine if the package is a NAR package or JAR package. Function "
+ + "classname is not provided and attempts to load it as a NAR package produced the following error.*")
public void testRegisterFunctionMissingClassName() {
try {
testRegisterFunctionMissingArguments(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 288001a2964..2edf48c95a9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -298,7 +298,7 @@ public class FunctionApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package is not provided")
public void testRegisterFunctionMissingPackage() {
try {
testRegisterFunctionMissingArguments(
@@ -326,7 +326,7 @@ public class FunctionApiV3ResourceTest {
tenant,
namespace,
function,
- null,
+ mockedInputStream,
null,
mockedFormData,
outputTopic,
@@ -361,7 +361,9 @@ public class FunctionApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function classname cannot be null")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package does not have"
+ + " the correct format. Pulsar cannot determine if the package is a NAR package or JAR package. Function "
+ + "classname is not provided and attempts to load it as a NAR package produced the following error.*")
public void testRegisterFunctionMissingClassName() {
try {
testRegisterFunctionMissingArguments(