You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/24 11:46:15 UTC

[pulsar] branch master updated: [improve][function] Allow not providing the class name when loading a Function nar file (#16079)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bedabc8d98 [improve][function] Allow not providing the class name when loading a Function nar file (#16079)
8bedabc8d98 is described below

commit 8bedabc8d98142e4ba7bbe79a4f20de9bd513ed2
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Jun 24 13:46:06 2022 +0200

    [improve][function] Allow not providing the class name when loading a Function nar file (#16079)
    
    * [improve][function] Allow not providing the class name when loading a nar file
    Currently, omitting the class name works only for builtin Functions.
    This change also allows it for files loaded through the API or with a package URL.
    
    * Do not get package classloader if runtime is not Java
---
 .../worker/PulsarFunctionLocalRunTest.java         | 14 ++++++++-
 .../apache/pulsar/io/AbstractPulsarE2ETest.java    |  1 +
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 15 +++++++++-
 .../org/apache/pulsar/functions/LocalRunner.java   | 25 ++++++++++++----
 .../pulsar/functions/utils/FunctionCommon.java     |  4 +++
 .../functions/utils/FunctionConfigUtils.java       |  9 ++----
 .../functions/worker/rest/api/FunctionsImpl.java   | 35 +++++++++++++++++-----
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  6 ++--
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  8 +++--
 9 files changed, 91 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 388cd687b90..a8344be8dc8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -301,6 +301,7 @@ public class PulsarFunctionLocalRunTest {
         fileServer = new FileServer();
         fileServer.serveFile("/pulsar-io-data-generator.nar", getPulsarIODataGeneratorNar());
         fileServer.serveFile("/pulsar-functions-api-examples.jar", getPulsarApiExamplesJar());
+        fileServer.serveFile("/pulsar-functions-api-examples.nar", getPulsarApiExamplesNar());
         fileServer.start();
     }
 
@@ -443,7 +444,7 @@ public class PulsarFunctionLocalRunTest {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
 
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
-                jarFilePathUrl != null && jarFilePathUrl.startsWith(Utils.BUILTIN), sourceTopic, sinkTopic, subscriptionName);
+                jarFilePathUrl != null && (jarFilePathUrl.startsWith(Utils.BUILTIN) || jarFilePathUrl.endsWith(".nar")), sourceTopic, sinkTopic, subscriptionName);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
         functionConfig.setJar(jarFilePathUrl);
@@ -722,11 +723,22 @@ public class PulsarFunctionLocalRunTest {
         testE2EPulsarFunctionLocalRun(jarFilePathUrl);
     }
 
+    @Test(timeOut = 20000)
+    public void testE2EPulsarFunctionLocalRunWithNar() throws Exception {
+        String jarFilePathUrl = getPulsarApiExamplesNar().toURI().toString();
+        testE2EPulsarFunctionLocalRun(jarFilePathUrl);
+    }
+
     @Test(timeOut = 40000)
     public void testE2EPulsarFunctionLocalRunURL() throws Exception {
         testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
     }
 
+    @Test(timeOut = 40000)
+    public void testE2EPulsarFunctionLocalRunNarURL() throws Exception {
+        testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.nar"));
+    }
+
     @Test(timeOut = 20000, groups = "builtin")
     public void testE2EPulsarFunctionLocalRunBuiltin() throws Exception {
         String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index 1ac908c6ec2..da748bafea5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -219,6 +219,7 @@ public abstract class AbstractPulsarE2ETest {
         fileServer.serveFile("/pulsar-io-data-generator.nar", getPulsarIODataGeneratorNar());
         fileServer.serveFile("/pulsar-io-batch-data-generator.nar", getPulsarIOBatchDataGeneratorNar());
         fileServer.serveFile("/pulsar-functions-api-examples.jar", getPulsarApiExamplesJar());
+        fileServer.serveFile("/pulsar-functions-api-examples.nar", getPulsarApiExamplesNar());
         fileServer.start();
 
         Awaitility.await().until(() -> functionsWorkerService.getLeaderService().isLeader());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index e68a556524c..32ea853f269 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesJar;
+import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesNar;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -124,7 +125,8 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();
 
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
-                jarFilePathUrl.startsWith(Utils.BUILTIN), "my.*", sinkTopic, subscriptionName);
+                jarFilePathUrl.startsWith(Utils.BUILTIN) || jarFilePathUrl.endsWith(".nar"), "my.*",
+                sinkTopic, subscriptionName);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
         if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
@@ -196,11 +198,22 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         testE2EPulsarFunction(jarFilePathUrl);
     }
 
+    @Test(timeOut = 20000)
+    public void testE2EPulsarFunctionWithNarFile() throws Exception {
+        String jarFilePathUrl = getPulsarApiExamplesNar().toURI().toString();
+        testE2EPulsarFunction(jarFilePathUrl);
+    }
+
     @Test(timeOut = 40000)
     public void testE2EPulsarFunctionWithUrl() throws Exception {
         testE2EPulsarFunction(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
     }
 
+    @Test(timeOut = 40000)
+    public void testE2EPulsarFunctionWithNarUrl() throws Exception {
+        testE2EPulsarFunction(fileServer.getUrl("/pulsar-functions-api-examples.nar"));
+    }
+
     @Test(timeOut = 20000, groups = "builtin")
     public void testPulsarFunctionBuiltin() throws Exception {
         String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index e83653100ad..2b9b25ed743 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -345,23 +345,38 @@ public class LocalRunner implements AutoCloseable {
                         functionDetails = FunctionConfigUtils.convert(
                                 functionConfig,
                                 FunctionConfigUtils.validateJavaFunction(functionConfig, builtInFunctionClassLoader));
-                    } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                    } else if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                         File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                        userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
+                        ClassLoader functionClassLoader = FunctionCommon.getClassLoaderFromPackage(
+                                Function.FunctionDetails.ComponentType.FUNCTION,
+                                functionConfig.getClassName(), file, narExtractionDirectory);
+                        functionDetails = FunctionConfigUtils.convert(
+                                functionConfig,
+                                FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
+                        userCodeClassLoader = functionClassLoader;
                         userCodeClassLoaderCreated = true;
                     } else if (userCodeFile != null) {
                         File file = new File(userCodeFile);
                         if (!file.exists()) {
                             throw new RuntimeException("User jar does not exist");
                         }
-                        userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
+                        ClassLoader functionClassLoader = FunctionCommon.getClassLoaderFromPackage(
+                                Function.FunctionDetails.ComponentType.FUNCTION,
+                                functionConfig.getClassName(), file, narExtractionDirectory);
+                        functionDetails = FunctionConfigUtils.convert(
+                                functionConfig,
+                                FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
+                        userCodeClassLoader = functionClassLoader;
                         userCodeClassLoaderCreated = true;
                     } else {
                         if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
                             throw new IllegalStateException("The jar property must be specified in FunctionConfig.");
                         }
-                        FunctionConfigUtils.validateJavaFunction(functionConfig, Thread.currentThread()
-                                .getContextClassLoader());
+                        functionDetails = FunctionConfigUtils.convert(
+                                functionConfig,
+                                FunctionConfigUtils.validateJavaFunction(
+                                        functionConfig,
+                                        Thread.currentThread().getContextClassLoader()));
                     }
                 } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
                     userCodeFile = functionConfig.getGo();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index e4233acd4c4..8174b9722cf 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.WindowFunction;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.io.core.BatchSource;
 import org.apache.pulsar.io.core.Sink;
@@ -438,6 +439,9 @@ public class FunctionCommon {
                 }
                 try {
                     if (componentType
+                            == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.FUNCTION) {
+                        connectorClassName = FunctionUtils.getFunctionClass((NarClassLoader) narClassLoader);
+                    } else if (componentType
                             == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
                         connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
                     } else {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 08dddf3dbfa..69b4dcfd647 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -701,10 +701,6 @@ public class FunctionConfigUtils {
         }
     }
 
-    private static boolean isBuiltin(FunctionConfig functionConfig) {
-        return functionConfig.getJar() != null && functionConfig.getJar().startsWith("builtin://");
-    }
-
     private static void doCommonChecks(FunctionConfig functionConfig) {
         if (isEmpty(functionConfig.getTenant())) {
             throw new IllegalArgumentException("Function tenant cannot be null");
@@ -715,9 +711,8 @@ public class FunctionConfigUtils {
         if (isEmpty(functionConfig.getName())) {
             throw new IllegalArgumentException("Function name cannot be null");
         }
-        // go doesn't need className
-        if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON
-                || (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && !isBuiltin(functionConfig))) {
+        // go doesn't need className. Java className is done in doJavaChecks.
+        if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
             if (isEmpty(functionConfig.getClassName())) {
                 throw new IllegalArgumentException("Function classname cannot be null");
             }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 4a765cc4056..466071c490e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -788,6 +788,8 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
                 functionConfig, worker().getWorkerConfig().isForwardSourceMessageProperty());
 
         String archive = functionConfig.getJar();
+        ClassLoader classLoader = null;
+        // check if function is builtin and extract classloader
         if (!StringUtils.isEmpty(archive)) {
             if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
                 archive = archive.replaceFirst("^builtin://", "");
@@ -799,17 +801,36 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
                 if (function == null) {
                     throw new IllegalArgumentException(String.format("No Function %s found", archive));
                 }
-                return FunctionConfigUtils.convert(
-                        functionConfig,
-                        FunctionConfigUtils.validateJavaFunction(functionConfig, function.getClassLoader()));
+                classLoader = function.getClassLoader();
             }
         }
-        ClassLoader clsLoader = null;
+        boolean shouldCloseClassLoader = false;
         try {
-            clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
-            return FunctionConfigUtils.convert(functionConfig, clsLoader);
+
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                // if function is not builtin, attempt to extract classloader from package file if it exists
+                if (classLoader == null && componentPackageFile != null) {
+                    classLoader = getClassLoaderFromPackage(functionConfig.getClassName(),
+                            componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                    shouldCloseClassLoader = true;
+                }
+
+                if (classLoader == null) {
+                    throw new IllegalArgumentException("Function package is not provided");
+                }
+
+                FunctionConfigUtils.ExtractedFunctionDetails functionDetails = FunctionConfigUtils.validateJavaFunction(
+                        functionConfig, classLoader);
+                return FunctionConfigUtils.convert(functionConfig, functionDetails);
+            } else {
+                classLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+                shouldCloseClassLoader = true;
+                return FunctionConfigUtils.convert(functionConfig, classLoader);
+            }
         } finally {
-            ClassLoaderUtils.closeClassLoader(clsLoader);
+            if (shouldCloseClassLoader) {
+                ClassLoaderUtils.closeClassLoader(classLoader);
+            }
         }
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index ca405c5736a..306901d436e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -293,7 +293,7 @@ public class FunctionApiV2ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package is not provided")
     public void testRegisterFunctionMissingPackage() {
         try {
             testRegisterFunctionMissingArguments(
@@ -356,7 +356,9 @@ public class FunctionApiV2ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function classname cannot be null")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package does not have"
+            + " the correct format. Pulsar cannot determine if the package is a NAR package or JAR package. Function "
+            + "classname is not provided and attempts to load it as a NAR package produced the following error.*")
     public void testRegisterFunctionMissingClassName() {
         try {
             testRegisterFunctionMissingArguments(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 288001a2964..2edf48c95a9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -298,7 +298,7 @@ public class FunctionApiV3ResourceTest {
     }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package is not provided")
     public void testRegisterFunctionMissingPackage() {
         try {
             testRegisterFunctionMissingArguments(
@@ -326,7 +326,7 @@ public class FunctionApiV3ResourceTest {
                     tenant,
                     namespace,
                     function,
-                    null,
+                    mockedInputStream,
                     null,
                     mockedFormData,
                     outputTopic,
@@ -361,7 +361,9 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function classname cannot be null")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function package does not have"
+            + " the correct format. Pulsar cannot determine if the package is a NAR package or JAR package. Function "
+            + "classname is not provided and attempts to load it as a NAR package produced the following error.*")
     public void testRegisterFunctionMissingClassName() {
         try {
             testRegisterFunctionMissingArguments(