You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/07/13 04:58:51 UTC

[pulsar] branch master updated: Use classloaders to load Java functions (#4685)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ff1bba   Use classloaders to load Java functions (#4685)
6ff1bba is described below

commit 6ff1bbae0fe230eb72d62574d7685a745f884416
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jul 12 21:58:44 2019 -0700

     Use classloaders to load Java functions (#4685)
    
    * Use classloading to load user code for functions
---
 bin/pulsar                                         |   2 +
 distribution/server/pom.xml                        |   6 -
 distribution/server/src/assemble/bin.xml           |   2 +
 .../pulsar/client/internal/ReflectionUtils.java    |  10 +-
 .../functions/instance/JavaInstanceRunnable.java   | 102 ++++--
 .../apache/pulsar/functions/sink/PulsarSink.java   |   9 +-
 .../pulsar/functions/source/PulsarSource.java      |   7 +-
 .../pulsar/functions/sink/PulsarSinkTest.java      |  18 +-
 .../pulsar/functions/source/PulsarSourceTest.java  |   8 +-
 .../org/apache/pulsar/functions/LocalRunner.java   |   4 +-
 pulsar-functions/runtime-all/pom.xml               | 370 ++-------------------
 .../functions/instance/JavaInstanceMain.java       | 150 +++++++++
 .../src/main/resources/java_instance_log4j2.xml    | 130 ++++++++
 .../main/resources/kubernetes_instance_log4j2.xml  |  60 ++++
 ...aInstanceMain.java => JavaInstanceStarter.java} |  79 ++---
 .../functions/runtime/KubernetesRuntime.java       |   2 +-
 .../pulsar/functions/runtime/ProcessRuntime.java   |   2 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     |  19 +-
 .../functions/runtime/ThreadRuntimeFactory.java    |  34 +-
 .../src/main/resources/java_instance_log4j2.yml    | 111 -------
 .../main/resources/kubernetes_instance_log4j2.yml  |  53 ---
 .../functions/runtime/KubernetesRuntimeTest.java   |  13 +-
 .../functions/runtime/ProcessRuntimeTest.java      |  13 +-
 .../utils/functioncache/FunctionCacheEntry.java    |  18 +-
 .../functioncache/FunctionCacheManagerImpl.java    |   9 +-
 .../functions/worker/FunctionRuntimeManager.java   |   2 +-
 .../functions/worker/SchedulerManagerTest.java     |  18 +-
 27 files changed, 605 insertions(+), 646 deletions(-)

diff --git a/bin/pulsar b/bin/pulsar
index ad7736a..fd90aa1 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -283,6 +283,8 @@ OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
 OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
 OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
 OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
+OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
+
 
 ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=*"
 
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index d233ea8..6fd5e5d 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -156,12 +156,6 @@
       <version>${project.version}</version>
       <!-- make sure the api examples are compiled before assembly -->
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>io.grpc</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <!-- local-runner -->
diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml
index c2d5996..bb3e273 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -131,6 +131,8 @@
         <exclude>io.netty:netty-transport-native-epoll</exclude>
         <exclude>io.netty:netty-transport-native-unix-common</exclude>
 
+        <exclude>org.apache.pulsar:pulsar-functions-runtime-all</exclude>
+
         <!-- Already included in pulsar-zookeeper instrumented jar -->
         <exclude>org.apache.zookeeper:zookeeper</exclude>
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
index 4d9db6e..101c77d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
@@ -50,7 +50,15 @@ class ReflectionUtils {
     @SuppressWarnings("unchecked")
     static <T> Class<T> newClassInstance(String className) {
         try {
-            return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
+            try {
+                // when the API is loaded in the same classloader as the impl
+                return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
+            } catch (Exception e) {
+                // when the API is loaded in a separate classloader as the impl
+                // the classloader that loaded the impl needs to be a child classloader of the classloader
+                // that loaded the API
+                return (Class<T>) Thread.currentThread().getContextClassLoader().loadClass(className);
+            }
         } catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b6c4fc8..66d4ba7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -90,8 +90,6 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
 @Slf4j
 public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
-    // The class loader that used for loading functions
-    private ClassLoader fnClassLoader;
     private final InstanceConfig instanceConfig;
     private final FunctionCacheManager fnCache;
     private final String jarFile;
@@ -132,6 +130,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private final Map<String, String> properties;
 
+    private final ClassLoader instanceClassLoader;
+    private ClassLoader functionClassLoader;
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
@@ -166,12 +167,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         // metrics collection especially in threaded mode
         // In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
         this.collectorRegistry = collectorRegistry;
+
+        this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
     }
 
     /**
      * NOTE: this method should be called in the instance thread, in order to make class loading work.
      */
-    JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
+    JavaInstance setupJavaInstance() throws Exception {
         // initialize the thread context
         ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
         ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
@@ -181,18 +184,21 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
 
         // start the function thread
-        loadJars();
+        functionClassLoader = loadJars();
 
-        ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
         Object object = Reflections.createInstance(
                 instanceConfig.getFunctionDetails().getClassName(),
-                clsLoader);
+                functionClassLoader);
+
         if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
             throw new RuntimeException("User class must either be Function or java.util.Function");
         }
 
         // start the state table
         setupStateTable();
+
+        ContextImpl contextImpl = setupContext();
+
         // start the output producer
         setupOutput(contextImpl);
         // start the input consumer
@@ -225,8 +231,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     this.instanceCache.getScheduledExecutorService(),
                     this.componentType);
 
-            ContextImpl contextImpl = setupContext();
-            javaInstance = setupJavaInstance(contextImpl);
+            javaInstance = setupJavaInstance();
             if (null != stateTable) {
                 StateContextImpl stateContext = new StateContextImpl(stateTable);
                 javaInstance.getContext().setStateContext(stateContext);
@@ -254,7 +259,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 stats.processTimeStart();
 
                 // process the message
+                Thread.currentThread().setContextClassLoader(functionClassLoader);
                 result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
+                Thread.currentThread().setContextClassLoader(instanceClassLoader);
 
                 // register end time
                 stats.processTimeEnd();
@@ -289,7 +296,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
     }
 
-    private void loadJars() throws Exception {
+    private ClassLoader loadJars() throws Exception {
+        ClassLoader fnClassLoader;
         try {
             log.info("Load JAR: {}", jarFile);
             // Let's first try to treat it as a nar archive
@@ -309,13 +317,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         log.info("Initialize function class loader for function {} at function cache manager",
                 instanceConfig.getFunctionDetails().getName());
 
-        this.fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+        fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
         if (null == fnClassLoader) {
             throw new Exception("No function class loader available.");
         }
 
-        // make sure the function class loader is accessible thread-locally
-        Thread.currentThread().setContextClassLoader(fnClassLoader);
+        return fnClassLoader;
     }
 
     private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
@@ -425,23 +432,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     }
 
     private void sendOutputMessage(Record srcRecord, Object output) {
+        if (!(this.sink instanceof PulsarSink)) {
+            Thread.currentThread().setContextClassLoader(functionClassLoader);
+        }
         try {
             this.sink.write(new SinkRecord<>(srcRecord, output));
         } catch (Exception e) {
             log.info("Encountered exception in sink write: ", e);
             stats.incrSinkExceptions(e);
             throw new RuntimeException(e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(instanceClassLoader);
         }
     }
 
     private Record readInput() {
         Record record;
+        if (!(this.source instanceof PulsarSource)) {
+            Thread.currentThread().setContextClassLoader(functionClassLoader);
+        }
         try {
             record = this.source.read();
         } catch (Exception e) {
             stats.incrSourceExceptions(e);
             log.info("Encountered exception in source read: ", e);
             throw new RuntimeException(e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(instanceClassLoader);
         }
 
         // check record is valid
@@ -466,19 +483,29 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
 
         if (source != null) {
+            if (!(this.source instanceof PulsarSource)) {
+                Thread.currentThread().setContextClassLoader(functionClassLoader);
+            }
             try {
                 source.close();
             } catch (Throwable e) {
                 log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+            } finally {
+                Thread.currentThread().setContextClassLoader(instanceClassLoader);
             }
             source = null;
         }
 
         if (sink != null) {
+            if (!(this.sink instanceof PulsarSink)) {
+                Thread.currentThread().setContextClassLoader(functionClassLoader);
+            }
             try {
                 sink.close();
             } catch (Throwable e) {
                 log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+            } finally {
+                Thread.currentThread().setContextClassLoader(instanceClassLoader);
             }
             sink = null;
         }
@@ -667,11 +694,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
                 pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
             }
-            object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
+            object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
         } else {
             object = Reflections.createInstance(
                     sourceSpec.getClassName(),
-                    Thread.currentThread().getContextClassLoader());
+                    this.functionClassLoader);
         }
 
         Class<?>[] typeArgs;
@@ -683,11 +710,22 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
         this.source = (Source<?>) object;
 
-        if (sourceSpec.getConfigs().isEmpty()) {
-            this.source.open(new HashMap<>(), contextImpl);
-        } else {
-            this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
-                    new TypeToken<Map<String, Object>>(){}.getType()), contextImpl);
+        if (!(this.source instanceof PulsarSource)) {
+            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+        }
+        try {
+            if (sourceSpec.getConfigs().isEmpty()) {
+                this.source.open(new HashMap<>(), contextImpl);
+            } else {
+                this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
+                        new TypeToken<Map<String, Object>>() {
+                        }.getType()), contextImpl);
+            }
+        } catch (Exception e) {
+            log.error("Source open produced uncaught exception: ", e);
+            throw e;
+        } finally {
+            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
 
@@ -713,12 +751,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
                 pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
 
-                object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats);
+                object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
             }
         } else {
             object = Reflections.createInstance(
                     sinkSpec.getClassName(),
-                    Thread.currentThread().getContextClassLoader());
+                    this.functionClassLoader);
         }
 
         if (object instanceof Sink) {
@@ -726,11 +764,23 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         } else {
             throw new RuntimeException("Sink does not implement correct interface");
         }
-        if (sinkSpec.getConfigs().isEmpty()) {
-            this.sink.open(new HashMap<>(), contextImpl);
-        } else {
-            this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
-                    new TypeToken<Map<String, Object>>() {}.getType()), contextImpl);
+
+        if (!(this.sink instanceof PulsarSink)) {
+            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+        }
+        try {
+            if (sinkSpec.getConfigs().isEmpty()) {
+                this.sink.open(new HashMap<>(), contextImpl);
+            } else {
+                this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
+                        new TypeToken<Map<String, Object>>() {
+                        }.getType()), contextImpl);
+            }
+        } catch (Exception e) {
+            log.error("Sink open produced uncaught exception: ", e);
+            throw e;
+        } finally {
+            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 3f0e1a9..583fe50 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -58,6 +58,7 @@ public class PulsarSink<T> implements Sink<T> {
     private final PulsarClient client;
     private final PulsarSinkConfig pulsarSinkConfig;
     private final Map<String, String> properties;
+    private final ClassLoader functionClassLoader;
     private ComponentStatsManager stats;
 
     @VisibleForTesting
@@ -237,12 +238,14 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats) {
+    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties,
+                      ComponentStatsManager stats, ClassLoader functionClassLoader) {
         this.client = client;
         this.pulsarSinkConfig = pulsarSinkConfig;
         this.topicSchema = new TopicSchema(client);
         this.properties = properties;
         this.stats = stats;
+        this.functionClassLoader = functionClassLoader;
     }
 
     @Override
@@ -314,9 +317,7 @@ public class PulsarSink<T> implements Sink<T> {
             return (Schema<T>) Schema.BYTES;
         }
 
-        Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(),
-                Thread.currentThread().getContextClassLoader());
-
+        Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
         if (Void.class.equals(typeArg)) {
             // return type is 'void', so there's no schema to check
             return null;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 4843362..cba90ff 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -45,15 +45,18 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
     private final PulsarClient pulsarClient;
     private final PulsarSourceConfig pulsarSourceConfig;
     private final Map<String, String> properties;
+    private final ClassLoader functionClassLoader;
     private List<String> inputTopics;
     private List<Consumer<T>> inputConsumers = Collections.emptyList();
     private final TopicSchema topicSchema;
 
-    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
+    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties,
+                        ClassLoader functionClassLoader) {
         this.pulsarClient = pulsarClient;
         this.pulsarSourceConfig = pulsarConfig;
         this.topicSchema = new TopicSchema(pulsarClient);
         this.properties = properties;
+        this.functionClassLoader = functionClassLoader;
     }
 
     @Override
@@ -147,7 +150,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         Map<String, ConsumerConfig<T>> configs = new TreeMap<>();
 
         Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(),
-                Thread.currentThread().getContextClassLoader());
+                this.functionClassLoader);
 
         checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 700267e..fcea8a2 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -166,7 +166,7 @@ public class PulsarSinkTest {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
             Schema schema = pulsarSink.initializeSchema();
@@ -184,7 +184,7 @@ public class PulsarSinkTest {
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
         pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
         try {
             pulsarSink.initializeSchema();
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -206,7 +206,7 @@ public class PulsarSinkTest {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
             pulsarSink.initializeSchema();
@@ -225,7 +225,7 @@ public class PulsarSinkTest {
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
         pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
             pulsarSink.initializeSchema();
@@ -241,7 +241,7 @@ public class PulsarSinkTest {
         // set type to void
         pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
         pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
             pulsarSink.initializeSchema();
@@ -263,7 +263,7 @@ public class PulsarSinkTest {
         /** test At-least-once **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -309,7 +309,8 @@ public class PulsarSinkTest {
         /** test At-most-once **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
-        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class),
+                Thread.currentThread().getContextClassLoader());
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -351,7 +352,8 @@ public class PulsarSinkTest {
         /** test Effectively-once **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
-        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class),
+                Thread.currentThread().getContextClassLoader());
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 4438bf0..99e9d91 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -127,7 +127,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTypeClassName(Void.class.getName());
 
         @Cleanup
-        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
 
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -155,7 +155,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
 
         @Cleanup
-        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -182,7 +182,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSchema(consumerConfigs);
 
         @Cleanup
-        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
@@ -197,7 +197,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSchema(consumerConfigs);
 
         @Cleanup
-        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
 
         pulsarSource.setupConsumerConfigs();
     }
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index f4c6eca..ee60811 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -25,7 +25,6 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 import lombok.Builder;
-import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SinkConfig;
@@ -61,7 +60,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
 import static org.apache.pulsar.functions.utils.FunctionCommon.extractClassLoader;
-import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
 
 @Slf4j
 public class LocalRunner {
@@ -407,7 +405,7 @@ public class LocalRunner {
                 serviceUrl,
                 stateStorageServiceUrl,
                 authConfig,
-                new ClearTextSecretsProvider(), null);
+                new ClearTextSecretsProvider(), null, null);
         for (int i = 0; i < parallelism; ++i) {
             InstanceConfig instanceConfig = new InstanceConfig();
             instanceConfig.setFunctionDetails(functionDetails);
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index 2393128..314efde 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -35,39 +35,43 @@
 
   <dependencies>
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-functions-runtime</artifactId>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-core</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
 
-    <!-- logging -->
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-api</artifactId>
+      <version>${project.parent.version}</version>
     </dependency>
+
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-api</artifactId>
+      <version>${project.parent.version}</version>
     </dependency>
+
+    <!-- logging -->
+
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-original</artifactId>
-      <version>${project.parent.version}</version>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-all</artifactId>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-all</artifactId>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
     </dependency>
   </dependencies>
 
@@ -75,333 +79,23 @@
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <!-- get all project dependencies -->
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <finalName>java-instance</finalName>
+          <appendAssemblyId>false</appendAssemblyId>
+        </configuration>
         <executions>
           <execution>
+            <id>make-assembly</id>
+            <!-- bind to the packaging phase -->
             <phase>package</phase>
             <goals>
-              <goal>shade</goal>
+              <goal>single</goal>
             </goals>
-            <configuration>
-              <finalName>java-instance</finalName>
-              <minimizeJar>false</minimizeJar>
-              <shadedArtifactAttached>true</shadedArtifactAttached>
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
-              </transformers>
-              <artifactSet>
-                <excludes>
-                  <exclude>io.netty:netty-common</exclude>
-                  <exclude>io.netty:netty-buffer</exclude>
-                  <exclude>io.netty:netty-codec-http2</exclude>
-                  <exclude>io.netty:netty-codec-http</exclude>
-                  <exclude>io.netty:netty-codec-socks</exclude>
-                  <exclude>io.netty:netty-codec</exclude>
-                  <exclude>io.netty:netty-handler</exclude>
-                  <exclude>io.netty:netty-handler-proxy</exclude>
-                  <exclude>io.netty:netty-transport</exclude>
-                  <exclude>io.netty:netty-resolver</exclude>
-                </excludes>
-              </artifactSet>
-              <filters>
-                <filter>
-                  <!-- Shading signed JARs will fail without
-                      this. http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar -->
-                  <artifact>*:*</artifact>
-                  <excludes>
-                    <exclude>META-INF/*.SF</exclude>
-                    <exclude>META-INF/*.DSA</exclude>
-                    <exclude>META-INF/*.RSA</exclude>
-                  </excludes>
-                </filter>
-              </filters>
-              <relocations>
-                <relocation>
-                  <pattern>com.typesafe.netty</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.google</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.http</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.http</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.jute</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.jute</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>javax.servlet</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.servlet</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.junit</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.junit</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>junit</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.junit</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>net.jodah</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jodah</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.lz4</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.lz4</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.reactivestreams</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.reactivestreams</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.commons</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.commons</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io.swagger</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.yaml</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.yaml</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.jctools</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.jctools</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.squareup.okhttp</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okhttp</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io.grpc</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.grpc</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.joda</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.joda</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>javax.ws.rs</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.ws.rs</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io.kubernetes</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.kubernetes</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io.opencensus</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.opencensus</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>net.jpountz</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jpountz</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.aspectj</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.aspectj</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>commons-configuration</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-configuration</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.tukaani</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.tukaani</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.github</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>commons-io</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-io</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.distributedlog</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.fasterxml</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.inferred</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.bookkeeper</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.bookkeeper</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bookkeeper</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>dlshade</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.codehaus.jackson</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>net.java.dev.jna</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.curator</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.curator</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>javax.validation</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.validation</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>javax.activation</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.activation</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io.prometheus</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.prometheus</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.zookeeper</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io.jsonwebtoken</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.jsonwebtoken</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>commons-codec</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-codec</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.thoughtworks.paranamer</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.thoughtworks.paranamer</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.codehaus.mojo</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.mojo</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.github.luben</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github.luben</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>jline</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.jline</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>commons-logging</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-logging</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.bouncycastle</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bouncycastle</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.xerial.snappy</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.xerial.snappy</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>javax.annotation</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.annotation</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.checkerframework</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.checkerframework</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.yetus</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.yetus</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>commons-cli</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-cli</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>commons-lang</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-lang</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.squareup.okio</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okio</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.rocksdb</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.rocksdb</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.objenesis</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.objenesis</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.eclipse.jetty</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.eclipse.jetty</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.apache.avro</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.avro</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>avro.shaded</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.avo.shaded</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.yahoo</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.yahoo</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.beust</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>io.netty</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.hamcrest</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>aj.org</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.aj.org</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>com.scurrilous</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.scurrilous</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>okio</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.okio</shadedPattern>
-                </relocation>
-                <!--
-                    asynchttpclient can only be shaded to be under `org.apache.pulsar.shade`
-                    see {@link https://github.com/apache/incubator-pulsar/pull/390}
-                    and {@link https://github.com/apache/incubator-pulsar/blob/master/pulsar-client/src/main/resources/ahc.properties}
-                -->
-                <relocation>
-                  <pattern>org.asynchttpclient</pattern>
-                  <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
-                </relocation>
-                <!-- DONT ever shade log4j, otherwise logging won't work anymore in running functions in process mode
-                <relocation>
-                  <pattern>org.apache.logging</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.logging</shadedPattern>
-                </relocation>
-                -->
-                <relocation>
-                  <pattern>io.swagger</pattern>
-                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
-                </relocation>
-              </relocations>
-            </configuration>
           </execution>
         </executions>
       </plugin>
diff --git a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
new file mode 100644
index 0000000..7b96927
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This is the initial class that gets called when starting a Java Function instance.
+ * Multiple class loaders are used to separate function instance dependencies from user code dependencies.
+ * Three classloaders are involved:
+ *      1. The root classloader that will share interfaces between the function instance
+ *      classloader and user code classloader. This classloader will contain the following dependencies
+ *          - pulsar-io-core
+ *          - pulsar-functions-api
+ *          - pulsar-client-api
+ *          - log4j-slf4j-impl
+ *          - slf4j-api
+ *          - log4j-core
+ *          - log4j-api
+ *
+ *      2. The Function instance classloader, a child of the root classloader, that loads all pulsar broker/worker dependencies
+ *      3. The user code classloader, a child of the root classloader, that loads all user code dependencies
+ *
+ * Only the function instance classloader is constructed in this class. The root classloader is the
+ * current thread's context classloader, and both are handed to JavaInstanceStarter#start.
+ *
+ * This class should not use any other dependencies!
+ */
+public class JavaInstanceMain {
+
+    /** Name of the system property that holds the classpath of the function instance. */
+    private static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
+
+    public JavaInstanceMain() { }
+
+    /**
+     * Builds the function instance classloader from the {@code pulsar.functions.instance.classpath}
+     * system property, then reflectively creates {@code JavaInstanceStarter} with that loader and
+     * calls its {@code start(String[], ClassLoader, ClassLoader)} method.
+     *
+     * @param args command line arguments, passed through unchanged to the starter
+     * @throws IllegalArgumentException if the classpath system property is not set
+     * @throws Exception if the classloader cannot be built or the starter cannot be invoked
+     */
+    public static void main(String[] args) throws Exception {
+
+        // Set root classloader to current classpath
+        ClassLoader root = Thread.currentThread().getContextClassLoader();
+
+        // Get classpath for function instance
+        String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
+        if (functionInstanceClasspath == null) {
+            throw new IllegalArgumentException("Property " + FUNCTIONS_INSTANCE_CLASSPATH + " is not set!");
+        }
+
+        List<File> files = new LinkedList<>();
+        // Split on the platform path separator rather than a hard-coded ":" so that entries such as
+        // "C:\lib\x.jar" survive; this assumes the property follows the usual classpath convention.
+        for (String entry : functionInstanceClasspath.split(File.pathSeparator)) {
+            if (isBlank(entry)) {
+                continue;
+            }
+            // replace any asterisks i.e. wildcards as they don't work with url classloader
+            File f = new File(entry.replace("*", ""));
+            if (!f.exists()) {
+                System.out.println(String.format("[WARN] %s on functions instance classpath does not exist", f.getAbsolutePath()));
+            } else if (f.isDirectory()) {
+                // listFiles() returns null when the directory cannot be listed
+                File[] children = f.listFiles();
+                if (children != null) {
+                    files.addAll(Arrays.asList(children));
+                }
+            } else {
+                // add the wildcard-stripped path that was just checked, not the raw entry
+                files.add(f);
+            }
+        }
+
+        ClassLoader functionInstanceClsLoader = loadJar(root, files.toArray(new File[0]));
+
+        System.out.println("Using function root classloader: " + root);
+        System.out.println("Using function instance classloader: " + functionInstanceClsLoader);
+
+        // use the function instance classloader to create org.apache.pulsar.functions.runtime.JavaInstanceStarter
+        Object starter = createInstance("org.apache.pulsar.functions.runtime.JavaInstanceStarter", functionInstanceClsLoader);
+
+        // Invoke start method of JavaInstanceStarter to start the function instance code
+        Method method = starter.getClass().getDeclaredMethod("start", String[].class, ClassLoader.class, ClassLoader.class);
+
+        System.out.println("Starting function instance...");
+        method.invoke(starter, args, functionInstanceClsLoader, root);
+    }
+
+    /**
+     * Loads a class by name through the given classloader and instantiates it with its no-arg
+     * constructor. The constructor is made accessible first, so it does not have to be public.
+     *
+     * @param userClassName fully qualified name of the class to instantiate
+     * @param classLoader classloader used to resolve and initialize the class
+     * @return a new instance of the class
+     * @throws RuntimeException if the class cannot be found, has no no-arg constructor, is not
+     *         concrete, is not accessible, or its constructor throws; the cause is preserved
+     */
+    public static Object createInstance(String userClassName,
+                                        ClassLoader classLoader) {
+        Class<?> theCls;
+        try {
+            theCls = Class.forName(userClassName, true, classLoader);
+        } catch (ClassNotFoundException cnfe) {
+            throw new RuntimeException("Class " + userClassName + " must be in class path", cnfe);
+        }
+        try {
+            Constructor<?> ctor = theCls.getDeclaredConstructor();
+            ctor.setAccessible(true);
+            return ctor.newInstance();
+        } catch (InstantiationException ie) {
+            throw new RuntimeException("User class must be concrete", ie);
+        } catch (NoSuchMethodException e) {
+            // getDeclaredConstructor() found no zero-argument constructor
+            throw new RuntimeException("Class " + userClassName + " must have a no-arg constructor", e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("Class " + userClassName + " constructor is not accessible", e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException("Class " + userClassName + " constructor throws exception", e);
+        }
+    }
+
+    /**
+     * Creates a {@link URLClassLoader} over the given files with {@code parent} as its parent.
+     *
+     * @param parent parent classloader of the new loader
+     * @param jars files (or directories) to expose through the new loader
+     * @return the new classloader
+     * @throws MalformedURLException if one of the files cannot be converted to a URL
+     */
+    public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
+        URL[] urls = new URL[jars.length];
+        for (int i = 0; i < jars.length; i++) {
+            urls[i] = jars[i].toURI().toURL();
+        }
+        return new URLClassLoader(urls, parent);
+    }
+
+    /**
+     * Tells whether a string is null, empty, or made up only of whitespace characters.
+     * Implemented locally because this class must not pull in other dependencies.
+     *
+     * @param str string to check, may be null
+     * @return true if {@code str} is null, empty or whitespace-only
+     */
+    public static boolean isBlank(String str) {
+        if (str != null) {
+            for (int i = 0; i < str.length(); ++i) {
+                if (!Character.isWhitespace(str.charAt(i))) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+}
diff --git a/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml
new file mode 100644
index 0000000..843ad9e
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml
@@ -0,0 +1,130 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration>
+    <name>pulsar-functions-instance</name>
+    <monitorInterval>30</monitorInterval>
+    <Properties>
+        <Property>
+            <name>pulsar.log.appender</name>
+            <value>RollingFile</value>
+        </Property>
+        <Property>
+            <name>pulsar.log.level</name>
+            <value>info</value>
+        </Property>
+        <Property>
+            <name>bk.log.level</name>
+            <value>info</value>
+        </Property>
+    </Properties>
+    <Appenders>
+        <Console>
+            <name>Console</name>
+            <target>SYSTEM_OUT</target>
+            <PatternLayout>
+                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+            </PatternLayout>
+        </Console>
+        <RollingFile>
+            <name>RollingFile</name>
+            <fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log</fileName>
+            <filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
+            <immediateFlush>true</immediateFlush>
+            <PatternLayout>
+                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+            </PatternLayout>
+            <Policies>
+                <TimeBasedTriggeringPolicy>
+                    <interval>1</interval>
+                    <modulate>true</modulate>
+                </TimeBasedTriggeringPolicy>
+                <SizeBasedTriggeringPolicy>
+                    <size>1 GB</size>
+                </SizeBasedTriggeringPolicy>
+                <CronTriggeringPolicy>
+                    <schedule>0 0 0 * * ?</schedule>
+                </CronTriggeringPolicy>
+            </Policies>
+            <DefaultRolloverStrategy>
+                <Delete>
+                    <basePath>${sys:pulsar.function.log.dir}</basePath>
+                    <maxDepth>2</maxDepth>
+                    <IfFileName>
+                        <glob>*/${sys:pulsar.function.log.file}*log.gz</glob>
+                    </IfFileName>
+                    <IfLastModified>
+                        <age>30d</age>
+                    </IfLastModified>
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingFile>
+        <RollingRandomAccessFile>
+            <name>BkRollingFile</name>
+            <fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk</fileName>
+            <filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
+            <immediateFlush>true</immediateFlush>
+            <PatternLayout>
+                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+            </PatternLayout>
+            <Policies>
+                <TimeBasedTriggeringPolicy>
+                    <interval>1</interval>
+                    <modulate>true</modulate>
+                </TimeBasedTriggeringPolicy>
+                <SizeBasedTriggeringPolicy>
+                    <size>1 GB</size>
+                </SizeBasedTriggeringPolicy>
+                <CronTriggeringPolicy>
+                    <schedule>0 0 0 * * ?</schedule>
+                </CronTriggeringPolicy>
+            </Policies>
+            <DefaultRolloverStrategy>
+                <Delete>
+                    <basePath>${sys:pulsar.function.log.dir}</basePath>
+                    <maxDepth>2</maxDepth>
+                    <IfFileName>
+                        <glob>*/${sys:pulsar.function.log.file}.bk*log.gz</glob>
+                    </IfFileName>
+                    <IfLastModified>
+                        <age>30d</age>
+                    </IfLastModified>
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingRandomAccessFile>
+    </Appenders>
+    <Loggers>
+        <Logger> <!-- Dedicated logger for the named BookKeeper package: its events go to the
+             BkRollingFile appender only (additivity is off) at the level read from ${sys:bk.log.level}.
+             NOTE(review): the name carries the shaded-package prefix, while the runtime-all pom in this
+             change no longer relocates org.apache.bookkeeper; confirm the prefix still matches the
+             package the BookKeeper classes are loaded under. -->
+            <name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name>
+            <level>${sys:bk.log.level}</level>
+            <additivity>false</additivity>
+            <AppenderRef>
+                <ref>BkRollingFile</ref>
+            </AppenderRef>
+        </Logger>
+        <Root>
+            <level>info</level>
+            <AppenderRef>
+                <ref>${sys:pulsar.log.appender}</ref>
+                <level>${sys:pulsar.log.level}</level>
+            </AppenderRef>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml
new file mode 100644
index 0000000..8e17a63
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml
@@ -0,0 +1,60 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration>
+    <name>pulsar-functions-kubernetes-instance</name>
+    <monitorInterval>30</monitorInterval>
+    <Properties>
+        <Property>
+            <name>pulsar.log.level</name>
+            <value>info</value>
+        </Property>
+        <Property>
+            <name>bk.log.level</name>
+            <value>info</value>
+        </Property>
+    </Properties>
+    <Appenders>
+        <Console>
+            <name>Console</name>
+            <target>SYSTEM_OUT</target>
+            <PatternLayout>
+                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+            </PatternLayout>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Logger>
+            <name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name>
+            <level>${sys:bk.log.level}</level>
+            <additivity>false</additivity>
+            <AppenderRef>
+                <ref>Console</ref>
+            </AppenderRef>
+        </Logger>
+        <Root>
+            <level>info</level>
+            <AppenderRef>
+                <ref>Console</ref>
+                <level>${sys:pulsar.log.level}</level>
+            </AppenderRef>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
similarity index 87%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 15af660..a26f287 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
@@ -50,72 +50,70 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-/**
- * A function container implemented using java thread.
- */
+
 @Slf4j
-public class JavaInstanceMain implements AutoCloseable {
+public class JavaInstanceStarter implements AutoCloseable {
     @Parameter(names = "--function_details", description = "Function details json\n", required = true)
-    protected String functionDetailsJsonString;
+    public String functionDetailsJsonString;
     @Parameter(
             names = "--jar",
             description = "Path to Jar\n",
             listConverter = StringConverter.class)
-    protected String jarFile;
+    public String jarFile;
 
     @Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
-    protected int instanceId;
+    public int instanceId;
 
     @Parameter(names = "--function_id", description = "Function Id\n", required = true)
-    protected String functionId;
+    public String functionId;
 
     @Parameter(names = "--function_version", description = "Function Version\n", required = true)
-    protected String functionVersion;
+    public String functionVersion;
 
     @Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
-    protected String pulsarServiceUrl;
+    public String pulsarServiceUrl;
 
     @Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
-    protected String clientAuthenticationPlugin;
+    public String clientAuthenticationPlugin;
 
     @Parameter(names = "--client_auth_params", description = "Client auth param\n")
-    protected String clientAuthenticationParameters;
+    public String clientAuthenticationParameters;
 
     @Parameter(names = "--use_tls", description = "Use tls connection\n")
-    protected String useTls = Boolean.FALSE.toString();
+    public String useTls = Boolean.FALSE.toString();
 
     @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
-    protected String tlsAllowInsecureConnection = Boolean.TRUE.toString();
+    public String tlsAllowInsecureConnection = Boolean.TRUE.toString();
 
     @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
-    protected String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
+    public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
 
     @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
-    protected String tlsTrustCertFilePath;
+    public String tlsTrustCertFilePath;
 
     @Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required= false)
-    protected String stateStorageServiceUrl;
+    public String stateStorageServiceUrl;
 
     @Parameter(names = "--port", description = "Port to listen on\n", required = true)
-    protected int port;
+    public int port;
 
     @Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true)
-    protected int metrics_port;
+    public int metrics_port;
 
     @Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
-    protected int maxBufferedTuples;
+    public int maxBufferedTuples;
 
     @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
-    protected int expectedHealthCheckInterval;
+    public int expectedHealthCheckInterval;
 
     @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
-    protected String secretsProviderClassName;
+    public String secretsProviderClassName;
 
     @Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false)
-    protected String secretsProviderConfig;
+    public String secretsProviderConfig;
 
     @Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true)
-    protected String clusterName;
+    public String clusterName;
 
     private Server server;
     private RuntimeSpawner runtimeSpawner;
@@ -124,17 +122,22 @@ public class JavaInstanceMain implements AutoCloseable {
     private HTTPServer metricsServer;
     private ScheduledFuture healthCheckTimer;
 
-    public JavaInstanceMain() { }
+    public JavaInstanceStarter() { }
 
+    public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassLoader rootClassLoader) throws Exception {
+        Thread.currentThread().setContextClassLoader(functionInstanceClassLoader);
+
+        JCommander jcommander = new JCommander(this);
+        // parse args by JCommander
+        jcommander.parse(args);
 
-    public void start() throws Exception {
         InstanceConfig instanceConfig = new InstanceConfig();
         instanceConfig.setFunctionId(functionId);
         instanceConfig.setFunctionVersion(functionVersion);
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
         instanceConfig.setClusterName(clusterName);
-        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
         if (functionDetailsJsonString.charAt(0) == '\'') {
             functionDetailsJsonString = functionDetailsJsonString.substring(1);
         }
@@ -142,7 +145,7 @@ public class JavaInstanceMain implements AutoCloseable {
             functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
         }
         JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
-        FunctionDetails functionDetails = functionDetailsBuilder.build();
+        Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
 
@@ -164,7 +167,7 @@ public class JavaInstanceMain implements AutoCloseable {
 
         SecretsProvider secretsProvider;
         try {
-            secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+            secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, functionInstanceClassLoader);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -180,7 +183,7 @@ public class JavaInstanceMain implements AutoCloseable {
                         .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
                         .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
                         .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                secretsProvider, collectorRegistry);
+                secretsProvider, collectorRegistry, rootClassLoader);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
@@ -234,16 +237,6 @@ public class JavaInstanceMain implements AutoCloseable {
         return Boolean.TRUE.toString().equals(param);
     }
 
-    public static void main(String[] args) throws Exception {
-        JavaInstanceMain javaInstanceMain = new JavaInstanceMain();
-        JCommander jcommander = new JCommander(javaInstanceMain);
-        jcommander.setProgramName("JavaInstanceMain");
-
-        // parse args by JCommander
-        jcommander.parse(args);
-        javaInstanceMain.start();
-    }
-
     @Override
     public void close() {
         try {
@@ -309,7 +302,7 @@ public class JavaInstanceMain implements AutoCloseable {
 
         @Override
         public void getMetrics(com.google.protobuf.Empty request,
-                                       io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
+                               io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
             Runtime runtime = runtimeSpawner.getRuntime();
             if (runtime != null) {
                 try {
@@ -324,7 +317,7 @@ public class JavaInstanceMain implements AutoCloseable {
         }
 
         public void resetMetrics(com.google.protobuf.Empty request,
-                io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
+                                 io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
             Runtime runtime = runtimeSpawner.getRuntime();
             if (runtime != null) {
                 try {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index e2862ec..b06e405 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -185,7 +185,7 @@ public class KubernetesRuntime implements Runtime {
         }
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
-                logConfigFile = "kubernetes_instance_log4j2.yml";
+                logConfigFile = "kubernetes_instance_log4j2.xml";
                 break;
             case PYTHON:
                 logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 43e74e8..311934e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -97,7 +97,7 @@ class ProcessRuntime implements Runtime {
         }
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
-                logConfigFile = "java_instance_log4j2.yml";
+                logConfigFile = "java_instance_log4j2.xml";
                 break;
             case PYTHON:
                 logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index fca3e22..ede5c48 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -51,6 +50,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 public class RuntimeUtils {
 
     private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";
+    static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
 
     public static List<String> composeCmd(InstanceConfig instanceConfig,
                                           String instanceFile,
@@ -255,17 +255,25 @@ public class RuntimeUtils {
             args.add("-cp");
 
             String classpath = instanceFile;
+
             if (StringUtils.isNotEmpty(extraDependenciesDir)) {
                 classpath = classpath + ":" + extraDependenciesDir + "/*";
             }
             args.add(classpath);
 
-            // Keep the same env property pointing to the Java instance file so that it can be picked up
-            // by the child process and manually added to classpath
-            args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
             if (StringUtils.isNotEmpty(extraDependenciesDir)) {
                 args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
             }
+
+            // add complete classpath for broker/worker so that the function instance can load
+            // the functions instance dependencies separately from user code dependencies
+            String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
+            if (functionInstanceClasspath == null) {
+                log.warn("Property {} is not set.  Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH);
+                functionInstanceClasspath = System.getProperty("java.class.path");
+            }
+            args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClasspath));
+
             args.add("-Dlog4j.configurationFile=" + logConfigFile);
             args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
             args.add("-Dpulsar.function.log.file=" + String.format(
@@ -283,7 +291,8 @@ public class RuntimeUtils {
                     args.add("-Xmx" + String.valueOf(resources.getRam()));
                 }
             }
-            args.add(JavaInstanceMain.class.getName());
+            args.add("org.apache.pulsar.functions.instance.JavaInstanceMain");
+
             args.add("--jar");
             args.add(originalCodeFileName);
         } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index a3fd850..49bee87 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -20,12 +20,8 @@
 package org.apache.pulsar.functions.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import io.prometheus.client.CollectorRegistry;
 import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,6 +32,13 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 /**
  * Thread based function container factory implementation.
  */
@@ -51,21 +54,36 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private volatile boolean closed;
 
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
-                                AuthenticationConfig authConfig, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) throws Exception {
-        this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl, secretsProvider, collectorRegistry);
+                                AuthenticationConfig authConfig, SecretsProvider secretsProvider,
+                                CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception {
+        this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
+                storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader);
     }
 
     @VisibleForTesting
     public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
-                                SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) {
+                                SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
+                                ClassLoader rootClassLoader) {
+        if (rootClassLoader == null) {
+            rootClassLoader = Thread.currentThread().getContextClassLoader();
+        }
+
         this.secretsProvider = secretsProvider;
-        this.fnCache = new FunctionCacheManagerImpl();
+        this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
         this.storageServiceUrl = storageServiceUrl;
         this.collectorRegistry = collectorRegistry;
     }
 
+    public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
+        URL[] urls = new URL[jars.length];
+        for (int i = 0; i < jars.length; i++) {
+            urls[i] = jars[i].toURI().toURL();
+        }
+        return new URLClassLoader(urls, parent);
+    }
+
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
             throws PulsarClientException {
         ClientBuilder clientBuilder = null;
diff --git a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
deleted file mode 100644
index 9846f05..0000000
--- a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
+++ /dev/null
@@ -1,111 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-Configuration:
-  name: pulsar-functions-instance
-  monitorInterval: 30
-
-  Properties:
-    Property:
-      - name: "pulsar.log.appender"
-        value: "RollingFile"
-      - name: "pulsar.log.level"
-        value: "info"
-      - name: "bk.log.level"
-        value: "info"
-
-  Appenders:
-
-    # Console
-    Console:
-      name: Console
-      target: SYSTEM_OUT
-      PatternLayout:
-        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-
-    # Rolling file appender configuration
-    RollingFile:
-      name: RollingFile
-      fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log"
-      filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
-      immediateFlush: true
-      PatternLayout:
-        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-      Policies:
-        TimeBasedTriggeringPolicy:
-          interval: 1
-          modulate: true
-        SizeBasedTriggeringPolicy:
-          size: 1 GB
-        # Trigger every day at midnight that also scan
-        # roll-over strategy that deletes older file
-        CronTriggeringPolicy:
-          schedule: "0 0 0 * * ?"
-      # Delete file older than 30days
-      DefaultRolloverStrategy:
-          Delete:
-            basePath: ${sys:pulsar.function.log.dir}
-            maxDepth: 2
-            IfFileName:
-              glob: "*/${sys:pulsar.function.log.file}*log.gz"
-            IfLastModified:
-              age: 30d
-
-    # Rolling file appender configuration for bk
-    RollingRandomAccessFile:
-      name: BkRollingFile
-      fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk"
-      filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
-      immediateFlush: true
-      PatternLayout:
-        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-      Policies:
-        TimeBasedTriggeringPolicy:
-          interval: 1
-          modulate: true
-        SizeBasedTriggeringPolicy:
-          size: 1 GB
-        # Trigger every day at midnight that also scan
-        # roll-over strategy that deletes older file
-        CronTriggeringPolicy:
-          schedule: "0 0 0 * * ?"
-      # Delete file older than 30days
-      DefaultRolloverStrategy:
-          Delete:
-            basePath: ${sys:pulsar.function.log.dir}
-            maxDepth: 2
-            IfFileName:
-              glob: "*/${sys:pulsar.function.log.file}.bk*log.gz"
-            IfLastModified:
-              age: 30d
-
-  Loggers:
-
-    Logger:
-      name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper
-      level: "${sys:bk.log.level}"
-      additivity: false
-      AppenderRef:
-        - ref: BkRollingFile
-
-    Root:
-      level: info
-      AppenderRef:
-        - ref: "${sys:pulsar.log.appender}"
-          level: "${sys:pulsar.log.level}"
diff --git a/pulsar-functions/runtime/src/main/resources/kubernetes_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/kubernetes_instance_log4j2.yml
deleted file mode 100644
index 98ce897..0000000
--- a/pulsar-functions/runtime/src/main/resources/kubernetes_instance_log4j2.yml
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-Configuration:
-  name: pulsar-functions-kubernetes-instance
-  monitorInterval: 30
-
-  Properties:
-    Property:
-      - name: "pulsar.log.level"
-        value: "info"
-      - name: "bk.log.level"
-        value: "info"
-
-  Appenders:
-
-    # Console
-    Console:
-      name: Console
-      target: SYSTEM_OUT
-      PatternLayout:
-        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-
-  Loggers:
-
-    Logger:
-      name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper
-      level: "${sys:bk.log.level}"
-      additivity: false
-      AppenderRef:
-        - ref: Console
-
-    Root:
-      level: info
-      AppenderRef:
-        - ref: Console
-          level: "${sys:pulsar.log.level}"
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 5128ff3..12390dc 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Type;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -135,6 +137,11 @@ public class KubernetesRuntimeTest {
         this.logDirectory = "logs/functions";
     }
 
+    @BeforeClass
+    public void setup() {
+        System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
+    }
+
     @AfterMethod
     public void tearDown() {
         if (null != this.factory) {
@@ -288,13 +295,13 @@ public class KubernetesRuntimeTest {
             "Actual args : " + StringUtils.join(args, " "));
 
         String expectedArgs = "exec java -cp " + classpath
-                + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + extraDepsEnv
-                + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
+                + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+                + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
                 + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
                 + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
                 + " -Xmx" + String.valueOf(RESOURCES.getRam())
-                + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+                + " org.apache.pulsar.functions.instance.JavaInstanceMain"
                 + " --jar " + pulsarRootDir + "/" + userJarFile + " --instance_id "
                 + "$SHARD_ID" + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 842b0a3..bdb2976 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -46,6 +47,7 @@ import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 /**
@@ -123,6 +125,11 @@ public class ProcessRuntimeTest {
         this.logDirectory = "Users/user/logs";
     }
 
+    @BeforeClass
+    public void setup() {
+        System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
+    }
+
     @AfterMethod
     public void tearDown() {
         if (null != this.factory) {
@@ -266,12 +273,12 @@ public class ProcessRuntimeTest {
         }
 
         String expectedArgs = "java -cp " + classpath
-                + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + extraDepsEnv
-                + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
+                + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+                + " -Dlog4j.configurationFile=java_instance_log4j2.xml "
                 + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
                 + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
-                + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+                + " org.apache.pulsar.functions.instance.JavaInstanceMain"
                 + " --jar " + userJarFile + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
index c55111f..5dbb804 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
@@ -61,8 +61,9 @@ public class FunctionCacheEntry implements AutoCloseable {
     FunctionCacheEntry(Collection<String> requiredJarFiles,
                        Collection<URL> requiredClasspaths,
                        URL[] libraryURLs,
-                       String initialInstanceId) {
-        this.classLoader = FunctionClassLoaders.create(libraryURLs, FunctionClassLoaders.class.getClassLoader());
+                       String initialInstanceId, ClassLoader rootClassLoader) {
+        this.classLoader = FunctionClassLoaders.create(libraryURLs, rootClassLoader);
+
         this.classpaths = requiredClasspaths.stream()
             .map(URL::toString)
             .collect(Collectors.toSet());
@@ -70,17 +71,8 @@ public class FunctionCacheEntry implements AutoCloseable {
         this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
     }
 
-    private static final Set<String> JAVA_INSTANCE_ADDITIONAL_JARS = isNoneBlank(
-            System.getProperty(JAVA_INSTANCE_JAR_PROPERTY))
-                    ? Collections.singleton(System.getProperty(JAVA_INSTANCE_JAR_PROPERTY))
-                    : Collections.emptySet();
-
-    FunctionCacheEntry(String narArchive, String initialInstanceId) throws IOException {
-        if (JAVA_INSTANCE_ADDITIONAL_JARS.isEmpty()) {
-            log.warn("java-instance jar path not set in system-property= {} ", JAVA_INSTANCE_JAR_PROPERTY);
-            throw new IllegalStateException(JAVA_INSTANCE_JAR_PROPERTY + " system property not set");
-        }
-        this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), JAVA_INSTANCE_ADDITIONAL_JARS);
+    FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader) throws IOException {
+        this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), rootClassLoader);
         this.classpaths = Collections.emptySet();
         this.jarFiles = Collections.singleton(narArchive);
         this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
index f2dcbe6..b5f7e42 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
@@ -37,8 +37,11 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
     /** Registered Functions **/
     private final Map<String, FunctionCacheEntry> cacheFunctions;
 
-    public FunctionCacheManagerImpl() {
+    private ClassLoader rootClassLoader;
+
+    public FunctionCacheManagerImpl(ClassLoader rootClassLoader) {
         this.cacheFunctions = new ConcurrentHashMap<>();
+        this.rootClassLoader = rootClassLoader;
     }
 
     Map<String, FunctionCacheEntry> getCacheFunctions() {
@@ -93,7 +96,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
                             requiredJarFiles,
                             requiredClasspaths,
                             urls,
-                            eid));
+                            eid, rootClassLoader));
                 } catch (Throwable cause) {
                     Exceptions.rethrowIOException(cause);
                 }
@@ -122,7 +125,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
 
             // Create new cache entry
             try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid));
+                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader));
             } catch (Throwable cause) {
                 Exceptions.rethrowIOException(cause);
             }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 534710d..eb7272e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -140,7 +140,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     workerConfig.getStateStorageServiceUrl(),
                     authConfig,
                     new ClearTextSecretsProvider(),
-                     null);
+                     null, null);
         } else if (workerConfig.getProcessContainerFactory() != null) {
             this.runtimeFactory = new ProcessRuntimeFactory(
                     workerConfig.getPulsarServiceUrl(),
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 1511780..c0444e3 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -143,7 +143,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -189,7 +189,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -236,7 +236,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -296,7 +296,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -362,7 +362,7 @@ public class SchedulerManagerTest {
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
         ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider
-                (), new CollectorRegistry());
+                (), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -473,7 +473,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -600,7 +600,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
@@ -654,7 +654,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -787,7 +787,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments