You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 12:08:12 UTC

[pulsar] branch master updated: [improve][function] Get function class name from the NAR when using built-in functions (#15693)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b3cd139335e [improve][function] Get function class name from the NAR when using built-in functions (#15693)
b3cd139335e is described below

commit b3cd139335e65c6161abd7653e2c4c0cb61c983f
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Thu Jun 9 14:08:04 2022 +0200

    [improve][function] Get function class name from the NAR when using built-in functions (#15693)
    
    * Get function class name from the NAR manifest when using built-in functions
    
    * Exclude pulsar-io.yaml from examples JAR
    
    * Build a NAR separately from the JAR for the function examples
    
    * Improve admin cli tests for built-in functions
---
 pulsar-broker/pom.xml                              | 18 ++++
 .../worker/PulsarFunctionLocalRunTest.java         | 31 ++++++-
 .../PulsarFunctionTestTemporaryDirectory.java      |  8 +-
 .../apache/pulsar/io/AbstractPulsarE2ETest.java    |  6 ++
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 47 ++++++++---
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  1 -
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 15 ++--
 .../org/apache/pulsar/common/functions/Utils.java  | 13 +--
 pulsar-functions/java-examples-builtin/pom.xml     | 50 +++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     | 22 +++++
 .../org/apache/pulsar/functions/LocalRunner.java   | 82 ++++++++++--------
 pulsar-functions/pom.xml                           |  2 +
 .../pulsar/functions/utils/FunctionCommon.java     |  6 +-
 .../functions/utils/FunctionConfigUtils.java       | 96 ++++++++++++++++------
 .../{Functions.java => FunctionArchive.java}       | 13 ++-
 .../functions/utils/functions/FunctionUtils.java   | 46 ++++++-----
 .../functions/utils/FunctionConfigUtilsTest.java   |  8 +-
 .../pulsar/functions/worker/FunctionsManager.java  | 13 ++-
 .../functions/worker/rest/api/ComponentImpl.java   |  8 +-
 .../functions/worker/rest/api/FunctionsImpl.java   | 45 +++++-----
 .../worker/rest/api/FunctionsImplTest.java         |  2 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 16 ++--
 .../rest/api/v3/FunctionApiV3ResourceTest.java     | 14 ++--
 23 files changed, 397 insertions(+), 165 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index c6d55b5f5f9..94702e2427c 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -320,6 +320,14 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+      <version>${project.version}</version>
+      <type>pom</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-batch-data-generator</artifactId>
@@ -460,6 +468,15 @@
                   <outputDirectory>${project.build.directory}</outputDirectory>
                   <destFileName>pulsar-functions-api-examples.jar</destFileName>
                 </artifactItem>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${project.build.directory}</outputDirectory>
+                  <destFileName>pulsar-functions-api-examples.nar</destFileName>
+                </artifactItem>
               </artifactItems>
             </configuration>
           </execution>
@@ -473,6 +490,7 @@
           <systemPropertyVariables>
             <pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
             <pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
+            <pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
             <pulsar-io-batch-data-generator.nar.path>${project.build.directory}/pulsar-io-batch-data-generator.nar</pulsar-io-batch-data-generator.nar.path>
             <!-- workaround issue #13750 which gets triggered if org.apache.bookkeeper.meta.MetadataDrivers class gets loaded before org.apache.pulsar.metadata.bookkeeper.BKCluster constructor is called -->
             <bookkeeper.metadata.bookie.drivers>org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver</bookkeeper.metadata.bookie.drivers>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 8cb067762da..388cd687b90 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -147,6 +147,16 @@ public class PulsarFunctionLocalRunTest {
                         + SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_JAR_FILE_PATH + " system property"));
     }
 
+    private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
+            "pulsar-functions-api-examples.nar.path";
+
+    public static File getPulsarApiExamplesNar() {
+        return new File(Objects.requireNonNull(
+                System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH)
+                , "pulsar-functions-api-examples.nar file location must be specified with "
+                        + SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH + " system property"));
+    }
+
     private static final String SYSTEM_PROPERTY_NAME_BATCH_NAR_FILE_PATH = "pulsar-io-batch-data-generator.nar.path";
 
     public static File getPulsarIOBatchDataGeneratorNar() {
@@ -235,6 +245,11 @@ public class PulsarFunctionLocalRunTest {
 
             File file = getPulsarIODataGeneratorNar();
             Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
+
+            File functionsDir = new File(workerConfig.getFunctionsDirectory());
+
+            file = getPulsarApiExamplesNar();
+            Files.copy(file.toPath(), new File(functionsDir, file.getName()).toPath());
         }
 
         Optional<WorkerService> functionWorkerService = Optional.empty();
@@ -350,6 +365,7 @@ public class PulsarFunctionLocalRunTest {
     protected static FunctionConfig createFunctionConfig(String tenant,
                                                          String namespace,
                                                          String functionName,
+                                                         boolean isBuiltin,
                                                          String sourceTopic,
                                                          String sinkTopic,
                                                          String subscriptionName) {
@@ -363,7 +379,9 @@ public class PulsarFunctionLocalRunTest {
         functionConfig.setSubName(subscriptionName);
         functionConfig.setInputs(Collections.singleton(sourceTopic));
         functionConfig.setAutoAck(true);
-        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        if (!isBuiltin) {
+            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        }
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setOutput(sinkTopic);
         functionConfig.setCleanupSubscription(true);
@@ -425,7 +443,7 @@ public class PulsarFunctionLocalRunTest {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
 
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
-                sourceTopic, sinkTopic, subscriptionName);
+                jarFilePathUrl != null && jarFilePathUrl.startsWith(Utils.BUILTIN), sourceTopic, sinkTopic, subscriptionName);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
         functionConfig.setJar(jarFilePathUrl);
@@ -441,6 +459,7 @@ public class PulsarFunctionLocalRunTest {
                 .tlsAllowInsecureConnection(true)
                 .tlsHostNameVerificationEnabled(false)
                 .metricsPortStart(metricsPort)
+                .functionsDirectory(workerConfig.getFunctionsDirectory())
                 .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
         localRunner.start(false);
 
@@ -594,7 +613,7 @@ public class PulsarFunctionLocalRunTest {
         Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(sinkTopic).subscriptionName("sub").subscribe();
 
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
-                sourceTopic, sinkTopic, subscriptionName);
+                jarFilePathUrl != null && jarFilePathUrl.startsWith(Utils.BUILTIN), sourceTopic, sinkTopic, subscriptionName);
         //set jsr310ConversionEnabled、alwaysAllowNull
         Map<String,String> schemaInput = new HashMap<>();
         schemaInput.put(sourceTopic, "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"true\",\"__alwaysAllowNull\":\"true\"}}");
@@ -708,6 +727,12 @@ public class PulsarFunctionLocalRunTest {
         testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
     }
 
+    @Test(timeOut = 20000, groups = "builtin")
+    public void testE2EPulsarFunctionLocalRunBuiltin() throws Exception {
+        String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
+        testE2EPulsarFunctionLocalRun(jarFilePathUrl);
+    }
+
     @Test(timeOut = 40000)
     public void testE2EPulsarFunctionLocalRunMultipleInstances() throws Throwable {
         runWithPulsarFunctionsClassLoader(() -> testE2EPulsarFunctionLocalRun(null, 2));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
index b97d2a1203d..156a4d945e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.FileUtils;
 import org.testng.Assert;
 
 /**
- * Creates a temporary directory that contains 3 subdirectories,
- * "narExtractionDirectory", "downloadDirectory" and "connectorsDirectory",
+ * Creates a temporary directory that contains 4 subdirectories,
+ * "narExtractionDirectory", "downloadDirectory", "connectorsDirectory" and "functionsDirectory",
  * which are assigned to the provided workerConfig's respective settings with
  * the {@link #useTemporaryDirectoriesForWorkerConfig(WorkerConfig)} method
  */
@@ -37,6 +37,7 @@ public class PulsarFunctionTestTemporaryDirectory {
     private final File narExtractionDirectory;
     private final File downloadDirectory;
     private final File connectorsDirectory;
+    private final File functionsDirectory;
 
     private PulsarFunctionTestTemporaryDirectory(String tempDirectoryNamePrefix) throws IOException {
         tempDirectory = Files.createTempDirectory(tempDirectoryNamePrefix).toFile();
@@ -46,6 +47,8 @@ public class PulsarFunctionTestTemporaryDirectory {
         downloadDirectory.mkdir();
         connectorsDirectory = new File(tempDirectory, "connectorsDirectory");
         connectorsDirectory.mkdir();
+        functionsDirectory = new File(tempDirectory, "functionsDirectory");
+        functionsDirectory.mkdir();
     }
 
     public static PulsarFunctionTestTemporaryDirectory create(String tempDirectoryNamePrefix) {
@@ -60,6 +63,7 @@ public class PulsarFunctionTestTemporaryDirectory {
         workerConfig.setNarExtractionDirectory(narExtractionDirectory.getAbsolutePath());
         workerConfig.setDownloadDirectory(downloadDirectory.getAbsolutePath());
         workerConfig.setConnectorsDirectory(connectorsDirectory.getAbsolutePath());
+        workerConfig.setFunctionsDirectory(functionsDirectory.getAbsolutePath());
     }
 
     public void delete() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index 7c0f2f0bbe1..1ac908c6ec2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
 import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesJar;
+import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesNar;
 import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIOBatchDataGeneratorNar;
 import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar;
 import static org.mockito.Mockito.spy;
@@ -158,6 +159,11 @@ public abstract class AbstractPulsarE2ETest {
 
             file = getPulsarIOBatchDataGeneratorNar();
             Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
+
+            File functionsDir = new File(workerConfig.getFunctionsDirectory());
+
+            file = getPulsarApiExamplesNar();
+            Files.copy(file.toPath(), new File(functionsDir, file.getName()).toPath());
         }
 
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 564fa269524..e68a556524c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
 import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -76,7 +77,7 @@ import org.testng.annotations.Test;
 @Test(groups = "broker-io")
 public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
 
-    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, boolean isBuiltin, String sourceTopic, String sinkTopic, String subscriptionName) {
         FunctionConfig functionConfig = new FunctionConfig();
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
@@ -89,7 +90,9 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
             functionConfig.setTopicsPattern(sourceTopicPattern);
         }
         functionConfig.setAutoAck(true);
-        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        if (!isBuiltin) {
+            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        }
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setOutput(sinkTopic);
         functionConfig.setCleanupSubscription(true);
@@ -121,14 +124,26 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();
 
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
-                "my.*", sinkTopic, subscriptionName);
+                jarFilePathUrl.startsWith(Utils.BUILTIN), "my.*", sinkTopic, subscriptionName);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+        if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
+            functionConfig.setJar(jarFilePathUrl);
+            admin.functions().createFunction(functionConfig, null);
+        } else {
+            admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+        }
 
         // try to update function to test: update-function functionality
         functionConfig.setParallelism(2);
         functionConfig.setOutput(sinkTopic2);
-        admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+        if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
+            functionConfig.setJar(jarFilePathUrl);
+            admin.functions().updateFunction(functionConfig, null);
+        } else {
+            admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+        }
 
         Awaitility.await().ignoreExceptions().untilAsserted(() -> {
             TopicStats topicStats = admin.topics().getStats(sinkTopic2);
@@ -186,6 +201,18 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         testE2EPulsarFunction(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
     }
 
+    @Test(timeOut = 20000, groups = "builtin")
+    public void testPulsarFunctionBuiltin() throws Exception {
+        String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
+        testE2EPulsarFunction(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 20000, groups = "builtin", expectedExceptions = {PulsarAdminException.class}, expectedExceptionsMessageRegExp = "No Function foo found")
+    public void testPulsarFunctionBuiltinDoesNotExist() throws Exception {
+        String jarFilePathUrl = String.format("%s://foo", Utils.BUILTIN);
+        testE2EPulsarFunction(jarFilePathUrl);
+    }
+
     @Test(timeOut = 30000)
     public void testReadCompactedFunction() throws Exception {
         final String namespacePortion = "io";
@@ -225,7 +252,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
 
         // 4 Setup function
         // set source topic to null because we are setting the topic information separately
-        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
                 null, sinkTopic, subscriptionName);
         Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
         ConsumerConfig consumerConfig = new ConsumerConfig();
@@ -275,7 +302,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
         String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
-        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
                 "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
@@ -611,7 +638,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
         String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
-        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
                 "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
@@ -693,7 +720,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         admin.tenants().updateTenant(tenant, propAdmin);
 
         String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
-        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
                 "my.*", sinkTopic, subscriptionName);
         if (!validRoleName) {
             // create a non-superuser admin to test the api
@@ -735,7 +762,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
         String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
-        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
                 sourceTopicName, sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index efb5ae8ad64..faa94ebef5a 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -436,7 +436,6 @@ public class CmdFunctionsTest {
                 "--jar", BUILTIN_NAR,
                 "--tenant", "sample",
                 "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
         });
 
         CreateFunction creater = cmd.getCreater();
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a9c83a2cd09..61bf9c438d4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -643,7 +643,8 @@ public class CmdFunctions extends CmdBase {
 
         protected void validateFunctionConfigs(FunctionConfig functionConfig) {
             // go doesn't need className
-            if (functionConfig.getPy() != null || functionConfig.getJar() != null) {
+            if (functionConfig.getPy() != null
+                    || (functionConfig.getJar() != null && !functionConfig.getJar().startsWith("builtin://"))) {
                 if (StringUtils.isEmpty(functionConfig.getClassName())) {
                     throw new ParameterException("No Function Classname specified");
                 }
@@ -651,6 +652,9 @@ public class CmdFunctions extends CmdBase {
             if (StringUtils.isEmpty(functionConfig.getName())) {
                 org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
             }
+            if (StringUtils.isEmpty(functionConfig.getName())) {
+                throw new IllegalArgumentException("No Function name specified");
+            }
             if (StringUtils.isEmpty(functionConfig.getTenant())) {
                 org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
             }
@@ -958,13 +962,12 @@ public class CmdFunctions extends CmdBase {
 
         @Override
         protected void validateFunctionConfigs(FunctionConfig functionConfig) {
-            if (StringUtils.isEmpty(functionConfig.getClassName())) {
-                if (StringUtils.isEmpty(functionConfig.getName())) {
-                    throw new ParameterException("Function Name not provided");
-                }
-            } else if (StringUtils.isEmpty(functionConfig.getName())) {
+            if (StringUtils.isEmpty(functionConfig.getName())) {
                 org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
             }
+            if (StringUtils.isEmpty(functionConfig.getName())) {
+                throw new ParameterException("Function Name not provided");
+            }
             if (StringUtils.isEmpty(functionConfig.getTenant())) {
                 org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
index 2de38bbe02f..e4c9b9daeb5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
@@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import java.util.Arrays;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.packages.management.core.common.PackageType;
@@ -46,11 +47,13 @@ public class Utils {
     }
 
     public static void inferMissingFunctionName(FunctionConfig functionConfig) {
-        String[] domains = functionConfig.getClassName().split("\\.");
-        if (domains.length == 0) {
-            functionConfig.setName(functionConfig.getClassName());
-        } else {
-            functionConfig.setName(domains[domains.length - 1]);
+        if (!StringUtils.isEmpty(functionConfig.getClassName())) {
+            String[] domains = functionConfig.getClassName().split("\\.");
+            if (domains.length == 0) {
+                functionConfig.setName(functionConfig.getClassName());
+            } else {
+                functionConfig.setName(domains[domains.length - 1]);
+            }
         }
     }
 
diff --git a/pulsar-functions/java-examples-builtin/pom.xml b/pulsar-functions/java-examples-builtin/pom.xml
new file mode 100644
index 00000000000..7822aa9e83a
--- /dev/null
+++ b/pulsar-functions/java-examples-builtin/pom.xml
@@ -0,0 +1,50 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-functions</artifactId>
+    <version>2.11.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+  <name>Pulsar Functions :: API Examples (NAR)</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-api-examples</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/pulsar-functions/java-examples-builtin/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-functions/java-examples-builtin/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000000..32cc0fb8ac0
--- /dev/null
+++ b/pulsar-functions/java-examples-builtin/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: exclamation
+description: Exclamation function
+functionClass: org.apache.pulsar.functions.api.examples.ExclamationFunction
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 5b3422693b1..e83653100ad 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -48,7 +48,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
@@ -75,11 +74,10 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
 import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.functions.Functions;
 import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.WorkerConfig;
 
 @Slf4j
 public class LocalRunner implements AutoCloseable {
@@ -89,6 +87,7 @@ public class LocalRunner implements AutoCloseable {
     private final String narExtractionDirectory;
     private final File narExtractionDirectoryCreated;
     private final String connectorsDir;
+    private final String functionsDir;
     private final Thread shutdownHook;
     private final int instanceLivenessCheck;
     private ClassLoader userCodeClassLoader;
@@ -218,7 +217,8 @@ public class LocalRunner implements AutoCloseable {
                        boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled,
                        String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv,
                        String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory,
-                       String connectorsDirectory, Integer metricsPortStart, boolean exitOnError) {
+                       String connectorsDirectory, String functionsDirectory, Integer metricsPortStart,
+                       boolean exitOnError) {
         this.functionConfig = functionConfig;
         this.sourceConfig = sourceConfig;
         this.sinkConfig = sinkConfig;
@@ -242,15 +242,8 @@ public class LocalRunner implements AutoCloseable {
             this.narExtractionDirectoryCreated = createNarExtractionTempDirectory();
             this.narExtractionDirectory = this.narExtractionDirectoryCreated.getAbsolutePath();
         }
-        if (connectorsDirectory != null) {
-            this.connectorsDir = connectorsDirectory;
-        } else {
-            String pulsarHome = System.getenv("PULSAR_HOME");
-            if (pulsarHome == null) {
-                pulsarHome = Paths.get("").toAbsolutePath().toString();
-            }
-            this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
-        }
+        this.connectorsDir = connectorsDirectory != null ? connectorsDirectory : getPulsarDirectory("connectors");
+        this.functionsDir = functionsDirectory != null ? functionsDirectory : getPulsarDirectory("functions");
         this.metricsPortStart = metricsPortStart;
         this.exitOnError = exitOnError;
         this.instanceLivenessCheck = exitOnError ? 0 : 30000;
@@ -263,6 +256,14 @@ public class LocalRunner implements AutoCloseable {
         });
     }
 
+    private static String getPulsarDirectory(String directory) {
+        String pulsarHome = System.getenv("PULSAR_HOME");
+        if (pulsarHome == null) {
+            pulsarHome = Paths.get("").toAbsolutePath().toString();
+        }
+        return Paths.get(pulsarHome, directory).toString();
+    }
+
     private static File createNarExtractionTempDirectory() {
         try {
             return Files.createTempDirectory("pulsar_localrunner_nars_").toFile();
@@ -328,28 +329,23 @@ public class LocalRunner implements AutoCloseable {
                 throw new IllegalArgumentException("Pulsar Function local run already started!");
             }
             Runtime.getRuntime().addShutdownHook(shutdownHook);
-            Function.FunctionDetails functionDetails;
-            String userCodeFile = null;
+            Function.FunctionDetails functionDetails = null;
+            String userCodeFile;
             int parallelism;
             if (functionConfig != null) {
                 FunctionConfigUtils.inferMissingArguments(functionConfig, true);
                 parallelism = functionConfig.getParallelism();
                 if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                     userCodeFile = functionConfig.getJar();
-
-                    boolean isBuiltin = !StringUtils.isEmpty(functionConfig.getJar())
-                            && functionConfig.getJar().startsWith(Utils.BUILTIN);
-                    if (isBuiltin) {
-                        WorkerConfig workerConfig =
-                                WorkerConfig.load(System.getenv("PULSAR_HOME") + "/conf/functions_worker.yml");
-                        Functions functions = FunctionUtils
-                                .searchForFunctions(System.getenv("PULSAR_HOME")
-                                        + workerConfig.getFunctionsDirectory().replaceFirst("^.", ""));
-                        String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
-                        userCodeFile = functions.getFunctions().get(functionType).toString();
-                    }
-
-                    if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                    ClassLoader builtInFunctionClassLoader = userCodeFile != null
+                            ? isBuiltInFunction(userCodeFile)
+                            : null;
+                    if (builtInFunctionClassLoader != null) {
+                        userCodeClassLoader = builtInFunctionClassLoader;
+                        functionDetails = FunctionConfigUtils.convert(
+                                functionConfig,
+                                FunctionConfigUtils.validateJavaFunction(functionConfig, builtInFunctionClassLoader));
+                    } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                         File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
                         userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
                         userCodeClassLoaderCreated = true;
@@ -375,9 +371,11 @@ public class LocalRunner implements AutoCloseable {
                     throw new UnsupportedOperationException();
                 }
 
-                functionDetails = FunctionConfigUtils.convert(functionConfig,
-                        userCodeClassLoader != null ? userCodeClassLoader :
-                                Thread.currentThread().getContextClassLoader());
+                if (functionDetails == null) {
+                    functionDetails = FunctionConfigUtils.convert(functionConfig,
+                            userCodeClassLoader != null ? userCodeClassLoader :
+                                    Thread.currentThread().getContextClassLoader());
+                }
             } else if (sourceConfig != null) {
                 inferMissingArguments(sourceConfig);
                 userCodeFile = sourceConfig.getArchive();
@@ -590,7 +588,7 @@ public class LocalRunner implements AutoCloseable {
                 }
             }
         }, 30000, 30000);
-        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> statusCheckTimer.cancel()));
+        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(statusCheckTimer::cancel));
     }
 
 
@@ -679,6 +677,20 @@ public class LocalRunner implements AutoCloseable {
         }
     }
 
+    private ClassLoader isBuiltInFunction(String functionType) throws IOException {
+        // Validate the function type from the locally available functions
+        TreeMap<String, FunctionArchive> functions = getFunctions();
+
+        String functionName = functionType.replaceFirst("^builtin://", "");
+        FunctionArchive function = functions.get(functionName);
+        if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
+            // Function type is a valid built-in type.
+            return function.getClassLoader();
+        } else {
+            return null;
+        }
+    }
+
     private ClassLoader isBuiltInSource(String sourceType) throws IOException {
         // Validate the connector type from the locally available connectors
         TreeMap<String, Connector> connectors = getConnectors();
@@ -707,6 +719,10 @@ public class LocalRunner implements AutoCloseable {
         }
     }
 
+    private TreeMap<String, FunctionArchive> getFunctions() throws IOException {
+        return FunctionUtils.searchForFunctions(functionsDir);
+    }
+
     private TreeMap<String, Connector> getConnectors() throws IOException {
         return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
     }
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 2bff9d42938..d60692163c8 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -45,6 +45,7 @@
           <module>proto</module>
           <module>api-java</module>
           <module>java-examples</module>
+          <module>java-examples-builtin</module>
           <module>utils</module>
           <module>instance</module>
           <module>runtime</module>
@@ -62,6 +63,7 @@
         <module>proto</module>
         <module>api-java</module>
         <module>java-examples</module>
+        <module>java-examples-builtin</module>
         <module>utils</module>
         <module>instance</module>
         <module>runtime</module>
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index d6f948a0fe7..02202ae7978 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -92,8 +92,12 @@ public class FunctionCommon {
 
     public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader)
             throws ClassNotFoundException {
+        return getFunctionTypes(functionConfig, classLoader.loadClass(functionConfig.getClassName()));
+    }
+
+    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, Class functionClass)
+            throws ClassNotFoundException {
         boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null;
-        Class functionClass = classLoader.loadClass(functionConfig.getClassName());
         return getFunctionTypes(functionClass, isWindowConfigPresent);
     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 99bca74ce90..08dddf3dbfa 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.util.Collection;
@@ -37,6 +38,9 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -49,10 +53,20 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
 
 @Slf4j
 public class FunctionConfigUtils {
 
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    public static class ExtractedFunctionDetails {
+        private String functionClassName;
+        private String typeArg0;
+        private String typeArg1;
+    }
+
     static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
     static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
 
@@ -61,33 +75,42 @@ public class FunctionConfigUtils {
     public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
             throws IllegalArgumentException {
 
-        boolean isBuiltin =
-                !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar()
-                        .startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
-
-        Class<?>[] typeArgs = null;
         if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
             if (classLoader != null) {
                 try {
-                    typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
+                    Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
+                    return convert(
+                            functionConfig,
+                            new ExtractedFunctionDetails(
+                                    functionConfig.getClassName(),
+                                    typeArgs[0].getName(),
+                                    typeArgs[1].getName()));
                 } catch (ClassNotFoundException | NoClassDefFoundError e) {
                     throw new IllegalArgumentException(
                             String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
                 }
             }
         }
+        return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
+    }
+
+    public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
+             throws IllegalArgumentException {
+
+        boolean isBuiltin = !StringUtils.isEmpty(functionConfig.getJar())
+                && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
 
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
         // Setup source
         Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
         if (functionConfig.getInputs() != null) {
-            functionConfig.getInputs().forEach((topicName -> {
+            functionConfig.getInputs().forEach((topicName ->
                 sourceSpecBuilder.putInputSpecs(topicName,
                         Function.ConsumerSpec.newBuilder()
                                 .setIsRegexPattern(false)
-                                .build());
-            }));
+                                .build())
+            ));
         }
         if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
             sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
@@ -166,7 +189,7 @@ public class FunctionConfigUtils {
 
         // Set subscription position
         if (functionConfig.getSubscriptionPosition() != null) {
-            Function.SubscriptionPosition subPosition = null;
+            Function.SubscriptionPosition subPosition;
             if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) {
                 subPosition = Function.SubscriptionPosition.EARLIEST;
             } else {
@@ -175,8 +198,8 @@ public class FunctionConfigUtils {
             sourceSpecBuilder.setSubscriptionPosition(subPosition);
         }
 
-        if (typeArgs != null) {
-            sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
+        if (extractedDetails.getTypeArg0() != null) {
+            sourceSpecBuilder.setTypeClassName(extractedDetails.getTypeArg0());
         }
         if (functionConfig.getTimeoutMs() != null) {
             sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
@@ -217,8 +240,8 @@ public class FunctionConfigUtils {
                         String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
             }
         }
-        if (typeArgs != null) {
-            sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
+        if (extractedDetails.getTypeArg1() != null) {
+            sinkSpecBuilder.setTypeClassName(extractedDetails.getTypeArg1());
         }
         if (functionConfig.getProducerConfig() != null) {
             ProducerConfig producerConf = functionConfig.getProducerConfig();
@@ -285,14 +308,14 @@ public class FunctionConfigUtils {
         // windowing related
         WindowConfig windowConfig = functionConfig.getWindowConfig();
         if (windowConfig != null) {
-            windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName());
+            windowConfig.setActualWindowFunctionClassName(extractedDetails.getFunctionClassName());
             configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
             // set class name to window function executor
             functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor");
 
         } else {
-            if (functionConfig.getClassName() != null) {
-                functionDetailsBuilder.setClassName(functionConfig.getClassName());
+            if (extractedDetails.getFunctionClassName() != null) {
+                functionDetailsBuilder.setClassName(extractedDetails.getFunctionClassName());
             }
         }
         if (!configs.isEmpty()) {
@@ -517,10 +540,21 @@ public class FunctionConfigUtils {
         }
     }
 
-    private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+    private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
 
+        String functionClassName = functionConfig.getClassName();
+        Class functionClass;
         try {
-            Class functionClass = clsLoader.loadClass(functionConfig.getClassName());
+            // if class name in function config is not set, this should be a built-in function
+            // thus we should try to find its class name in the NAR service definition
+            if (functionClassName == null) {
+                try {
+                    functionClassName = FunctionUtils.getFunctionClass(clsLoader);
+                } catch (IOException e) {
+                    throw new IllegalArgumentException("Failed to extract source class from archive", e);
+                }
+            }
+            functionClass = clsLoader.loadClass(functionClassName);
 
             if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
                     && !java.util.function.Function.class.isAssignableFrom(functionClass)
@@ -536,7 +570,7 @@ public class FunctionConfigUtils {
 
         Class<?>[] typeArgs;
         try {
-            typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
+            typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
         } catch (ClassNotFoundException | NoClassDefFoundError e) {
             throw new IllegalArgumentException(
                     String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
@@ -590,7 +624,10 @@ public class FunctionConfigUtils {
         }
 
         if (Void.class.equals(typeArgs[1])) {
-            return;
+            return new FunctionConfigUtils.ExtractedFunctionDetails(
+                    functionClassName,
+                    typeArgs[0].getName(),
+                    typeArgs[1].getName());
         }
 
         // One and only one of outputSchemaType and outputSerdeClassName should be set
@@ -612,6 +649,10 @@ public class FunctionConfigUtils {
             ValidatorUtils
                     .validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(), clsLoader, true);
         }
+        return new FunctionConfigUtils.ExtractedFunctionDetails(
+                functionClassName,
+                typeArgs[0].getName(),
+                typeArgs[1].getName());
     }
 
     private static void doPythonChecks(FunctionConfig functionConfig) {
@@ -660,6 +701,10 @@ public class FunctionConfigUtils {
         }
     }
 
+    private static boolean isBuiltin(FunctionConfig functionConfig) {
+        return functionConfig.getJar() != null && functionConfig.getJar().startsWith("builtin://");
+    }
+
     private static void doCommonChecks(FunctionConfig functionConfig) {
         if (isEmpty(functionConfig.getTenant())) {
             throw new IllegalArgumentException("Function tenant cannot be null");
@@ -672,7 +717,7 @@ public class FunctionConfigUtils {
         }
         // go doesn't need className
         if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON
-                || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                || (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && !isBuiltin(functionConfig))) {
             if (isEmpty(functionConfig.getClassName())) {
                 throw new IllegalArgumentException("Function classname cannot be null");
             }
@@ -826,7 +871,7 @@ public class FunctionConfigUtils {
     public static ClassLoader validate(FunctionConfig functionConfig, File functionPackageFile) {
         doCommonChecks(functionConfig);
         if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-            ClassLoader classLoader = null;
+            ClassLoader classLoader;
             if (functionPackageFile != null) {
                 try {
                     classLoader = loadJar(functionPackageFile);
@@ -860,9 +905,10 @@ public class FunctionConfigUtils {
         }
     }
 
-    public static void validateJavaFunction(FunctionConfig functionConfig, ClassLoader classLoader) {
+    public static ExtractedFunctionDetails validateJavaFunction(FunctionConfig functionConfig,
+                                                                ClassLoader classLoader) {
         doCommonChecks(functionConfig);
-        doJavaChecks(functionConfig, classLoader);
+        return doJavaChecks(functionConfig, classLoader);
     }
 
     public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
similarity index 79%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
index 206d1a52d55..8d621cc965f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
@@ -19,15 +19,14 @@
 package org.apache.pulsar.functions.utils.functions;
 
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import lombok.Builder;
 import lombok.Data;
 import org.apache.pulsar.common.functions.FunctionDefinition;
 
+@Builder
 @Data
-public class Functions {
-    final List<FunctionDefinition> functionsDefinitions = new ArrayList<>();
-    final Map<String, Path> functions = new TreeMap<>();
+public class FunctionArchive {
+    private Path archivePath;
+    private ClassLoader classLoader;
+    private FunctionDefinition functionDefinition;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
index 68c1fdd9be4..8b70a4b6508 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -24,7 +24,7 @@ import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
+import java.util.TreeMap;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -43,7 +43,7 @@ public class FunctionUtils {
     private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";
 
     /**
-     * Extract the Pulsar Function class from a functionctor archive.
+     * Extract the Pulsar Function class from a function archive.
      */
     public static String getFunctionClass(ClassLoader classLoader) throws IOException {
         NarClassLoader ncl = (NarClassLoader) classLoader;
@@ -71,24 +71,21 @@ public class FunctionUtils {
         return conf.getFunctionClass();
     }
 
-    public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                .narFile(new File(narPath))
-                .build();) {
-            String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-            return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
-        }
+    public static FunctionDefinition getFunctionDefinition(NarClassLoader narClassLoader) throws IOException {
+        String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+        return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
     }
-    public static Functions searchForFunctions(String functionsDirectory) throws IOException {
+
+    public static TreeMap<String, FunctionArchive> searchForFunctions(String functionsDirectory) throws IOException {
         return searchForFunctions(functionsDirectory, false);
     }
 
-    public static Functions searchForFunctions(String functionsDirectory, boolean alwaysPopulatePath)
-            throws IOException {
+    public static TreeMap<String, FunctionArchive> searchForFunctions(String functionsDirectory,
+                                                                      boolean alwaysPopulatePath) throws IOException {
         Path path = Paths.get(functionsDirectory).toAbsolutePath();
         log.info("Searching for functions in {}", path);
 
-        Functions functions = new Functions();
+        TreeMap<String, FunctionArchive> functions = new TreeMap<>();
 
         if (!path.toFile().exists()) {
             log.warn("Functions archive directory not found");
@@ -98,24 +95,29 @@ public class FunctionUtils {
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
                 try {
-                    FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString());
+
+                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
+                            .narFile(new File(archive.toString()))
+                            .build();
+
+                    FunctionArchive.FunctionArchiveBuilder functionArchiveBuilder = FunctionArchive.builder();
+                    FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(ncl);
                     log.info("Found function {} from {}", cntDef, archive);
-                    log.error(cntDef.getName());
-                    log.error(cntDef.getFunctionClass());
+
+                    functionArchiveBuilder.archivePath(archive);
+
+                    functionArchiveBuilder.classLoader(ncl);
+                    functionArchiveBuilder.functionDefinition(cntDef);
+
                     if (alwaysPopulatePath || !StringUtils.isEmpty(cntDef.getFunctionClass())) {
-                        functions.functions.put(cntDef.getName(), archive);
+                        functions.put(cntDef.getName(), functionArchiveBuilder.build());
                     }
-
-                    functions.functionsDefinitions.add(cntDef);
                 } catch (Throwable t) {
                     log.warn("Failed to load function from {}", archive, t);
                 }
             }
         }
 
-        Collections.sort(functions.functionsDefinitions,
-                (c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName()));
-
         return functions;
     }
 }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index a6ebde14983..a2ee418a11c 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -84,7 +84,7 @@ public class FunctionConfigUtilsTest {
         producerConfig.setUseThreadLocalProducers(true);
         producerConfig.setBatchBuilder("DEFAULT");
         functionConfig.setProducerConfig(producerConfig);
-        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
 
         // add default resources
@@ -125,7 +125,7 @@ public class FunctionConfigUtilsTest {
         producerConfig.setUseThreadLocalProducers(true);
         producerConfig.setBatchBuilder("KEY_BASED");
         functionConfig.setProducerConfig(producerConfig);
-        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
 
         // add default resources
@@ -584,7 +584,7 @@ public class FunctionConfigUtilsTest {
     @Test
     public void testPoolMessages() {
         FunctionConfig functionConfig = createFunctionConfig();
-        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
         assertFalse(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
         assertFalse(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -594,7 +594,7 @@ public class FunctionConfigUtilsTest {
                 .poolMessages(true).build());
         functionConfig.setInputSpecs(inputSpecs);
 
-        functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+        functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
         assertTrue(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
 
         convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
index f643915c2b3..0f1c0fcc835 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -20,27 +20,26 @@ package org.apache.pulsar.functions.worker;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.List;
+import java.util.TreeMap;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
 import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.functions.Functions;
 
 @Slf4j
 public class FunctionsManager {
 
-    private Functions functions;
+    private TreeMap<String, FunctionArchive> functions;
 
     public FunctionsManager(WorkerConfig workerConfig) throws IOException {
         this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
     }
 
-    public List<FunctionDefinition> getFunctions() {
-        return functions.getFunctionsDefinitions();
+    public FunctionArchive getFunction(String functionType) {
+        return functions.get(functionType);
     }
 
     public Path getFunctionArchive(String functionType) {
-        return functions.getFunctions().get(functionType);
+        return functions.get(functionType).getArchivePath();
     }
 
     public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 801bf520911..e1103214b32 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -46,6 +46,7 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -99,8 +100,8 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
 import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.functions.Functions;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -1457,8 +1458,9 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                 String sType = pkgPath.replaceFirst("^builtin://", "");
                 final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
                 log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
-                Functions sinksOrSources = FunctionUtils.searchForFunctions(connectorsDir, true);
-                Path narPath = sinksOrSources.getFunctions().get(sType);
+                TreeMap<String, FunctionArchive> sinksOrSources =
+                        FunctionUtils.searchForFunctions(connectorsDir, true);
+                Path narPath = sinksOrSources.get(sType).getArchivePath();
                 if (narPath == null) {
                     throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
                 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index bc14f7deb9e..4a765cc4056 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -57,7 +58,9 @@ import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionsManager;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.service.api.Functions;
@@ -118,7 +121,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
                 String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
                         worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
-                if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+                if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
                     log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, functionName, namespace);
                     throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
                 }
@@ -144,7 +147,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
                     ComponentTypeUtils.toString(componentType), functionName));
         }
 
-        Function.FunctionDetails functionDetails = null;
+        Function.FunctionDetails functionDetails;
         boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         File componentPackageFile = null;
         try {
@@ -329,7 +332,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
         }
 
-        Function.FunctionDetails functionDetails = null;
+        Function.FunctionDetails functionDetails;
         File componentPackageFile = null;
         try {
 
@@ -775,35 +778,39 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
                                                                  final String namespace,
                                                                  final String componentName,
                                                                  final FunctionConfig functionConfig,
-                                                                 final File componentPackageFile) throws IOException {
+                                                                 final File componentPackageFile) {
 
         // The rest end points take precedence over whatever is there in function config
-        Path archivePath = null;
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
         functionConfig.setName(componentName);
         FunctionConfigUtils.inferMissingArguments(
                 functionConfig, worker().getWorkerConfig().isForwardSourceMessageProperty());
 
-        if (!StringUtils.isEmpty(functionConfig.getJar())) {
-            String builtinArchive = functionConfig.getJar();
-            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
-                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
-            }
-            try {
-                archivePath = this.worker().getFunctionsManager().getFunctionArchive(builtinArchive);
-            } catch (Exception e) {
-                throw new IllegalArgumentException(String.format("No Function archive %s found", archivePath));
+        String archive = functionConfig.getJar();
+        if (!StringUtils.isEmpty(archive)) {
+            if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                archive = archive.replaceFirst("^builtin://", "");
+
+                FunctionsManager functionsManager = this.worker().getFunctionsManager();
+                FunctionArchive function = functionsManager.getFunction(archive);
+
+                // check if builtin function exists
+                if (function == null) {
+                    throw new IllegalArgumentException(String.format("No Function %s found", archive));
+                }
+                return FunctionConfigUtils.convert(
+                        functionConfig,
+                        FunctionConfigUtils.validateJavaFunction(functionConfig, function.getClassLoader()));
             }
         }
         ClassLoader clsLoader = null;
-        if (archivePath != null) {
-            clsLoader = FunctionConfigUtils.validate(functionConfig, archivePath.toFile());
-        } else {
+        try {
             clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+            return FunctionConfigUtils.convert(functionConfig, clsLoader);
+        } finally {
+            ClassLoaderUtils.closeClassLoader(clsLoader);
         }
-        return FunctionConfigUtils.convert(functionConfig, clsLoader);
-
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 3a98416fe81..4b7e9940230 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -373,6 +373,6 @@ public class FunctionsImplTest {
 
     public static Function.FunctionDetails createDefaultFunctionDetails() {
         FunctionConfig functionConfig = createDefaultFunctionConfig();
-        return FunctionConfigUtils.convert(functionConfig, null);
+        return FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 2bad2bd266e..ca405c5736a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -539,7 +539,7 @@ public class FunctionApiV2ResourceTest {
                     inputStream,
                     details,
                     functionPkgUrl,
-                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
                     null);
         } catch (InvalidProtocolBufferException e) {
             throw new RuntimeException(e);
@@ -557,7 +557,7 @@ public class FunctionApiV2ResourceTest {
                     mockedInputStream,
                     mockedFormData,
                     null,
-                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
                     null);
         } catch (InvalidProtocolBufferException e) {
             throw new RuntimeException(e);
@@ -940,7 +940,7 @@ public class FunctionApiV2ResourceTest {
                     inputStream,
                     details,
                     null,
-                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
                     null);
         } catch (InvalidProtocolBufferException e) {
             throw new RuntimeException(e);
@@ -968,7 +968,7 @@ public class FunctionApiV2ResourceTest {
                     mockedInputStream,
                     mockedFormData,
                     null,
-                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
                     null);
         } catch (InvalidProtocolBufferException e) {
             throw new RuntimeException(e);
@@ -1045,7 +1045,7 @@ public class FunctionApiV2ResourceTest {
                     null,
                     null,
                     filePackageUrl,
-                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
                     null);
         } catch (InvalidProtocolBufferException e) {
             throw new RuntimeException(e);
@@ -1437,7 +1437,7 @@ public class FunctionApiV2ResourceTest {
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         try {
             resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)), null);
+                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)), null);
         } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
         }
@@ -1471,7 +1471,7 @@ public class FunctionApiV2ResourceTest {
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         try {
             resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
-                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)), null);
+                    JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)), null);
         } catch (InvalidProtocolBufferException e) {
             throw new RuntimeException(e);
         }
@@ -1493,6 +1493,6 @@ public class FunctionApiV2ResourceTest {
 
     public static FunctionDetails createDefaultFunctionDetails() {
         FunctionConfig functionConfig = createDefaultFunctionConfig();
-        return FunctionConfigUtils.convert(functionConfig, null);
+        return FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index fae1ae6d975..288001a2964 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URL;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -74,6 +73,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
 import org.apache.pulsar.functions.utils.functions.FunctionUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -1529,14 +1529,12 @@ public class FunctionApiV3ResourceTest {
         when(worker.getWorkerConfig()).thenReturn(config);
         FunctionsImpl function = new FunctionsImpl(() -> worker);
 
-        Map<String, Path> functionsMap = new TreeMap<>();
-        functionsMap.put("cassandra", file.toPath());
-        org.apache.pulsar.functions.utils.functions.Functions mockedFunctions =
-                mock(org.apache.pulsar.functions.utils.functions.Functions.class);
-        when(mockedFunctions.getFunctions()).thenReturn(functionsMap);
+        TreeMap<String, FunctionArchive> functions = new TreeMap<>();
+        FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+        functions.put("cassandra", functionArchive);
 
         mockStatic(FunctionUtils.class, ctx -> {
-            ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(mockedFunctions);
+            ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions);
 
         });
 
@@ -1648,6 +1646,6 @@ public class FunctionApiV3ResourceTest {
 
     public static FunctionDetails createDefaultFunctionDetails() {
         FunctionConfig functionConfig = createDefaultFunctionConfig();
-        return FunctionConfigUtils.convert(functionConfig, null);
+        return FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
     }
 }