You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/05 00:08:42 UTC

[pulsar] branch master updated: [#6003][pulsar-functions] Possibility to add builtin Functions (#6895)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0098a  [#6003][pulsar-functions] Possibility to add builtin Functions (#6895)
9b0098a is described below

commit 9b0098a13ac67d5e342b9b6b15db1f680fc1d6d6
Author: oncode <in...@oncode.it>
AuthorDate: Fri Jun 5 02:08:25 2020 +0200

    [#6003][pulsar-functions] Possibility to add builtin Functions (#6895)
    
    Master Issue: #6003
    
    ### Motivation
    
    This pull request adds the ability to register builtin functions (in the same way as the builtin connectors).
    
    The builtin function archive must include a `pulsar-io.yaml` file with the following content:
    
    ```yml
    name: <function-name>
    description: <function-description>
    functionClass: <function-class>
    ```
    
    e.g.
    ```yml
    name: test-function
    description: test function description
    functionClass: it.oncode.pulsar.functions.TestFunction
    ```
    
    It is then possible to create a function from a builtin archive in the same way as for builtin sinks/sources, by referencing it as `builtin://<function-name>`.
    
    Example in scala
    ```scala
    val functionConfigBuilder: FunctionConfigBuilder = FunctionConfig.builder()
        val function =
          functionConfigBuilder
            .tenant("public")
            .namespace("default")
            .jar("builtin://test-function")
            .name("test-function-name")
            .className("it.oncode.pulsar.functions.TestFunction")
            .inputs(Seq("channel_in").asJava)
            .output("channel_out")
            .runtime(FunctionConfig.Runtime.JAVA)
            .build()
    
    Pulsar.admin.functions
          .createFunction(function, null)
    
    ```
    
    The directory that is searched for builtin functions is configured in the `conf/functions_worker.yml` file
    
    e.g.
    `functionsDirectory: ./functions`
    
    Function packages must be in `*.nar` format, as for source/sink connectors.
    
    ### Modifications
    
    I modified the `pulsar-function-utils`, `pulsar-functions-worker` and `pulsar-common` modules, following the implementation of the builtin connectors.
    `Function.proto` has also been modified to include the `builtin` property.
    
    #### What this PR does not include
    
    - modification of pulsar-admin to fetch the available builtin functions
    - the related documentation
    
    This feature is critical for us; I think we could open an issue for the remaining points and consider merging this PR.
---
 conf/functions_worker.yml                          |   1 +
 .../common/functions/FunctionDefinition.java       |  47 +++++++++
 .../apache/pulsar/common/nar/NarClassLoader.java   |  12 +--
 pulsar-functions/localrun-shaded/pom.xml           |  24 +++--
 .../org/apache/pulsar/functions/LocalRunner.java   |  21 +++-
 .../proto/src/main/proto/Function.proto            |   3 +
 .../pulsar/functions/worker/WorkerConfig.java      |   7 ++
 .../functions/utils/FunctionConfigUtils.java       |  17 +--
 .../functions/utils/functions/FunctionUtils.java   | 115 +++++++++++++++++++++
 .../functions/utils/functions/Functions.java       |  35 +++++++
 .../pulsar/functions/worker/FunctionActioner.java  |   8 +-
 .../functions/worker/FunctionRuntimeManager.java   |   4 +-
 .../pulsar/functions/worker/FunctionsManager.java  |  49 +++++++++
 .../pulsar/functions/worker/WorkerService.java     |   4 +-
 .../pulsar/functions/worker/WorkerUtils.java       |   4 +
 .../functions/worker/rest/api/FunctionsImpl.java   |  23 ++++-
 .../functions/worker/FunctionActionerTest.java     |   6 +-
 .../worker/FunctionRuntimeManagerTest.java         |  15 ++-
 .../functions/worker/MembershipManagerTest.java    |   4 +
 19 files changed, 364 insertions(+), 35 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index a2071f1..4ecf1e0 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -214,3 +214,4 @@ brokerClientTrustCertsFilePath:
 ########################
 
 connectorsDirectory: ./connectors
+functionsDirectory: ./functions
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
new file mode 100644
index 0000000..44163cb
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.functions;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Basic information about a Pulsar function.
+ */
+@Data
+@NoArgsConstructor
+public class FunctionDefinition {
+
+    /**
+     * The name of the function type.
+     */
+    private String name;
+
+    /**
+     * Description to be used for user help.
+     */
+    private String description;
+
+    /**
+     * The class name for the function implementation.
+     *
+     * <p>If not defined, it will be assumed this function cannot act as a data.
+     */
+    private String functionClass;
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index e33cbfa..bc6af79 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -139,12 +139,12 @@ public class NarClassLoader extends URLClassLoader {
 
     public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars,
                                                 String narExtractionDirectory) throws IOException {
-        File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
-        try {
-            return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
-        } catch (ClassNotFoundException | NoClassDefFoundError e) {
-            throw new IOException(e);
-        }
+        return  NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.class.getClassLoader(),
+                                                NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
+    }
+
+    public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
+        return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
     }
 
     public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent,
diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
index fce2f24..f27c4b5 100644
--- a/pulsar-functions/localrun-shaded/pom.xml
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -20,7 +20,7 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -71,10 +71,11 @@
                                 </filter>
                             </filters>
                             <relocations>
-                                <relocation>
+                                <!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
+                                <!-- <relocation>
                                     <pattern>com.typesafe.netty</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
-                                </relocation>
+                                </relocation> -->
                                 <relocation>
                                     <pattern>com.google</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
@@ -179,10 +180,11 @@
                                     <pattern>org.apache.distributedlog</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
                                 </relocation>
-                                <relocation>
+                                <!-- Jackson cannot be shaded, this is causing java.lang.NoSuchMethodError when calling getThreadLocalYaml-->
+                                <!-- <relocation>
                                     <pattern>com.fasterxml</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
-                                </relocation>
+                                </relocation> -->
                                 <relocation>
                                     <pattern>org.inferred</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
@@ -199,10 +201,11 @@
                                     <pattern>dlshade</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
                                 </relocation>
-                                <relocation>
+                                <!-- This refers to an older version of Jackson -->
+                                <!-- <relocation>
                                     <pattern>org.codehaus.jackson</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
-                                </relocation>
+                                </relocation> -->
                                 <relocation>
                                     <pattern>net.java.dev.jna</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
@@ -315,10 +318,11 @@
                                     <pattern>com.beust</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
                                 </relocation>
-                                <relocation>
+                                <!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
+                                <!-- <relocation>
                                     <pattern>io.netty</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
-                                </relocation>
+                                </relocation> -->
                                 <relocation>
                                     <pattern>org.hamcrest</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
@@ -357,4 +361,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 8229de2..2eb6c97 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -44,8 +44,11 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.functions.Functions;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.io.File;
 import java.io.IOException;
@@ -212,6 +215,14 @@ public class LocalRunner {
                                 .getProtectionDomain().getCodeSource().getLocation().getFile();
                     }
 
+                    boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
+                    if(isBuiltin){
+                        WorkerConfig workerConfig = WorkerConfig.load(System.getenv("PULSAR_HOME") + "/conf/functions_worker.yml");
+                        Functions functions = FunctionUtils.searchForFunctions(System.getenv("PULSAR_HOME") + workerConfig.getFunctionsDirectory().replaceFirst("^.", ""));
+                        String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
+                        userCodeFile = functions.getFunctions().get(functionType).toString();
+                    }
+                     
                     if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                         File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
                         classLoader = FunctionConfigUtils.validate(functionConfig, file);
@@ -440,9 +451,10 @@ public class LocalRunner {
         // Validate the connector source type from the locally available connectors
         Connectors connectors = getConnectors();
 
-        if (connectors.getSources().containsKey(sourceType)) {
+        String source = sourceType.replaceFirst("^builtin://", "");
+        if (connectors.getSources().containsKey(source)) {
             // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
-            return connectors.getSources().get(sourceType).toString();
+            return connectors.getSources().get(source).toString();
         } else {
             return null;
         }
@@ -452,9 +464,10 @@ public class LocalRunner {
         // Validate the connector source type from the locally available connectors
         Connectors connectors = getConnectors();
 
-        if (connectors.getSinks().containsKey(sinkType)) {
+        String sink = sinkType.replaceFirst("^builtin://", "");
+        if (connectors.getSinks().containsKey(sink)) {
             // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
-            return connectors.getSinks().get(sinkType).toString();
+            return connectors.getSinks().get(sink).toString();
         } else {
             return null;
         }
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 97215bd..f01d63c 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -81,6 +81,9 @@ message FunctionDetails {
     string runtimeFlags = 17;
     ComponentType componentType = 18;
     string customRuntimeOptions = 19;
+    /* If specified, this will refer to an archive that is
+     * already present in the server */
+    string builtin = 20;
 }
 
 message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index b1cf7cb..8e256a6 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -79,6 +79,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     private static final String CATEGORY_STATE = "State Management";
     @Category
     private static final String CATEGORY_CONNECTORS = "Connectors";
+    @Category
+    private static final String CATEGORY_FUNCTIONS = "Functions";
 
     @FieldContext(
         category = CATEGORY_WORKER,
@@ -146,6 +148,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private Boolean validateConnectorConfig = false;
     @FieldContext(
+        category = CATEGORY_FUNCTIONS,
+        doc = "The path to the location to locate builtin functions"
+    )
+    private String functionsDirectory = "./functions";
+    @FieldContext(
         category = CATEGORY_FUNC_METADATA_MNG,
         doc = "The pulsar topic used for storing function metadata"
     )
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 4e8f2d9..c6615e1 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -54,6 +54,8 @@ public class FunctionConfigUtils {
 	
     public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
             throws IllegalArgumentException {
+        
+        boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
 
         Class<?>[] typeArgs = null;
         if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
@@ -249,6 +251,11 @@ public class FunctionConfigUtils {
             functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
         }
 
+        if (isBuiltin) {
+            String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
+            functionDetailsBuilder.setBuiltin(builtin);
+        }
+
         return functionDetailsBuilder.build();
     }
 
@@ -596,12 +603,6 @@ public class FunctionConfigUtils {
             throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
         }
 
-        if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
-                && functionConfig.getJar().startsWith(BUILTIN)) {
-            if (!new File(functionConfig.getJar()).exists()) {
-                throw new IllegalArgumentException("The supplied jar file does not exist");
-            }
-        }
         if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
                 && functionConfig.getPy().startsWith(BUILTIN)) {
             if (!new File(functionConfig.getPy()).exists()) {
@@ -698,6 +699,10 @@ public class FunctionConfigUtils {
             mergedConfig.setClassName(newConfig.getClassName());
         }
 
+        if (!StringUtils.isEmpty(newConfig.getJar())) {
+            mergedConfig.setJar(newConfig.getJar());
+        }
+
         if (newConfig.getInputSpecs() == null) {
             newConfig.setInputSpecs(new HashMap<>());
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
new file mode 100644
index 0000000..1551421
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.functions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.functions.api.Function;
+
+
+@UtilityClass
+@Slf4j
+public class FunctionUtils {
+
+    private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";
+
+    /**
+     * Extract the Pulsar Function class from a functionctor archive.
+     */
+    public static String getFunctionClass(ClassLoader classLoader) throws IOException {
+        NarClassLoader ncl = (NarClassLoader) classLoader;
+        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        FunctionDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+        FunctionDefinition.class);
+        if (StringUtils.isEmpty(conf.getFunctionClass())) {
+            throw new IOException(
+                    String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
+        }
+
+        try {
+            // Try to load source class and check it implements Function interface
+            Class functionClass = ncl.loadClass(conf.getFunctionClass());
+            if (!(Function.class.isAssignableFrom(functionClass))) {
+                throw new IOException(
+                        "Class " + conf.getFunctionClass() + " does not implement interface " + Function.class.getName());
+            }
+        } catch (Throwable t) {
+            Exceptions.rethrowIOException(t);
+        }
+
+        return conf.getFunctionClass();
+    }
+
+    public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+            String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+            return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
+        }
+    }
+
+    public static Functions searchForFunctions(String functionsDirectory) throws IOException {
+        Path path = Paths.get(functionsDirectory).toAbsolutePath();
+        log.info("Searching for functions in {}", path);
+
+        Functions functions = new Functions();
+
+        if (!path.toFile().exists()) {
+            log.warn("Functions archive directory not found");
+            return functions;
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
+            for (Path archive : stream) {
+                try {
+                    FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString());
+                    log.info("Found function {} from {}", cntDef, archive);
+                    log.error(cntDef.getName());
+                    log.error(cntDef.getFunctionClass());
+                    if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
+                        functions.functions.put(cntDef.getName(), archive);
+                    }
+
+                    functions.functionsDefinitions.add(cntDef);
+                } catch (Throwable t) {
+                    log.warn("Failed to load function from {}", archive, t);
+                }
+            }
+        }
+
+        Collections.sort(functions.functionsDefinitions,
+                (c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName()));
+
+        return functions;
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
new file mode 100644
index 0000000..a7538a8
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.functions;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import lombok.Data;
+
+import org.apache.pulsar.common.functions.FunctionDefinition;
+
+@Data
+public class Functions {
+    final List<FunctionDefinition> functionsDefinitions = new ArrayList<>();
+    final Map<String, Path> functions = new TreeMap<>();
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ad93ba2..ccf00d5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -78,16 +78,18 @@ public class FunctionActioner {
     private final RuntimeFactory runtimeFactory;
     private final Namespace dlogNamespace;
     private final ConnectorsManager connectorsManager;
+    private final FunctionsManager functionsManager;
     private final PulsarAdmin pulsarAdmin;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
-                            ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
+                            ConnectorsManager connectorsManager,FunctionsManager functionsManager,PulsarAdmin pulsarAdmin) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
         this.connectorsManager = connectorsManager;
+        this.functionsManager = functionsManager;
         this.pulsarAdmin = pulsarAdmin;
     }
 
@@ -439,6 +441,10 @@ public class FunctionActioner {
             }
         }
 
+        if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+            return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
+        }
+
         throw new IOException("Could not find built in archive definition");
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5ac73d9..075146e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -131,7 +131,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
 
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
-                                  MembershipManager membershipManager, ConnectorsManager connectorsManager,
+                                  MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
                                   FunctionMetaDataManager functionMetaDataManager) throws Exception {
         this.workerConfig = workerConfig;
         this.workerService = workerService;
@@ -196,7 +196,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
         this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer);
 
         this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
-                dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
+                dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());
 
         this.membershipManager = membershipManager;
         this.functionMetaDataManager = functionMetaDataManager;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
new file mode 100644
index 0000000..427cc1e
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.functions.Functions;
+
+/**
+ * Keeps the {@link Functions} catalog that {@link FunctionUtils#searchForFunctions}
+ * produces for the functions directory named in the worker configuration.
+ */
+@Slf4j
+public class FunctionsManager {
+
+    private Functions functions;
+
+    /**
+     * Builds the manager by scanning the functions directory named in the worker config.
+     *
+     * @param workerConfig worker configuration supplying the functions directory
+     * @throws IOException if the scan done by {@link FunctionUtils#searchForFunctions} raises one
+     */
+    public FunctionsManager(WorkerConfig workerConfig) throws IOException {
+        this.functions = scan(workerConfig);
+    }
+
+    /** Returns the function definitions held by the current catalog. */
+    public List<FunctionDefinition> getFunctions() {
+        final Functions catalog = this.functions;
+        return catalog.getFunctionsDefinitions();
+    }
+
+    /**
+     * Looks up the archive stored in the current catalog under the given function type.
+     *
+     * @param functionType key used for the catalog lookup
+     * @return whatever the catalog lookup yields for that key
+     */
+    public Path getFunctionArchive(String functionType) {
+        final Functions catalog = this.functions;
+        return catalog.getFunctions().get(functionType);
+    }
+
+    /**
+     * Replaces the catalog with a fresh scan of the functions directory.
+     *
+     * @param workerConfig worker configuration supplying the functions directory
+     * @throws IOException if the scan done by {@link FunctionUtils#searchForFunctions} raises one
+     */
+    public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
+        this.functions = scan(workerConfig);
+    }
+
+    // Single place where the directory scan is performed, shared by construction and reload.
+    private static Functions scan(WorkerConfig workerConfig) throws IOException {
+        return FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 6fe7500..edaade6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -66,6 +66,7 @@ public class WorkerService {
     private AuthenticationService authenticationService;
     private AuthorizationService authorizationService;
     private ConnectorsManager connectorsManager;
+    private FunctionsManager functionsManager;
     private PulsarAdmin brokerAdmin;
     private PulsarAdmin functionAdmin;
     private final MetricsGenerator metricsGenerator;
@@ -170,6 +171,7 @@ public class WorkerService {
                     this.workerConfig, this.schedulerManager, this.client);
 
             this.connectorsManager = new ConnectorsManager(workerConfig);
+            this.functionsManager = new FunctionsManager(workerConfig);
 
             //create membership manager
             String coordinationTopic = workerConfig.getClusterCoordinationTopic();
@@ -180,7 +182,7 @@ public class WorkerService {
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
-                    this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionMetaDataManager);
+                this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager);
 
             // Setting references to managers in scheduler
             this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index a4e46a8..5cad320 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -309,6 +309,10 @@ public final class WorkerUtils {
             }
         }
 
+        if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+            return true;
+        }
+
         return false;
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d806c69..3ed7f38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
@@ -48,6 +49,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -651,11 +653,30 @@ public class FunctionsImpl extends ComponentImpl {
                                                                  final File componentPackageFile) throws IOException {
 
         // The rest end points take precedence over whatever is there in function config
+        Path archivePath = null;
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
         functionConfig.setName(componentName);
         FunctionConfigUtils.inferMissingArguments(functionConfig);
-        ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+
+        // Look the jar reference up among the worker's function archives; a "builtin://"
+        // prefix, when present, is stripped first so only the archive name is used as key.
+        if (!StringUtils.isEmpty(functionConfig.getJar())) {
+            String builtinArchive = functionConfig.getJar();
+            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            try {
+                archivePath = this.worker().getFunctionsManager().getFunctionArchive(builtinArchive);
+            } catch (Exception e) {
+                // archivePath is still null here, so report the looked-up name and keep the cause.
+                throw new IllegalArgumentException(
+                        String.format("No Function archive %s found", builtinArchive), e);
+            }
+        }
+        // Validate against the resolved archive when there is one, else the supplied package file.
+        File packageFile = archivePath != null ? archivePath.toFile() : componentPackageFile;
+        ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, packageFile);
         return FunctionConfigUtils.convert(functionConfig, clsLoader);
 
     }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index e066404..dc49036 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -78,7 +78,7 @@ public class FunctionActionerTest {
 
         @SuppressWarnings("resource")
         FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
-                new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+                new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
         Runtime runtime = mock(Runtime.class);
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
@@ -124,7 +124,7 @@ public class FunctionActionerTest {
 
         @SuppressWarnings("resource")
         FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
-                new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+                new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
 
         // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
         // RuntimeSpawner
@@ -186,7 +186,7 @@ public class FunctionActionerTest {
 
         @SuppressWarnings("resource")
         FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
-                new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+                new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
 
 
         String pkgPathLocation = "http://invalid/my-file.jar";
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index a6569b3..2e6ed5b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -101,6 +101,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class)));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -183,6 +184,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class)));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -268,6 +270,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -397,6 +400,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -574,6 +578,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class));
         FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
         doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -632,7 +637,7 @@ public class FunctionRuntimeManagerTest {
 
         FunctionActioner functionActioner = spy(new FunctionActioner(
                 workerConfig,
-                kubernetesRuntimeFactory, null, null, null));
+                kubernetesRuntimeFactory, null, null, null, null));
 
         // test new assignment update functions
         FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
@@ -641,6 +646,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class));
         functionRuntimeManager.setFunctionActioner(functionActioner);
 
@@ -744,6 +750,7 @@ public class FunctionRuntimeManagerTest {
                     mock(Namespace.class),
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
+                    mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class));
 
             fail();
@@ -767,6 +774,7 @@ public class FunctionRuntimeManagerTest {
                     mock(Namespace.class),
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
+                    mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class));
 
             fail();
@@ -790,6 +798,7 @@ public class FunctionRuntimeManagerTest {
                     mock(Namespace.class),
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
+                    mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class));
 
             fail();
@@ -813,6 +822,7 @@ public class FunctionRuntimeManagerTest {
                     mock(Namespace.class),
                     mock(MembershipManager.class),
                     mock(ConnectorsManager.class),
+                    mock(FunctionsManager.class),
                     mock(FunctionMetaDataManager.class));
 
             assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
@@ -841,6 +851,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class);
@@ -868,6 +879,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
@@ -891,6 +903,7 @@ public class FunctionRuntimeManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 mock(FunctionMetaDataManager.class));
 
         assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2d0460b..14d1044 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -154,6 +154,7 @@ public class MembershipManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 functionMetaDataManager));
         MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient, pulsarAdmin));
 
@@ -226,6 +227,7 @@ public class MembershipManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 functionMetaDataManager));
 
         MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -313,6 +315,7 @@ public class MembershipManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 functionMetaDataManager));
         MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
@@ -392,6 +395,7 @@ public class MembershipManagerTest {
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
                 functionMetaDataManager));
         MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));