You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/11/02 03:12:32 UTC

[pulsar] branch master updated: Function runtime pluggable (#5463)

This is an automated email from the ASF dual-hosted git repository.

aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9728d54  Function runtime pluggable (#5463)
9728d54 is described below

commit 9728d5429bfe359dfc3de4b277b9fbe108e0bd8e
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Nov 1 20:12:24 2019 -0700

    Function runtime pluggable (#5463)
    
    * Allow Pulsar Function Runtimes to be pluggable
    
    * cleaning up
---
 conf/functions_worker.yml                          |  38 ++-
 .../worker/PulsarFunctionE2ESecurityTest.java      |   8 +-
 .../worker/PulsarFunctionLocalRunTest.java         |   7 +-
 .../worker/PulsarFunctionPublishTest.java          |  10 +-
 .../worker/PulsarWorkerAssignmentTest.java         |   7 +-
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |   7 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |  11 +-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |   7 +-
 .../org/apache/pulsar/functions/LocalRunner.java   |   4 +-
 .../functions/runtime/JavaInstanceStarter.java     |   1 +
 .../pulsar/functions/runtime/RuntimeFactory.java   |  13 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |   7 +-
 .../{ => kubernetes}/KubernetesRuntime.java        |   4 +-
 .../{ => kubernetes}/KubernetesRuntimeFactory.java | 277 ++++++++++-----------
 .../kubernetes/KubernetesRuntimeFactoryConfig.java | 128 ++++++++++
 .../runtime/{ => process}/ProcessRuntime.java      |   4 +-
 .../{ => process}/ProcessRuntimeFactory.java       |  68 ++++-
 .../process/ProcessRuntimeFactoryConfig.java       |  46 ++++
 .../runtime/{ => thread}/ThreadRuntime.java        |   5 +-
 .../runtime/{ => thread}/ThreadRuntimeFactory.java |  80 +++---
 .../runtime/thread/ThreadRuntimeFactoryConfig.java |  34 +++
 .../pulsar/functions/worker/WorkerConfig.java      | 212 +++++-----------
 .../KubernetesRuntimeFactoryTest.java              | 109 ++++++--
 .../{ => kubernetes}/KubernetesRuntimeTest.java    |  62 ++---
 .../runtime/{ => process}/ProcessRuntimeTest.java  |  33 ++-
 .../apache/pulsar/functions/utils/Reflections.java |   2 +-
 .../pulsar/functions/utils/ReflectionsTest.java    |   2 +-
 .../functions/worker/FunctionRuntimeManager.java   |  80 ++----
 .../functions/worker/FunctionsStatsGenerator.java  |   2 +-
 .../functions/worker/FunctionActionerTest.java     |  21 +-
 .../worker/FunctionRuntimeManagerTest.java         | 216 +++++++++++++++-
 .../functions/worker/MembershipManagerTest.java    |  19 +-
 .../functions/worker/SchedulerManagerTest.java     |   9 +-
 33 files changed, 1029 insertions(+), 504 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index ed8e18e..88aeaa7 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -92,17 +92,33 @@ topicCompactionFrequencySec: 1800
 # Function Runtime Management
 ###############################
 
-#threadContainerFactory:
+#### Process Runtime  ####
+# Pulsar function instances are launched as processes
+
+functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
+functionRuntimeFactoryConfigs:
+    # location of log files for functions
+    logDirectory: /tmp
+    # change the jar location only when you put the java instance jar in a different location
+    javaInstanceJarLocation:
+    # change the python instance location only when you put the python instance jar in a different location
+    pythonInstanceLocation:
+    # change the extra dependencies location:
+    extraFunctionDependenciesDir:
+
+#### Thread Runtime ####
+# Pulsar function instances are run as threads
+
+#functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
+#functionRuntimeFactoryConfigs:
+#  # thread group name
 #  threadGroupName: "Thread Function Container Group"
-processContainerFactory:
-  logDirectory:
-  # change the jar location only when you put the java instance jar in a different location
-  javaInstanceJarLocation:
-  # change the python instance location only when you put the python instance jar in a different location
-  pythonInstanceLocation:
-  # change the extra dependencies location:
-  extraFunctionDependenciesDir:
-#kubernetesContainerFactory:
+
+#### Kubernetes Runtime ####
+# Pulsar function are deployed to Kubernetes
+
+#functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
+#functionRuntimeFactoryConfigs:
 #  # uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in function worker
 #  k8Uri:
 #  # the kubernetes namespace to run the function instances. it is `default`, if this setting is left to be empty
@@ -116,7 +132,7 @@ processContainerFactory:
 #  # setting this to true is let function worker to submit functions to the same k8s cluster as function worker
 #  # is running. setting this to false if your function worker is not running as a k8 pod.
 #  submittingInsidePod: false
-#  # setting the pulsar service url that pulsar function should use to connect to pulsar 
+#  # setting the pulsar service url that pulsar function should use to connect to pulsar
 #  # if it is not set, it will use the pulsar service url configured in worker service
 #  pulsarServiceUrl:
 #  # setting the pulsar admin url that pulsar function should use to connect to pulsar
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 5bafb77..3431793 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -47,6 +47,9 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.io.PulsarFunctionE2ETest;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
@@ -61,6 +64,7 @@ import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -220,7 +224,9 @@ public class PulsarFunctionE2ESecurityTest {
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
         // worker talks to local broker
         workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
         workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index a60b0ff..e57d5b3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -51,7 +51,10 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink;
 import org.apache.pulsar.io.datagenerator.DataGeneratorSource;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -303,7 +306,9 @@ public class PulsarFunctionLocalRunTest {
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName(CLUSTER));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName(CLUSTER), Map.class));
         // worker talks to local broker
         workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
         workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 3301fdb..e68f40d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -46,6 +46,9 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -235,10 +238,9 @@ public class PulsarFunctionPublishTest {
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
-//        workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory()
-//                .setJavaInstanceJarLocation("/Users/jerrypeng/workspace/incubator-pulsar/pulsar-functions/runtime-all/target/java-instance.jar")
-//                .setPythonInstanceLocation(""));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
         // worker talks to local broker
         workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
         workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index dc0da52..a39808b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -38,7 +38,10 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +152,9 @@ public class PulsarWorkerAssignmentTest {
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
         // worker talks to local broker
         workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
         workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index f8358a1..134aa96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -47,6 +47,9 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
@@ -178,7 +181,9 @@ public class PulsarFunctionAdminTest {
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
         // worker talks to local broker
         workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
         workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index d42ce92..422f712 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -54,9 +54,12 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -72,7 +75,6 @@ import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
@@ -197,8 +199,6 @@ public class PulsarFunctionE2ETest {
         config.setBrokerClientTlsEnabled(true);
         config.setAllowAutoTopicCreationType("non-partitioned");
 
-
-
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
@@ -319,8 +319,9 @@ public class PulsarFunctionE2ETest {
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
-        // worker talks to local broker
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));        // worker talks to local broker
         workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
         workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
         workerConfig.setFailureCheckFreqMs(100);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 41782a4..3ae6daf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -42,11 +42,14 @@ import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
@@ -163,7 +166,9 @@ public class PulsarFunctionTlsTest {
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
         // worker talks to local broker
         workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
         workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePort().get());
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index ad67a7a..996cd5e 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -33,9 +33,9 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index a26f287..81a871e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.Reflections;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 8cd08f8..c26ed60 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -20,8 +20,12 @@
 package org.apache.pulsar.functions.runtime;
 
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.util.Optional;
 
@@ -30,6 +34,11 @@ import java.util.Optional;
  */
 public interface RuntimeFactory extends AutoCloseable {
 
+    void initialize(WorkerConfig workerConfig,
+                    AuthenticationConfig authenticationConfig,
+                    SecretsProviderConfigurator secretsProviderConfigurator,
+                    Optional<FunctionAuthProvider> functionAuthProvider) throws Exception;
+
     /**
      * Create a function container to execute a java instance.
      *
@@ -53,5 +62,9 @@ public interface RuntimeFactory extends AutoCloseable {
     @Override
     void close();
 
+    static RuntimeFactory getFuntionRuntimeFactory(String className) {
+        return Reflections.createInstance(className, RuntimeFactory.class, Thread.currentThread().getContextClassLoader());
+    }
+
 }
  
\ No newline at end of file
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index ede5c48..351fb67 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -30,6 +30,7 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -50,7 +51,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 public class RuntimeUtils {
 
     private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";
-    static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
+    public static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
 
     public static List<String> composeCmd(InstanceConfig instanceConfig,
                                           String instanceFile,
@@ -416,4 +417,8 @@ public class RuntimeUtils {
         return input.split("\\s(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
     }
 
+    public static <T> T getRuntimeFunctionConfig(Map<String, Object> configMap, Class<T> functionRuntimeConfigClass) {
+        return ObjectMapperFactory.getThreadLocal().convertValue(configMap, functionRuntimeConfigClass);
+    }
+
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
similarity index 99%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 01b40b6..217c0db 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
@@ -62,6 +62,8 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
similarity index 54%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 07a1853..c3fe560 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.kubernetes.client.ApiClient;
@@ -28,9 +28,9 @@ import io.kubernetes.client.models.V1ConfigMap;
 import io.kubernetes.client.util.Config;
 import java.nio.file.Paths;
 
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.Resources;
@@ -39,7 +39,10 @@ import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.lang.reflect.Field;
 import java.util.Map;
@@ -54,132 +57,130 @@ import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuth
  * Kubernetes based function container factory implementation.
  */
 @Slf4j
+@Data
 public class KubernetesRuntimeFactory implements RuntimeFactory {
 
     static int NUM_RETRIES = 5;
     static long SLEEP_BETWEEN_RETRIES_MS = 500;
 
-    @Getter
-    @Setter
-    @NoArgsConstructor
-    class KubernetesInfo {
-        private String k8Uri;
-        private String jobNamespace;
-        private String pulsarDockerImageName;
-        private String imagePullPolicy;
-        private String pulsarRootDir;
-        private String pulsarAdminUrl;
-        private String pulsarServiceUrl;
-        private String pythonDependencyRepository;
-        private String pythonExtraDependencyRepository;
-        private String extraDependenciesDir;
-        private String changeConfigMap;
-        private String changeConfigMapNamespace;
-        private int percentMemoryPadding;
-        private double cpuOverCommitRatio;
-        private double memoryOverCommitRatio;
-    }
-    private final KubernetesInfo kubernetesInfo;
-    private final Boolean submittingInsidePod;
-    private final Boolean installUserCodeDependencies;
-    private final Map<String, String> customLabels;
-    private final Integer expectedMetricsCollectionInterval;
-    private final String stateStorageServiceUri;
-    private final AuthenticationConfig authConfig;
-    private final String javaInstanceJarFile;
-    private final String pythonInstanceFile;
-    private final String extraDependenciesDir;
-    private final SecretsProviderConfigurator secretsProviderConfigurator;
+    private String k8Uri;
+    private String jobNamespace;
+    private String pulsarDockerImageName;
+    private String imagePullPolicy;
+    private String pulsarRootDir;
+    private String pulsarAdminUrl;
+    private String pulsarServiceUrl;
+    private String pythonDependencyRepository;
+    private String pythonExtraDependencyRepository;
+    private String extraDependenciesDir;
+    private String changeConfigMap;
+    private String changeConfigMapNamespace;
+    private int percentMemoryPadding;
+    private double cpuOverCommitRatio;
+    private double memoryOverCommitRatio;
+    private Boolean submittingInsidePod;
+    private Boolean installUserCodeDependencies;
+    private Map<String, String> customLabels;
+    private Integer expectedMetricsCollectionInterval;
+    private String stateStorageServiceUri;
+    private AuthenticationConfig authConfig;
+    private String javaInstanceJarFile;
+    private String pythonInstanceFile;
     private final String logDirectory = "logs/functions";
+    private Resources functionInstanceMinResources;
+    private boolean authenticationEnabled;
+
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
     private Timer changeConfigMapTimer;
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
     private AppsV1Api appsClient;
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
     private CoreV1Api coreClient;
-    private Resources functionInstanceMinResources;
-    private final boolean authenticationEnabled;
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
+    private SecretsProviderConfigurator secretsProviderConfigurator;
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
     private Optional<KubernetesFunctionAuthProvider> authProvider;
-    private final byte[] serverCaBytes;
 
-    @VisibleForTesting
-    public KubernetesRuntimeFactory(String k8Uri,
-                                    String jobNamespace,
-                                    String pulsarDockerImageName,
-                                    String imagePullPolicy,
-                                    String pulsarRootDir,
-                                    Boolean submittingInsidePod,
-                                    Boolean installUserCodeDependencies,
-                                    String pythonDependencyRepository,
-                                    String pythonExtraDependencyRepository,
-                                    String extraDependenciesDir,
-                                    Map<String, String> customLabels,
-                                    int percentMemoryPadding,
-                                    double cpuOverCommitRatio,
-                                    double memoryOverCommitRatio,
-                                    String pulsarServiceUri,
-                                    String pulsarAdminUri,
-                                    String stateStorageServiceUri,
-                                    AuthenticationConfig authConfig,
-                                    byte[] serverCaBytes,
-                                    Integer expectedMetricsCollectionInterval,
-                                    String changeConfigMap,
-                                    String changeConfigMapNamespace,
-                                    Resources functionInstanceMinResources,
-                                    SecretsProviderConfigurator secretsProviderConfigurator,
-                                    boolean authenticationEnabled,
-                                    Optional<FunctionAuthProvider> functionAuthProvider) {
-        this.kubernetesInfo = new KubernetesInfo();
-        this.kubernetesInfo.setK8Uri(k8Uri);
-        if (!isEmpty(jobNamespace)) {
-            this.kubernetesInfo.setJobNamespace(jobNamespace);
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
+    private byte[] serverCaBytes;
+
+    @Override
+    public boolean externallyManaged() {
+        return true;
+    }
+
+    @Override
+    public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
+                           SecretsProviderConfigurator secretsProviderConfigurator,
+                           Optional<FunctionAuthProvider> functionAuthProvider) {
+
+        KubernetesRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
+                workerConfig.getFunctionRuntimeFactoryConfigs(), KubernetesRuntimeFactoryConfig.class);
+
+        this.k8Uri = factoryConfig.getK8Uri();
+        if (!isEmpty(factoryConfig.getJobNamespace())) {
+            this.jobNamespace = factoryConfig.getJobNamespace();
         } else {
-            this.kubernetesInfo.setJobNamespace("default");
+            this.jobNamespace = "default";
         }
-        if (!isEmpty(pulsarDockerImageName)) {
-            this.kubernetesInfo.setPulsarDockerImageName(pulsarDockerImageName);
+        if (!isEmpty(factoryConfig.getPulsarDockerImageName())) {
+            this.pulsarDockerImageName = factoryConfig.getPulsarDockerImageName();
         } else {
-            this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar");
+            this.pulsarDockerImageName = "apachepulsar/pulsar";
         }
-        if (!isEmpty(imagePullPolicy)) {
-            this.kubernetesInfo.setImagePullPolicy(imagePullPolicy);
+        if (!isEmpty(factoryConfig.getImagePullPolicy())) {
+            this.imagePullPolicy = factoryConfig.getImagePullPolicy();
         } else {
-            this.kubernetesInfo.setImagePullPolicy("IfNotPresent");
+            this.imagePullPolicy = "IfNotPresent";
         }
-        if (!isEmpty(pulsarRootDir)) {
-            this.kubernetesInfo.setPulsarRootDir(pulsarRootDir);
+        if (!isEmpty(factoryConfig.getPulsarRootDir())) {
+            this.pulsarRootDir = factoryConfig.getPulsarRootDir();
         } else {
-            this.kubernetesInfo.setPulsarRootDir("/pulsar");
+            this.pulsarRootDir = "/pulsar";
         }
-        if (StringUtils.isNotEmpty(extraDependenciesDir)) {
-            if (Paths.get(extraDependenciesDir).isAbsolute()) {
-                this.extraDependenciesDir = extraDependenciesDir;
+
+        this.submittingInsidePod = factoryConfig.getSubmittingInsidePod();
+        this.installUserCodeDependencies = factoryConfig.getInstallUserCodeDependencies();
+        this.pythonDependencyRepository = factoryConfig.getPythonDependencyRepository();
+        this.pythonExtraDependencyRepository = factoryConfig.getPythonExtraDependencyRepository();
+
+        if (StringUtils.isNotEmpty(factoryConfig.getExtraFunctionDependenciesDir())) {
+            if (Paths.get(factoryConfig.getExtraFunctionDependenciesDir()).isAbsolute()) {
+                this.extraDependenciesDir = factoryConfig.getExtraFunctionDependenciesDir();
             } else {
-                this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir()
-                    + "/" + extraDependenciesDir;
+                this.extraDependenciesDir = this.pulsarRootDir
+                        + "/" + factoryConfig.getExtraFunctionDependenciesDir();
             }
         } else {
-            this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir() + "/instances/deps";
+            this.extraDependenciesDir = this.pulsarRootDir + "/instances/deps";
         }
-        this.kubernetesInfo.setExtraDependenciesDir(extraDependenciesDir);
-        this.kubernetesInfo.setPythonDependencyRepository(pythonDependencyRepository);
-        this.kubernetesInfo.setPythonExtraDependencyRepository(pythonExtraDependencyRepository);
-        this.kubernetesInfo.setPulsarServiceUrl(pulsarServiceUri);
-        this.kubernetesInfo.setPulsarAdminUrl(pulsarAdminUri);
-        this.kubernetesInfo.setChangeConfigMap(changeConfigMap);
-        this.kubernetesInfo.setChangeConfigMapNamespace(changeConfigMapNamespace);
-        this.kubernetesInfo.setPercentMemoryPadding(percentMemoryPadding);
-        this.kubernetesInfo.setCpuOverCommitRatio(cpuOverCommitRatio);
-        this.kubernetesInfo.setMemoryOverCommitRatio(memoryOverCommitRatio);
-        this.submittingInsidePod = submittingInsidePod;
-        this.installUserCodeDependencies = installUserCodeDependencies;
-        this.customLabels = customLabels;
-        this.stateStorageServiceUri = stateStorageServiceUri;
-        this.authConfig = authConfig;
-        this.serverCaBytes = serverCaBytes;
-        this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
-        this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
-        this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
+
+        this.customLabels = factoryConfig.getCustomLabels();
+        this.percentMemoryPadding = factoryConfig.getPercentMemoryPadding();
+        this.cpuOverCommitRatio = factoryConfig.getCpuOverCommitRatio();
+        this.memoryOverCommitRatio = factoryConfig.getMemoryOverCommitRatio();
+        this.pulsarServiceUrl = StringUtils.isEmpty(factoryConfig.getPulsarServiceUrl())
+                ? workerConfig.getPulsarServiceUrl() : factoryConfig.getPulsarServiceUrl();
+        this.pulsarAdminUrl = StringUtils.isEmpty(factoryConfig.getPulsarAdminUrl())
+                ? workerConfig.getPulsarWebServiceUrl() : factoryConfig.getPulsarAdminUrl();
+        this.stateStorageServiceUri = workerConfig.getStateStorageServiceUrl();
+        this.authConfig = authenticationConfig;
+        this.expectedMetricsCollectionInterval = factoryConfig.getExpectedMetricsCollectionInterval() == null
+                ? -1 : factoryConfig.getExpectedMetricsCollectionInterval();
+        this.changeConfigMap = factoryConfig.getChangeConfigMap();
+        this.changeConfigMapNamespace = factoryConfig.getChangeConfigMapNamespace();
+        this.functionInstanceMinResources = workerConfig.getFunctionInstanceMinResources();
         this.secretsProviderConfigurator = secretsProviderConfigurator;
-        this.functionInstanceMinResources = functionInstanceMinResources;
-        this.authenticationEnabled = authenticationEnabled;
+        this.authenticationEnabled = workerConfig.isAuthenticationEnabled();
+        this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
+        this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
+        this.serverCaBytes = workerConfig.getTlsTrustChainBytes();
         try {
             setupClient();
         } catch (Exception e) {
@@ -193,19 +194,13 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                         + functionAuthProvider.get().getClass().getName() + " must implement KubernetesFunctionAuthProvider");
             } else {
                 KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get();
-                kubernetesFunctionAuthProvider.initialize(coreClient, jobNamespace, serverCaBytes);
+                kubernetesFunctionAuthProvider.initialize(coreClient, factoryConfig.getJobNamespace(), serverCaBytes);
                 this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
             }
         } else {
             this.authProvider = Optional.empty();
         }
 
-
-    }
-
-    @Override
-    public boolean externallyManaged() {
-        return true;
     }
 
     @Override
@@ -237,29 +232,29 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         return new KubernetesRuntime(
             appsClient,
             coreClient,
-            this.kubernetesInfo.getJobNamespace(),
+            jobNamespace,
             customLabels,
             installUserCodeDependencies,
-            this.kubernetesInfo.getPythonDependencyRepository(),
-            this.kubernetesInfo.getPythonExtraDependencyRepository(),
-            this.kubernetesInfo.getPulsarDockerImageName(),
-            this.kubernetesInfo.imagePullPolicy,
-            this.kubernetesInfo.getPulsarRootDir(),
+            pythonDependencyRepository,
+            pythonExtraDependencyRepository,
+            pulsarDockerImageName,
+            imagePullPolicy,
+            pulsarRootDir,
             instanceConfig,
             instanceFile,
             extraDependenciesDir,
             logDirectory,
             codePkgUrl,
             originalCodeFileName,
-            this.kubernetesInfo.getPulsarServiceUrl(),
-            this.kubernetesInfo.getPulsarAdminUrl(),
+            pulsarServiceUrl,
+            pulsarAdminUrl,
             stateStorageServiceUri,
             authConfig,
             secretsProviderConfigurator,
             expectedMetricsCollectionInterval,
-            this.kubernetesInfo.getPercentMemoryPadding(),
-            this.kubernetesInfo.getCpuOverCommitRatio(),
-            this.kubernetesInfo.getMemoryOverCommitRatio(),
+            percentMemoryPadding,
+            cpuOverCommitRatio,
+            memoryOverCommitRatio,
             authProvider,
             authenticationEnabled);
     }
@@ -272,13 +267,13 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
         KubernetesRuntime.doChecks(functionDetails);
         validateMinResourcesRequired(functionDetails);
-        secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, kubernetesInfo.getJobNamespace(), functionDetails);
+        secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, jobNamespace, functionDetails);
     }
 
     @VisibleForTesting
     public void setupClient() throws Exception {
         if (appsClient == null) {
-            if (this.kubernetesInfo.getK8Uri() == null) {
+            if (k8Uri == null) {
                 log.info("k8Uri is null thus going by defaults");
                 ApiClient cli;
                 if (submittingInsidePod) {
@@ -292,43 +287,47 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                 appsClient = new AppsV1Api();
                 coreClient = new CoreV1Api();
             } else {
-                log.info("Setting up k8Client using uri " + this.kubernetesInfo.getK8Uri());
-                final ApiClient apiClient = new ApiClient().setBasePath(this.kubernetesInfo.getK8Uri());
+                log.info("Setting up k8Client using uri " + k8Uri);
+                final ApiClient apiClient = new ApiClient().setBasePath(k8Uri);
                 appsClient = new AppsV1Api(apiClient);
                 coreClient = new CoreV1Api(apiClient);
             }
 
             // Setup a timer to change stuff.
-            if (!isEmpty(this.kubernetesInfo.getChangeConfigMap())) {
+            if (!isEmpty(changeConfigMap)) {
                 changeConfigMapTimer = new Timer();
+                final KubernetesRuntimeFactory THIS = this;
                 changeConfigMapTimer.scheduleAtFixedRate(new TimerTask() {
                     @Override
                     public void run() {
-                        fetchConfigMap();
+                        fetchConfigMap(coreClient, changeConfigMap, changeConfigMapNamespace, THIS);
                     }
                 }, 300000, 300000);
             }
         }
     }
 
-    void fetchConfigMap() {
+    static void fetchConfigMap(CoreV1Api coreClient, String changeConfigMap,
+                               String changeConfigMapNamespace,
+                               KubernetesRuntimeFactory kubernetesRuntimeFactory) {
         try {
-            V1ConfigMap v1ConfigMap = coreClient.readNamespacedConfigMap(kubernetesInfo.getChangeConfigMap(), kubernetesInfo.getChangeConfigMapNamespace(), null, true, false);
+            V1ConfigMap v1ConfigMap = coreClient.readNamespacedConfigMap(changeConfigMap, changeConfigMapNamespace, null, true, false);
             Map<String, String> data = v1ConfigMap.getData();
             if (data != null) {
-                overRideKubernetesConfig(data);
+                overRideKubernetesConfig(data, kubernetesRuntimeFactory);
             }
         } catch (Exception e) {
-            log.error("Error while trying to fetch configmap {} at namespace {}", kubernetesInfo.getChangeConfigMap(), kubernetesInfo.getChangeConfigMapNamespace(), e);
+            log.error("Error while trying to fetch configmap {} at namespace {}", changeConfigMap, changeConfigMapNamespace, e);
         }
     }
 
-    void overRideKubernetesConfig(Map<String, String> data) throws Exception {
-        for (Field field : KubernetesInfo.class.getDeclaredFields()) {
+    static void overRideKubernetesConfig(Map<String, String> data,
+                                         KubernetesRuntimeFactory kubernetesRuntimeFactory) throws Exception {
+        for (Field field : KubernetesRuntimeFactory.class.getDeclaredFields()) {
             field.setAccessible(true);
-            if (data.containsKey(field.getName()) && !data.get(field.getName()).equals(field.get(kubernetesInfo))) {
-                log.info("Kubernetes Config {} changed from {} to {}", field.getName(), field.get(kubernetesInfo), data.get(field.getName()));
-                field.set(kubernetesInfo, data.get(field.getName()));
+            if (data.containsKey(field.getName()) && !data.get(field.getName()).equals(field.get(kubernetesRuntimeFactory))) {
+                log.info("Kubernetes Config {} changed from {} to {}", field.getName(), field.get(kubernetesRuntimeFactory), data.get(field.getName()));
+                field.set(kubernetesRuntimeFactory, data.get(field.getName()));
             }
         }
     }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
new file mode 100644
index 0000000..a527baf
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.kubernetes;
+
+import lombok.Data;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.common.configuration.FieldContext;
+
+import java.util.Map;
+
+@Data
+@ToString
+@Accessors(chain = true)
+public class KubernetesRuntimeFactoryConfig {
+    @FieldContext(
+        doc = "Uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in"
+            + " function worker machine"
+    )
+    protected String k8Uri;
+    @FieldContext(
+        doc = "The Kubernetes namespace to run the function instances. It is `default`,"
+            + " if this setting is left to be empty"
+    )
+    protected String jobNamespace;
+    @FieldContext(
+        doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`"
+    )
+    protected String pulsarDockerImageName;
+
+    @FieldContext(
+            doc = "The image pull policy for image used to run function instance. By default it is `IfNotPresent`"
+    )
+    protected String imagePullPolicy;
+    @FieldContext(
+            doc = "The root directory of pulsar home directory in the pulsar docker image specified"
+                    + " `pulsarDockerImageName`. By default it is under `/pulsar`. If you are using your own"
+                    + " customized image in `pulsarDockerImageName`, you need to set this setting accordingly"
+    )
+    protected String pulsarRootDir;
+    @FieldContext(
+        doc = "This setting only takes effects if `k8Uri` is set to null. If your function worker is"
+            + " also running as a k8s pod, set this to `true` is let function worker to submit functions to"
+            + " the same k8s cluster as function worker is running. Set this to `false` if your function worker"
+            + " is not running as a k8s pod"
+    )
+    protected Boolean submittingInsidePod;
+    @FieldContext(
+        doc = "The pulsar service url that pulsar functions should use to connect to pulsar."
+            + " If it is not set, it will use the pulsar service url configured in function worker."
+    )
+    protected String pulsarServiceUrl;
+    @FieldContext(
+        doc = "The pulsar admin url that pulsar functions should use to connect to pulsar."
+            + " If it is not set, it will use the pulsar admin url configured in function worker."
+    )
+    protected String pulsarAdminUrl;
+    @FieldContext(
+        doc = "The flag indicates to install user code dependencies. (applied to python package)"
+    )
+    protected Boolean installUserCodeDependencies;
+    @FieldContext(
+        doc = "The repository that pulsar functions use to download python dependencies"
+    )
+    protected String pythonDependencyRepository;
+    @FieldContext(
+        doc = "The repository that pulsar functions use to download extra python dependencies"
+    )
+    protected String pythonExtraDependencyRepository;
+
+    @FieldContext(
+        doc = "the directory for dropping extra function dependencies. "
+            + "If it is not absolute path, it is relative to `pulsarRootDir`"
+    )
+    protected String extraFunctionDependenciesDir;
+    @FieldContext(
+        doc = "The custom labels that function worker uses to select the nodes for pods"
+    )
+    protected Map<String, String> customLabels;
+
+    @FieldContext(
+        doc = "The expected metrics collection interval, in seconds"
+    )
+    protected Integer expectedMetricsCollectionInterval = 30;
+    @FieldContext(
+        doc = "Kubernetes Runtime will periodically checkback on"
+            + " this configMap if defined and if there are any changes"
+            + " to the kubernetes specific stuff, we apply those changes"
+    )
+    protected String changeConfigMap;
+    @FieldContext(
+        doc = "The namespace for storing change config map"
+    )
+    protected String changeConfigMapNamespace;
+
+    @FieldContext(
+            doc = "Additional memory padding added on top of the memory requested by the function per on a per instance basis"
+    )
+    protected int percentMemoryPadding;
+
+    @FieldContext(
+            doc = "The ratio cpu request and cpu limit to be set for a function/source/sink." +
+                    "  The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio"
+    )
+    protected double cpuOverCommitRatio = 1.0;
+
+    @FieldContext(
+            doc = "The ratio memory request and memory limit to be set for a function/source/sink." +
+                    "  The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio"
+    )
+    protected double memoryOverCommitRatio = 1.0;
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
similarity index 98%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 4334123..3c37372 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.process;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -37,6 +37,8 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
similarity index 71%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 87fd7d8..4e2fb8f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -17,21 +17,26 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.process;
 
 import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.nio.file.Paths;
 import java.util.Optional;
-import java.util.function.Consumer;
 
 import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 
@@ -39,17 +44,25 @@ import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuth
  * Thread based function container factory implementation.
  */
 @Slf4j
+@NoArgsConstructor
+@Data
 public class ProcessRuntimeFactory implements RuntimeFactory {
 
-    private final String pulsarServiceUrl;
-    private final String stateStorageServiceUrl;
-    private final boolean authenticationEnabled;
+    private String pulsarServiceUrl;
+    private String stateStorageServiceUrl;
+    private boolean authenticationEnabled;
     private AuthenticationConfig authConfig;
-    private SecretsProviderConfigurator secretsProviderConfigurator;
     private String javaInstanceJarFile;
     private String pythonInstanceFile;
     private String logDirectory;
     private String extraDependenciesDir;
+
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
+    private SecretsProviderConfigurator secretsProviderConfigurator;
+
+    @ToString.Exclude
+    @EqualsAndHashCode.Exclude
     private Optional<FunctionAuthProvider> authProvider;
 
     @VisibleForTesting
@@ -63,6 +76,41 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
                                  SecretsProviderConfigurator secretsProviderConfigurator,
                                  boolean authenticationEnabled,
                                  Optional<FunctionAuthProvider> functionAuthProvider) {
+
+        initialize(pulsarServiceUrl, stateStorageServiceUrl, authConfig, javaInstanceJarFile,
+                pythonInstanceFile, logDirectory, extraDependenciesDir,
+                secretsProviderConfigurator, authenticationEnabled, functionAuthProvider);
+    }
+
+    @Override
+    public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
+                           SecretsProviderConfigurator secretsProviderConfigurator,
+                           Optional<FunctionAuthProvider> functionAuthProvider) {
+        ProcessRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
+                workerConfig.getFunctionRuntimeFactoryConfigs(), ProcessRuntimeFactoryConfig.class);
+
+        initialize(workerConfig.getPulsarServiceUrl(),
+                workerConfig.getStateStorageServiceUrl(),
+                authenticationConfig,
+                factoryConfig.getJavaInstanceJarLocation(),
+                factoryConfig.getPythonInstanceLocation(),
+                factoryConfig.getLogDirectory(),
+                factoryConfig.getExtraFunctionDependenciesDir(),
+                secretsProviderConfigurator,
+                workerConfig.isAuthenticationEnabled(),
+                functionAuthProvider);
+    }
+
+    private void initialize(String pulsarServiceUrl,
+                            String stateStorageServiceUrl,
+                            AuthenticationConfig authConfig,
+                            String javaInstanceJarFile,
+                            String pythonInstanceFile,
+                            String logDirectory,
+                            String extraDependenciesDir,
+                            SecretsProviderConfigurator secretsProviderConfigurator,
+                            boolean authenticationEnabled,
+                            Optional<FunctionAuthProvider> functionAuthProvider) {
         this.pulsarServiceUrl = pulsarServiceUrl;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.authConfig = authConfig;
@@ -109,14 +157,14 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
 
         if (this.extraDependenciesDir == null) {
             String envProcessContainerExtraDependenciesDir =
-                System.getProperty("pulsar.functions.extra.dependencies.dir");
+                    System.getProperty("pulsar.functions.extra.dependencies.dir");
             if (null != envProcessContainerExtraDependenciesDir) {
                 log.info("Extra dependencies location is not defined using"
-                    + " the location defined in system environment : {}", envProcessContainerExtraDependenciesDir);
+                        + " the location defined in system environment : {}", envProcessContainerExtraDependenciesDir);
                 this.extraDependenciesDir = envProcessContainerExtraDependenciesDir;
             } else {
                 log.info("No extra dependencies location is defined in either"
-                    + " function worker config or system environment");
+                        + " function worker config or system environment");
             }
         }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactoryConfig.java
new file mode 100644
index 0000000..e17abd9
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactoryConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.process;
+
+import lombok.Data;
+import lombok.ToString;
+import org.apache.pulsar.common.configuration.FieldContext;
+
+@Data
+@ToString
+public class ProcessRuntimeFactoryConfig {
+    @FieldContext(
+        doc = "The path to the java instance. Change the jar location only when you put"
+            + " the java instance jar in a different location"
+    )
+    protected String javaInstanceJarLocation;
+    @FieldContext(
+        doc = "The path to the python instance. Change the python instance location only"
+            + " when you put the python instance in a different location"
+    )
+    protected String pythonInstanceLocation;
+    @FieldContext(
+        doc = "The path to the log directory"
+    )
+    protected String logDirectory;
+    @FieldContext(
+        doc = "the directory for dropping extra function dependencies"
+    )
+    protected String extraFunctionDependenciesDir;
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
similarity index 97%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 83e3782..3dea623 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.thread;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -30,6 +30,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
@@ -39,7 +40,7 @@ import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
  * A function container implemented using java thread.
  */
 @Slf4j
-class ThreadRuntime implements Runtime {
+public class ThreadRuntime implements Runtime {
 
     // The thread that invokes the function
     private Thread fnThread;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
similarity index 71%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 49bee87..5e65989 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -17,25 +17,30 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.thread;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.prometheus.client.CollectorRegistry;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
+import java.util.Optional;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
@@ -43,20 +48,22 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
  * Thread based function container factory implementation.
  */
 @Slf4j
+@NoArgsConstructor
 public class ThreadRuntimeFactory implements RuntimeFactory {
 
-    private final ThreadGroup threadGroup;
-    private final FunctionCacheManager fnCache;
-    private final PulsarClient pulsarClient;
-    private final String storageServiceUrl;
-    private final SecretsProvider secretsProvider;
-    private final CollectorRegistry collectorRegistry;
+    @Getter
+    private ThreadGroup threadGroup;
+    private FunctionCacheManager fnCache;
+    private PulsarClient pulsarClient;
+    private String storageServiceUrl;
+    private SecretsProvider secretsProvider;
+    private CollectorRegistry collectorRegistry;
     private volatile boolean closed;
 
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
                                 AuthenticationConfig authConfig, SecretsProvider secretsProvider,
                                 CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception {
-        this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
+        initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
                 storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader);
     }
 
@@ -64,24 +71,9 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
                                 SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
                                 ClassLoader rootClassLoader) {
-        if (rootClassLoader == null) {
-            rootClassLoader = Thread.currentThread().getContextClassLoader();
-        }
 
-        this.secretsProvider = secretsProvider;
-        this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
-        this.threadGroup = new ThreadGroup(threadGroupName);
-        this.pulsarClient = pulsarClient;
-        this.storageServiceUrl = storageServiceUrl;
-        this.collectorRegistry = collectorRegistry;
-    }
-
-    public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
-        URL[] urls = new URL[jars.length];
-        for (int i = 0; i < jars.length; i++) {
-            urls[i] = jars[i].toURI().toURL();
-        }
-        return new URLClassLoader(urls, parent);
+        initialize(threadGroupName, pulsarClient, storageServiceUrl,
+                secretsProvider, collectorRegistry, rootClassLoader);
     }
 
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
@@ -104,7 +96,35 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
         }
         return null;
     }
-    
+
+    private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
+                            SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
+                            ClassLoader rootClassLoader) {
+        if (rootClassLoader == null) {
+            rootClassLoader = Thread.currentThread().getContextClassLoader();
+        }
+
+        this.secretsProvider = secretsProvider;
+        this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+        this.threadGroup = new ThreadGroup(threadGroupName);
+        this.pulsarClient = pulsarClient;
+        this.storageServiceUrl = storageServiceUrl;
+        this.collectorRegistry = collectorRegistry;
+    }
+
+    @Override
+    public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
+                           SecretsProviderConfigurator secretsProviderConfigurator,
+                           Optional<FunctionAuthProvider> functionAuthProvider) throws Exception {
+        ThreadRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
+                workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class);
+
+        initialize(factoryConfig.getThreadGroupName(),
+                createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
+                workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(),
+                null, null);
+    }
+
     @Override
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
                                          String originalCodeFileName,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java
new file mode 100644
index 0000000..c078e8d
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.thread;
+
+import lombok.Data;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.common.configuration.FieldContext;
+
+@Data
+@ToString
+@Accessors(chain = true)
+public class ThreadRuntimeFactoryConfig {
+    @FieldContext(
+        doc = "The name of thread group running function threads"
+    )
+    protected String threadGroupName;
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
similarity index 72%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 7f00a61..7a25629 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -47,6 +47,10 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 
 @Data
 @Setter
@@ -328,162 +332,20 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     }
 
 
-    @Data
-    @Setter
-    @Getter
-    @EqualsAndHashCode
-    @ToString
-    public static class ThreadContainerFactory {
-        @FieldContext(
-            doc = "The name of thread group running function threads"
-        )
-        private String threadGroupName;
-    }
-    @FieldContext(
-        category = CATEGORY_FUNC_RUNTIME_MNG,
-        doc = "Thread based runtime settings"
-    )
-    private ThreadContainerFactory threadContainerFactory;
+    /******** Function Runtime configurations **********/
+
 
-    @Data
-    @Setter
-    @Getter
-    @EqualsAndHashCode
-    @ToString
-    public static class ProcessContainerFactory {
-        @FieldContext(
-            doc = "The path to the java instance. Change the jar location only when you put"
-                + " the java instance jar in a different location"
-        )
-        private String javaInstanceJarLocation;
-        @FieldContext(
-            doc = "The path to the python instance. Change the python instance location only"
-                + " when you put the python instance in a different location"
-        )
-        private String pythonInstanceLocation;
-        @FieldContext(
-            doc = "The path to the log directory"
-        )
-        private String logDirectory;
-        @FieldContext(
-            doc = "the directory for dropping extra function dependencies"
-        )
-        private String extraFunctionDependenciesDir;
-    }
     @FieldContext(
-        category = CATEGORY_FUNC_RUNTIME_MNG,
-        doc = "Process based runtime settings"
+            category = CATEGORY_FUNC_RUNTIME_MNG,
+            doc = "The classname of the function runtime factory."
     )
-    private ProcessContainerFactory processContainerFactory;
-
-    @Data
-    @Setter
-    @Getter
-    @EqualsAndHashCode
-    @ToString
-    public static class KubernetesContainerFactory {
-        @FieldContext(
-            doc = "Uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in"
-                + " function worker machine"
-        )
-        private String k8Uri;
-        @FieldContext(
-            doc = "The Kubernetes namespace to run the function instances. It is `default`,"
-                + " if this setting is left to be empty"
-        )
-        private String jobNamespace;
-        @FieldContext(
-            doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`"
-        )
-        private String pulsarDockerImageName;
-
-        @FieldContext(
-                doc = "The image pull policy for image used to run function instance. By default it is `IfNotPresent`"
-        )
-        private String imagePullPolicy;
-        @FieldContext(
-                doc = "The root directory of pulsar home directory in the pulsar docker image specified"
-                        + " `pulsarDockerImageName`. By default it is under `/pulsar`. If you are using your own"
-                        + " customized image in `pulsarDockerImageName`, you need to set this setting accordingly"
-        )
-        private String pulsarRootDir;
-        @FieldContext(
-            doc = "This setting only takes effects if `k8Uri` is set to null. If your function worker is"
-                + " also running as a k8s pod, set this to `true` is let function worker to submit functions to"
-                + " the same k8s cluster as function worker is running. Set this to `false` if your function worker"
-                + " is not running as a k8s pod"
-        )
-        private Boolean submittingInsidePod;
-        @FieldContext(
-            doc = "The pulsar service url that pulsar functions should use to connect to pulsar."
-                + " If it is not set, it will use the pulsar service url configured in function worker."
-        )
-        private String pulsarServiceUrl;
-        @FieldContext(
-            doc = "The pulsar admin url that pulsar functions should use to connect to pulsar."
-                + " If it is not set, it will use the pulsar admin url configured in function worker."
-        )
-        private String pulsarAdminUrl;
-        @FieldContext(
-            doc = "The flag indicates to install user code dependencies. (applied to python package)"
-        )
-        private Boolean installUserCodeDependencies;
-        @FieldContext(
-            doc = "The repository that pulsar functions use to download python dependencies"
-        )
-        private String pythonDependencyRepository;
-        @FieldContext(
-            doc = "The repository that pulsar functions use to download extra python dependencies"
-        )
-        private String pythonExtraDependencyRepository;
-
-        @FieldContext(
-            doc = "the directory for dropping extra function dependencies. "
-                + "If it is not absolute path, it is relative to `pulsarRootDir`"
-        )
-        private String extraFunctionDependenciesDir;
-        @FieldContext(
-            doc = "The custom labels that function worker uses to select the nodes for pods"
-        )
-        private Map<String, String> customLabels;
-
-        @FieldContext(
-            doc = "The expected metrics collection interval, in seconds"
-        )
-        private Integer expectedMetricsCollectionInterval = 30;
-        @FieldContext(
-            doc = "Kubernetes Runtime will periodically checkback on"
-                + " this configMap if defined and if there are any changes"
-                + " to the kubernetes specific stuff, we apply those changes"
-        )
-        private String changeConfigMap;
-        @FieldContext(
-            doc = "The namespace for storing change config map"
-        )
-        private String changeConfigMapNamespace;
-
-        @FieldContext(
-                doc = "Additional memory padding added on top of the memory requested by the function per on a per instance basis"
-        )
-        private int percentMemoryPadding;
-
-        @FieldContext(
-                doc = "The ratio cpu request and cpu limit to be set for a function/source/sink." +
-                        "  The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio"
-        )
-        private double cpuOverCommitRatio = 1.0;
+    private String functionRuntimeFactoryClassName;
 
-        @FieldContext(
-                doc = "The ratio memory request and memory limit to be set for a function/source/sink." +
-                        "  The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio"
-        )
-        private double memoryOverCommitRatio = 1.0;
-    }
     @FieldContext(
-        category = CATEGORY_FUNC_RUNTIME_MNG,
-        doc = "Kubernetes based runtime settings"
+            category = CATEGORY_FUNC_RUNTIME_MNG,
+            doc = "A map of configs for function runtime factory."
     )
-    private KubernetesContainerFactory kubernetesContainerFactory;
+    private Map<String, Object> functionRuntimeFactoryConfigs;
 
     @FieldContext(
         category = CATEGORY_FUNC_RUNTIME_MNG,
@@ -569,4 +431,54 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    /********* DEPRECATED CONFIGS *********/
+
+    @Deprecated
+    @Data
+    /**
+     * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
+     * for specifying the function runtime and configs to use
+     */
+    public static class ThreadContainerFactory extends ThreadRuntimeFactoryConfig {
+
+    }
+    @FieldContext(
+            category = CATEGORY_FUNC_RUNTIME_MNG,
+            doc = "Thread based runtime settings"
+    )
+    @Deprecated
+    private ThreadContainerFactory threadContainerFactory;
+
+    @Deprecated
+    @Data
+    /**
+     * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
+     * for specifying the function runtime and configs to use
+     */
+    public static class ProcessContainerFactory extends ProcessRuntimeFactoryConfig {
+
+    }
+    @FieldContext(
+            category = CATEGORY_FUNC_RUNTIME_MNG,
+            doc = "Process based runtime settings"
+    )
+    @Deprecated
+    private ProcessContainerFactory processContainerFactory;
+
+    @Deprecated
+    @Data
+    /**
+     * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
+     * for specifying the function runtime and configs to use
+     */
+    public static class KubernetesContainerFactory extends KubernetesRuntimeFactoryConfig {
+
+    }
+    @FieldContext(
+            category = CATEGORY_FUNC_RUNTIME_MNG,
+            doc = "Kubernetes based runtime settings"
+    )
+    @Deprecated
+    private KubernetesContainerFactory kubernetesContainerFactory;
 }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
similarity index 69%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
rename to pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index ef21e14..1bbb27d 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -17,15 +17,17 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
 
 import io.kubernetes.client.apis.AppsV1Api;
 import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1ConfigMap;
 import io.kubernetes.client.models.V1PodSpec;
 import io.kubernetes.client.models.V1StatefulSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
@@ -33,8 +35,12 @@ import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -43,6 +49,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -136,31 +144,38 @@ public class KubernetesRuntimeFactoryTest {
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir,
                                                             Resources minResources,
                                                             Optional<FunctionAuthProvider> functionAuthProvider) throws Exception {
-        KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory(
-            null,
-            null,
-            null,
-            null,
-            pulsarRootDir,
-            false,
-            true,
-            "myrepo",
-            "anotherrepo",
-            extraDepsDir,
-            null,
-                0,
-                1.0,
-                1.0,
-                pulsarServiceUrl,
-            pulsarAdminUrl,
-            stateStorageServiceUrl,
-            null,
-            null,
-            null,
-            null,
-            null,
-                minResources,
-                new TestSecretProviderConfigurator(), false, functionAuthProvider));
+        KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+        kubernetesRuntimeFactoryConfig.setK8Uri(null);
+        kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+        kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+        kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+        kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+        kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+        kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+        kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+        kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+        kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+        kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+        kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(0);
+        kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(1.0);
+        kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(1.0);
+        kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+        kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+
+        workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+
+        workerConfig.setFunctionInstanceMinResources(minResources);
+        workerConfig.setStateStorageServiceUrl(null);
+        workerConfig.setAuthenticationEnabled(false);
+
+        factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), functionAuthProvider);
         doNothing().when(factory).setupClient();
         return factory;
     }
@@ -317,4 +332,46 @@ public class KubernetesRuntimeFactoryTest {
             }
         }
     }
+
+    @Test
+    public void testDynamicConfigMapLoading() throws Exception {
+
+        String changeConfigMap = "changeMap";
+        String changeConfigNamespace = "changeConfigNamespace";
+
+        KubernetesRuntimeFactory kubernetesRuntimeFactory = getKuberentesRuntimeFactory();
+        CoreV1Api coreV1Api = Mockito.mock(CoreV1Api.class);
+        V1ConfigMap v1ConfigMap = new V1ConfigMap();
+        Mockito.doReturn(v1ConfigMap).when(coreV1Api).readNamespacedConfigMap(any(), any(), any(), any(), any());
+        KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap, changeConfigNamespace, kubernetesRuntimeFactory);
+        Mockito.verify(coreV1Api, Mockito.times(1)).readNamespacedConfigMap(eq(changeConfigMap), eq(changeConfigNamespace), eq(null), eq(true), eq(false));
+        KubernetesRuntimeFactory expected = getKuberentesRuntimeFactory();
+        assertEquals(kubernetesRuntimeFactory, expected);
+
+        HashMap<String, String> configs = new HashMap<>();
+        configs.put("pulsarDockerImageName", "test_dockerImage2");
+        configs.put("imagePullPolicy", "test_imagePullPolicy2");
+        v1ConfigMap.setData(configs);
+        KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap, changeConfigNamespace, kubernetesRuntimeFactory);
+        Mockito.verify(coreV1Api, Mockito.times(2)).readNamespacedConfigMap(eq(changeConfigMap), eq(changeConfigNamespace), eq(null), eq(true), eq(false));
+
+       assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "test_dockerImage2");
+       assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "test_imagePullPolicy2");
+    }
+
+    private KubernetesRuntimeFactory getKuberentesRuntimeFactory() {
+        KubernetesRuntimeFactory kubernetesRuntimeFactory = new KubernetesRuntimeFactory();
+        WorkerConfig workerConfig = new WorkerConfig();
+        KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+        kubernetesRuntimeFactoryConfig.setK8Uri("test_k8uri");
+        kubernetesRuntimeFactoryConfig.setJobNamespace("test_jobNamespace");
+        kubernetesRuntimeFactoryConfig.setPulsarDockerImageName("test_dockerImage");
+        kubernetesRuntimeFactoryConfig.setImagePullPolicy("test_imagePullPolicy");
+        workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
+        kubernetesRuntimeFactory.initialize(workerConfig, authenticationConfig, new DefaultSecretsProviderConfigurator(), Optional.empty());
+        return kubernetesRuntimeFactory;
+    }
 }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
similarity index 91%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
rename to pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 00d8930..eb47caf 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
 
 import com.google.protobuf.util.JsonFormat;
 import io.kubernetes.client.apis.AppsV1Api;
@@ -25,13 +25,16 @@ import io.kubernetes.client.apis.CoreV1Api;
 import io.kubernetes.client.models.V1Container;
 import io.kubernetes.client.models.V1PodSpec;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -154,31 +157,36 @@ public class KubernetesRuntimeTest {
 
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
                                                             double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
-        KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory(
-            null,
-            null,
-            null,
-            null,
-            pulsarRootDir,
-            false,
-            true,
-            "myrepo",
-            "anotherrepo",
-            extraDepsDir,
-            null,
-                percentMemoryPadding,
-                cpuOverCommitRatio,
-                memoryOverCommitRatio,
-                pulsarServiceUrl,
-            pulsarAdminUrl,
-            stateStorageServiceUrl,
-            null,
-            null,
-            null,
-            null,
-null,
-                null, new TestSecretProviderConfigurator(), false,
-                Optional.empty()));
+        KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+        kubernetesRuntimeFactoryConfig.setK8Uri(null);
+        kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+        kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+        kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+        kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+        kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+        kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+        kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+        kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+        kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+        kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+        kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(percentMemoryPadding);
+        kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(cpuOverCommitRatio);
+        kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(memoryOverCommitRatio);
+        kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+        kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+        workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+        workerConfig.setFunctionInstanceMinResources(null);
+        workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+        workerConfig.setAuthenticationEnabled(false);
+
+        factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), Optional.empty());
         doNothing().when(factory).setupClient();
         return factory;
     }
@@ -242,8 +250,6 @@ null,
         Assert.assertEquals(containerSpec.getResources().getRequests().get("memory").getNumber().longValue(), expectedRamWithPadding);
     }
 
-
-
     @Test
     public void testJavaConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
similarity index 90%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
rename to pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 0329ae8..84c6160 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.process;
 
 import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
 import static org.testng.Assert.assertEquals;
@@ -40,13 +40,16 @@ import java.util.Optional;
 import io.kubernetes.client.apis.AppsV1Api;
 import io.kubernetes.client.apis.CoreV1Api;
 import io.kubernetes.client.models.V1PodSpec;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -139,15 +142,25 @@ public class ProcessRuntimeTest {
     }
 
     private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenciesDir) {
-        return new ProcessRuntimeFactory(
-            pulsarServiceUrl,
-            stateStorageServiceUrl,
-            null, /* auth config */
-            javaInstanceJarFile,
-            pythonInstanceFile,
-            logDirectory,
-            extraDependenciesDir, /* extra dependencies dir */
-            new TestSecretsProviderConfigurator(), false, Optional.empty());
+        ProcessRuntimeFactory processRuntimeFactory = new ProcessRuntimeFactory();
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
+        workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+        workerConfig.setAuthenticationEnabled(false);
+
+        ProcessRuntimeFactoryConfig processRuntimeFactoryConfig = new ProcessRuntimeFactoryConfig();
+        processRuntimeFactoryConfig.setJavaInstanceJarLocation(javaInstanceJarFile);
+        processRuntimeFactoryConfig.setPythonInstanceLocation(pythonInstanceFile);
+        processRuntimeFactoryConfig.setLogDirectory(logDirectory);
+        processRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDependenciesDir);
+
+        workerConfig.setFunctionRuntimeFactoryClassName(ProcessRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(processRuntimeFactoryConfig, Map.class));
+        processRuntimeFactory.initialize(workerConfig, null, new TestSecretsProviderConfigurator(), Optional.empty());
+
+        return processRuntimeFactory;
     }
 
     FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 1d89738..48644f0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -67,7 +67,7 @@ public class Reflections {
             throw new RuntimeException("User class must be in class path", cnfe);
         }
         if (!xface.isAssignableFrom(theCls)) {
-            throw new RuntimeException(userClassName + " not " + xface.getName());
+            throw new RuntimeException(userClassName + " does not implement " + xface.getName());
         }
         Class<T> tCls = (Class<T>) theCls.asSubclass(xface);
         T result;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
index da2a540..1ad03b1 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
@@ -149,7 +149,7 @@ public class ReflectionsTest {
             fail("Should fail to load a class that isn't assignable");
         } catch (RuntimeException re) {
             assertEquals(
-                aImplementation.class.getName() + " not " + bInterface.class.getName(),
+                aImplementation.class.getName() + " does not implement " + bInterface.class.getName(),
                 re.getMessage());
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 08d412b..b0f41b0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -31,16 +31,17 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
-import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -150,7 +151,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
         }
         secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
 
-        Optional<FunctionAuthProvider> functionAuthProvider  = Optional.empty();
+        Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();
         AuthenticationConfig authConfig = null;
         if (workerConfig.isAuthenticationEnabled()) {
             authConfig = AuthenticationConfig.builder()
@@ -168,57 +169,28 @@ public class FunctionRuntimeManager implements AutoCloseable{
             }
         }
 
-        if (workerConfig.getThreadContainerFactory() != null) {
-            this.runtimeFactory = new ThreadRuntimeFactory(
-                    workerConfig.getThreadContainerFactory().getThreadGroupName(),
-                    workerConfig.getPulsarServiceUrl(),
-                    workerConfig.getStateStorageServiceUrl(),
-                    authConfig,
-                    new ClearTextSecretsProvider(),
-                     null, null);
-        } else if (workerConfig.getProcessContainerFactory() != null) {
-            this.runtimeFactory = new ProcessRuntimeFactory(
-                    workerConfig.getPulsarServiceUrl(),
-                    workerConfig.getStateStorageServiceUrl(),
-                    authConfig,
-                    workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
-                    workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
-                    workerConfig.getProcessContainerFactory().getLogDirectory(),
-                    workerConfig.getProcessContainerFactory().getExtraFunctionDependenciesDir(),
-                    secretsProviderConfigurator,
-                    workerConfig.isAuthenticationEnabled(),
-                    functionAuthProvider);
-        } else if (workerConfig.getKubernetesContainerFactory() != null){
-            this.runtimeFactory = new KubernetesRuntimeFactory(
-                    workerConfig.getKubernetesContainerFactory().getK8Uri(),
-                    workerConfig.getKubernetesContainerFactory().getJobNamespace(),
-                    workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
-                    workerConfig.getKubernetesContainerFactory().getImagePullPolicy(),
-                    workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
-                    workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
-                    workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
-                    workerConfig.getKubernetesContainerFactory().getPythonDependencyRepository(),
-                    workerConfig.getKubernetesContainerFactory().getPythonExtraDependencyRepository(),
-                    workerConfig.getKubernetesContainerFactory().getExtraFunctionDependenciesDir(),
-                    workerConfig.getKubernetesContainerFactory().getCustomLabels(),
-                    workerConfig.getKubernetesContainerFactory().getPercentMemoryPadding(),
-                    workerConfig.getKubernetesContainerFactory().getCpuOverCommitRatio(),
-                    workerConfig.getKubernetesContainerFactory().getMemoryOverCommitRatio(),
-                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
-                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
-                    workerConfig.getStateStorageServiceUrl(),
-                    authConfig,
-                    workerConfig.getTlsTrustChainBytes(),
-                    workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
-                    workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
-                    workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
-                    workerConfig.getFunctionInstanceMinResources(),
-                    secretsProviderConfigurator,
-                    workerConfig.isAuthenticationEnabled(),
-                    functionAuthProvider);
+        // initialize function runtime factory
+        if (!StringUtils.isEmpty(workerConfig.getFunctionRuntimeFactoryClassName())) {
+            this.runtimeFactory = RuntimeFactory.getFuntionRuntimeFactory(workerConfig.getFunctionRuntimeFactoryClassName());
         } else {
-            throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
+            if (workerConfig.getThreadContainerFactory() != null) {
+                this.runtimeFactory = new ThreadRuntimeFactory();
+                workerConfig.setFunctionRuntimeFactoryConfigs(
+                        ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getThreadContainerFactory(), Map.class));
+            } else if (workerConfig.getProcessContainerFactory() != null) {
+                this.runtimeFactory = new ProcessRuntimeFactory();
+                workerConfig.setFunctionRuntimeFactoryConfigs(
+                        ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getProcessContainerFactory(), Map.class));
+            } else if (workerConfig.getKubernetesContainerFactory() != null) {
+                this.runtimeFactory = new KubernetesRuntimeFactory();
+                workerConfig.setFunctionRuntimeFactoryConfigs(
+                        ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getKubernetesContainerFactory(), Map.class));
+            } else {
+                throw new RuntimeException("A Function Runtime Factory needs to be set");
+            }
         }
+        // initialize runtime
+        this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider);
 
         this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
                 dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index cd186cc..327d694 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.slf4j.Logger;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 2374dce..b3ab7bb 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -29,15 +29,18 @@ import static org.mockito.Mockito.verify;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.testng.annotations.Test;
 
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.pulsar.common.functions.Utils.FILE;
@@ -60,8 +63,10 @@ public class FunctionActionerTest {
 
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
-        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
 
@@ -99,7 +104,10 @@ public class FunctionActionerTest {
 
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -158,7 +166,10 @@ public class FunctionActionerTest {
     public void testFunctionAuthDisabled() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 9845555..77407a1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -28,9 +28,14 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.runtime.KubernetesRuntime;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -40,6 +45,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.doNothing;
@@ -58,6 +64,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 @Slf4j
 public class FunctionRuntimeManagerTest {
@@ -67,7 +74,10 @@ public class FunctionRuntimeManagerTest {
 
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -147,7 +157,10 @@ public class FunctionRuntimeManagerTest {
 
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
@@ -229,7 +242,10 @@ public class FunctionRuntimeManagerTest {
     public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
@@ -355,7 +371,10 @@ public class FunctionRuntimeManagerTest {
     public void testReassignment() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
@@ -456,7 +475,10 @@ public class FunctionRuntimeManagerTest {
     public void testRuntimeManagerInitialize() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -524,7 +546,6 @@ public class FunctionRuntimeManagerTest {
             }
         });
 
-
         when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
             @Override
             public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -532,7 +553,6 @@ public class FunctionRuntimeManagerTest {
             }
         });
 
-
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -581,9 +601,11 @@ public class FunctionRuntimeManagerTest {
     public void testExternallyManagedRuntimeUpdate() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setKubernetesContainerFactory(
-                new WorkerConfig.KubernetesContainerFactory()
-                        .setSubmittingInsidePod(false));
+        workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal()
+                        .convertValue(new KubernetesRuntimeFactoryConfig()
+                        .setSubmittingInsidePod(false), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setPulsarFunctionsCluster("cluster");
@@ -703,4 +725,174 @@ public class FunctionRuntimeManagerTest {
 
         verify(kubernetesRuntime, times(1)).reinitialize();
     }
+
+    @Test
+    public void testFunctionRuntimeSetCorrectly() {
+
+        // Function runtime not set
+        try {
+            WorkerConfig workerConfig = new WorkerConfig();
+            workerConfig.setWorkerId("worker-1");
+            workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+            workerConfig.setStateStorageServiceUrl("foo");
+            workerConfig.setFunctionAssignmentTopicName("assignments");
+            new FunctionRuntimeManager(
+                    workerConfig,
+                    mock(WorkerService.class),
+                    mock(Namespace.class),
+                    mock(MembershipManager.class),
+                    mock(ConnectorsManager.class),
+                    mock(FunctionMetaDataManager.class));
+
+            fail();
+        } catch (Exception e) {
+            assertEquals(e.getMessage(), "A Function Runtime Factory needs to be set");
+        }
+
+        // Function runtime class not found
+        try {
+            WorkerConfig workerConfig = new WorkerConfig();
+            workerConfig.setWorkerId("worker-1");
+            workerConfig.setFunctionRuntimeFactoryClassName("foo");
+            workerConfig.setFunctionRuntimeFactoryConfigs(
+                    ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
+            workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+            workerConfig.setStateStorageServiceUrl("foo");
+            workerConfig.setFunctionAssignmentTopicName("assignments");
+            new FunctionRuntimeManager(
+                    workerConfig,
+                    mock(WorkerService.class),
+                    mock(Namespace.class),
+                    mock(MembershipManager.class),
+                    mock(ConnectorsManager.class),
+                    mock(FunctionMetaDataManager.class));
+
+            fail();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getClass(), ClassNotFoundException.class);
+        }
+
+        // Function runtime class does not implement correct interface
+        try {
+            WorkerConfig workerConfig = new WorkerConfig();
+            workerConfig.setWorkerId("worker-1");
+            workerConfig.setFunctionRuntimeFactoryClassName(FunctionRuntimeManagerTest.class.getName());
+            workerConfig.setFunctionRuntimeFactoryConfigs(
+                    ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
+            workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+            workerConfig.setStateStorageServiceUrl("foo");
+            workerConfig.setFunctionAssignmentTopicName("assignments");
+            new FunctionRuntimeManager(
+                    workerConfig,
+                    mock(WorkerService.class),
+                    mock(Namespace.class),
+                    mock(MembershipManager.class),
+                    mock(ConnectorsManager.class),
+                    mock(FunctionMetaDataManager.class));
+
+            fail();
+        } catch (Exception e) {
+            assertEquals(e.getMessage(), "org.apache.pulsar.functions.worker.FunctionRuntimeManagerTest does not implement org.apache.pulsar.functions.runtime.RuntimeFactory");
+        }
+
+        // Correct runtime class
+        try {
+            WorkerConfig workerConfig = new WorkerConfig();
+            workerConfig.setWorkerId("worker-1");
+            workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+            workerConfig.setFunctionRuntimeFactoryConfigs(
+                    ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
+            workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+            workerConfig.setStateStorageServiceUrl("foo");
+            workerConfig.setFunctionAssignmentTopicName("assignments");
+            FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
+                    workerConfig,
+                    mock(WorkerService.class),
+                    mock(Namespace.class),
+                    mock(MembershipManager.class),
+                    mock(ConnectorsManager.class),
+                    mock(FunctionMetaDataManager.class));
+
+            assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
+        } catch (Exception e) {
+            fail();
+        }
+    }
+
+    @Test
+    public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exception {
+
+        // Test kubernetes runtime
+        WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
+                = new WorkerConfig.KubernetesContainerFactory();
+        kubernetesContainerFactory.setK8Uri("k8Uri");
+        kubernetesContainerFactory.setJobNamespace("jobNamespace");
+        kubernetesContainerFactory.setPulsarDockerImageName("pulsarDockerImageName");
+        kubernetesContainerFactory.setImagePullPolicy("imagePullPolicy");
+        kubernetesContainerFactory.setPulsarRootDir("pulsarRootDir");
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
+
+        FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
+                workerConfig,
+                mock(WorkerService.class),
+                mock(Namespace.class),
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class),
+                mock(FunctionMetaDataManager.class));
+
+        assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class);
+        KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
+        assertEquals(kubernetesRuntimeFactory.getK8Uri(), "k8Uri");
+        assertEquals(kubernetesRuntimeFactory.getJobNamespace(), "jobNamespace");
+        assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "pulsarDockerImageName");
+        assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "imagePullPolicy");
+        assertEquals(kubernetesRuntimeFactory.getPulsarRootDir(), "pulsarRootDir");
+
+        // Test process runtime
+
+        WorkerConfig.ProcessContainerFactory processContainerFactory
+                = new WorkerConfig.ProcessContainerFactory();
+        processContainerFactory.setExtraFunctionDependenciesDir("extraDependenciesDir");
+        processContainerFactory.setLogDirectory("logDirectory");
+        processContainerFactory.setPythonInstanceLocation("pythonInstanceLocation");
+        processContainerFactory.setJavaInstanceJarLocation("javaInstanceJarLocation");
+        workerConfig = new WorkerConfig();
+        workerConfig.setProcessContainerFactory(processContainerFactory);
+
+        functionRuntimeManager = new FunctionRuntimeManager(
+                workerConfig,
+                mock(WorkerService.class),
+                mock(Namespace.class),
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class),
+                mock(FunctionMetaDataManager.class));
+
+        assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
+        ProcessRuntimeFactory processRuntimeFactory = (ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
+        assertEquals(processRuntimeFactory.getExtraDependenciesDir(), "extraDependenciesDir");
+        assertEquals(processRuntimeFactory.getLogDirectory(), "logDirectory/functions");
+        assertEquals(processRuntimeFactory.getPythonInstanceFile(), "pythonInstanceLocation");
+        assertEquals(processRuntimeFactory.getJavaInstanceJarFile(), "javaInstanceJarLocation");
+
+        // Test thread runtime
+
+        WorkerConfig.ThreadContainerFactory threadContainerFactory
+                = new WorkerConfig.ThreadContainerFactory();
+        threadContainerFactory.setThreadGroupName("threadGroupName");
+        workerConfig = new WorkerConfig();
+        workerConfig.setThreadContainerFactory(threadContainerFactory);
+
+        functionRuntimeManager = new FunctionRuntimeManager(
+                workerConfig,
+                mock(WorkerService.class),
+                mock(Namespace.class),
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class),
+                mock(FunctionMetaDataManager.class));
+
+        assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
+        ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
+        assertEquals(threadRuntimeFactory.getThreadGroup().getName(), "threadGroupName");
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 57eed32..2d0460b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -34,6 +34,7 @@ import static org.testng.Assert.assertTrue;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -48,7 +49,10 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -59,7 +63,10 @@ public class MembershipManagerTest {
     public MembershipManagerTest() {
         this.workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
     }
@@ -278,7 +285,10 @@ public class MembershipManagerTest {
     public void testCheckFailuresSomeUnassigned() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setRescheduleTimeoutMs(30000);
@@ -354,7 +364,10 @@ public class MembershipManagerTest {
     public void testHeartBeatFunctionWorkerDown() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setRescheduleTimeoutMs(30000);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index c0444e3..6ee14dc 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -30,9 +30,11 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
@@ -86,7 +88,10 @@ public class SchedulerManagerTest {
     public void setup() {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");