You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/11 19:15:03 UTC

[pulsar] branch master updated: Make Nar Extraction Directory configurable (#6933)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 54f8d13  Make Nar Extraction Directory configurable (#6933)
54f8d13 is described below

commit 54f8d13ba2688797caf2f553062e97dfb1b66bec
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 11 12:14:43 2020 -0700

    Make Nar Extraction Directory configurable (#6933)
    
    * Make Nar Extraction Directory configurable
    
    * Fixed unittests
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../bookkeeper/mledger/offload/OffloaderUtils.java     | 15 ++++++++-------
 .../org/apache/pulsar/broker/ServiceConfiguration.java |  7 +++++++
 .../java/org/apache/pulsar/broker/PulsarService.java   |  2 +-
 .../pulsar/broker/protocol/ProtocolHandlerUtils.java   | 14 ++++++++------
 .../pulsar/broker/protocol/ProtocolHandlers.java       |  4 ++--
 .../broker/protocol/ProtocolHandlerUtilsTest.java      | 15 +++++++++------
 .../org/apache/pulsar/common/nar/NarClassLoader.java   | 16 +++++++++++-----
 .../apache/pulsar/common/util/SearchBcNarUtils.java    |  2 +-
 .../functions/instance/JavaInstanceRunnable.java       |  7 +++++--
 .../functions/instance/JavaInstanceRunnableTest.java   |  2 +-
 .../java/org/apache/pulsar/functions/LocalRunner.java  | 15 +++++++++------
 .../pulsar/functions/runtime/JavaInstanceStarter.java  |  6 +++++-
 .../apache/pulsar/functions/runtime/RuntimeUtils.java  | 15 ++++++++++++---
 .../runtime/kubernetes/KubernetesRuntime.java          |  6 +++++-
 .../runtime/kubernetes/KubernetesRuntimeFactory.java   |  3 +++
 .../kubernetes/KubernetesRuntimeFactoryConfig.java     |  7 +++++++
 .../functions/runtime/process/ProcessRuntime.java      |  5 ++++-
 .../runtime/process/ProcessRuntimeFactory.java         |  8 +++++++-
 .../pulsar/functions/runtime/thread/ThreadRuntime.java | 11 ++++++++---
 .../functions/runtime/thread/ThreadRuntimeFactory.java | 18 +++++++++++-------
 .../apache/pulsar/functions/worker/WorkerConfig.java   |  6 ++++++
 .../runtime/kubernetes/KubernetesRuntimeTest.java      |  8 +++++---
 .../functions/runtime/process/ProcessRuntimeTest.java  |  8 +++++---
 .../apache/pulsar/functions/utils/FunctionCommon.java  |  7 ++++---
 .../apache/pulsar/functions/utils/SinkConfigUtils.java |  4 ++--
 .../pulsar/functions/utils/SourceConfigUtils.java      |  5 +++--
 .../utils/functioncache/FunctionCacheEntry.java        |  6 ++++--
 .../utils/functioncache/FunctionCacheManager.java      |  4 +++-
 .../utils/functioncache/FunctionCacheManagerImpl.java  |  5 +++--
 .../pulsar/functions/utils/io/ConnectorUtils.java      |  8 ++++----
 .../pulsar/functions/worker/ConnectorsManager.java     |  4 ++--
 .../pulsar/functions/worker/FunctionActioner.java      |  8 ++++----
 .../pulsar/functions/worker/rest/api/SinksImpl.java    |  3 ++-
 .../pulsar/functions/worker/rest/api/SourcesImpl.java  |  3 ++-
 .../pulsar/functions/worker/SchedulerManagerTest.java  | 18 +++++++++---------
 .../functions/worker/rest/api/FunctionsImplTest.java   |  4 ++--
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java      |  6 +++---
 .../worker/rest/api/v3/SourceApiV3ResourceTest.java    |  6 +++---
 .../apache/pulsar/sql/presto/PulsarConnectorCache.java | 13 ++++++++-----
 .../pulsar/sql/presto/PulsarConnectorConfig.java       | 15 +++++++++++++++
 .../apache/pulsar/sql/presto/PulsarRecordCursor.java   |  3 ++-
 41 files changed, 215 insertions(+), 107 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 845b53f..5243691 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -48,11 +48,12 @@ public class OffloaderUtils {
      * @return the offloader class name
      * @throws IOException when fail to retrieve the pulsar offloader class
      */
-    static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath) throws IOException {
+    static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath, String narExtractionDirectory) throws IOException {
         // need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case
         // LedgerOffloaderFactory is loaded by a classloader that is not the default classloader
         // as is the case for the pulsar presto plugin
-        NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), LedgerOffloaderFactory.class.getClassLoader());
+        NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
+                LedgerOffloaderFactory.class.getClassLoader(), narExtractionDirectory);
         String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
 
         OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml()
@@ -105,15 +106,15 @@ public class OffloaderUtils {
         }
     }
 
-    public static OffloaderDefinition getOffloaderDefinition(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+    public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory) throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
             String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
 
             return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class);
         }
     }
 
-    public static Offloaders searchForOffloaders(String connectorsDirectory) throws IOException {
+    public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException {
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for offloaders in {}", path);
 
@@ -127,13 +128,13 @@ public class OffloaderUtils {
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             stream.forEach(archive -> {
                 try {
-                    OffloaderDefinition definition = getOffloaderDefinition(archive.toString());
+                    OffloaderDefinition definition = getOffloaderDefinition(archive.toString(), narExtractionDirectory);
                     log.info("Found offloader {} from {}", definition, archive);
 
                     if (!StringUtils.isEmpty(definition.getOffloaderFactoryClass())) {
                         // Validate offloader factory class to be present and of the right type
                         Pair<NarClassLoader,  LedgerOffloaderFactory> offloaderFactoryPair =
-                            getOffloaderFactory(archive.toString());
+                            getOffloaderFactory(archive.toString(), narExtractionDirectory);
                         if (null != offloaderFactoryPair) {
                             offloaders.getOffloaders().add(offloaderFactoryPair);
                         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1d10b8d..34163b4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -34,6 +34,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.Commands;
@@ -1564,6 +1565,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int managedLedgerOffloadMaxThreads = 2;
 
     @FieldContext(
+        category = CATEGORY_STORAGE_OFFLOADING,
+        doc = "The directory where nar Extraction of offloaders happens"
+    )
+    private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
+    @FieldContext(
             category = CATEGORY_STORAGE_OFFLOADING,
             doc = "Maximum prefetch rounds for ledger reading for offloading"
     )
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 30d1dde..21a73a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -838,7 +838,7 @@ public class PulsarService implements AutoCloseable {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory());
+                this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
                 LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
                 try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java
index 4a64ebf..6ccfff9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java
@@ -49,8 +49,8 @@ class ProtocolHandlerUtils {
      * @return the protocol handler definition
      * @throws IOException when fail to load the protocol handler or get the definition
      */
-    public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+    public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory) throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
             return getProtocolHandlerDefinition(ncl);
         }
     }
@@ -70,7 +70,8 @@ class ProtocolHandlerUtils {
      * @return a collection of protocol handlers
      * @throws IOException when fail to load the available protocol handlers from the provided directory.
      */
-    public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory) throws IOException {
+    public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory,
+                                                               String narExtractionDirectory) throws IOException {
         Path path = Paths.get(handlersDirectory).toAbsolutePath();
         log.info("Searching for protocol handlers in {}", path);
 
@@ -84,7 +85,7 @@ class ProtocolHandlerUtils {
             for (Path archive : stream) {
                 try {
                     ProtocolHandlerDefinition phDef =
-                        ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString());
+                        ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory);
                     log.info("Found protocol handler from {} : {}", archive, phDef);
 
                     checkArgument(StringUtils.isNotBlank(phDef.getName()));
@@ -113,11 +114,12 @@ class ProtocolHandlerUtils {
      * @param metadata the protocol handler definition.
      * @return
      */
-    static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata) throws IOException {
+    static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata,
+                                               String narExtractionDirectory) throws IOException {
         NarClassLoader ncl = NarClassLoader.getFromArchive(
             metadata.getArchivePath().toAbsolutePath().toFile(),
             Collections.emptySet(),
-            ProtocolHandler.class.getClassLoader());
+            ProtocolHandler.class.getClassLoader(), narExtractionDirectory);
 
         ProtocolHandlerDefinition phDef = getProtocolHandlerDefinition(ncl);
         if (StringUtils.isBlank(phDef.getHandlerClass())) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
index b09912b..af0d726 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
@@ -46,7 +46,7 @@ public class ProtocolHandlers implements AutoCloseable {
      */
     public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException {
         ProtocolHandlerDefinitions definitions =
-            ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory());
+            ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory());
 
         ImmutableMap.Builder<String, ProtocolHandlerWithClassLoader> handlersBuilder = ImmutableMap.builder();
 
@@ -60,7 +60,7 @@ public class ProtocolHandlers implements AutoCloseable {
 
             ProtocolHandlerWithClassLoader handler;
             try {
-                handler = ProtocolHandlerUtils.load(definition);
+                handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory());
             } catch (IOException e) {
                 log.error("Failed to load the protocol handler for protocol `" + protocol + "`", e);
                 throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java
index e7910dc..3fda508 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java
@@ -75,10 +75,11 @@ public class ProtocolHandlerUtilsTest {
         PowerMockito.when(NarClassLoader.getFromArchive(
             any(File.class),
             any(Set.class),
-            any(ClassLoader.class)
+            any(ClassLoader.class),
+            any(String.class)
         )).thenReturn(mockLoader);
 
-        ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata);
+        ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata, "");
         ProtocolHandler returnedPh = returnedPhWithCL.getHandler();
 
         assertSame(mockLoader, returnedPhWithCL.getClassLoader());
@@ -107,11 +108,12 @@ public class ProtocolHandlerUtilsTest {
         PowerMockito.when(NarClassLoader.getFromArchive(
             any(File.class),
             any(Set.class),
-            any(ClassLoader.class)
+            any(ClassLoader.class),
+            any(String.class)
         )).thenReturn(mockLoader);
 
         try {
-            ProtocolHandlerUtils.load(metadata);
+            ProtocolHandlerUtils.load(metadata, "");
             fail("Should not reach here");
         } catch (IOException ioe) {
             // expected
@@ -141,11 +143,12 @@ public class ProtocolHandlerUtilsTest {
         PowerMockito.when(NarClassLoader.getFromArchive(
             any(File.class),
             any(Set.class),
-            any(ClassLoader.class)
+            any(ClassLoader.class),
+            any(String.class)
         )).thenReturn(mockLoader);
 
         try {
-            ProtocolHandlerUtils.load(metadata);
+            ProtocolHandlerUtils.load(metadata, "");
             fail("Should not reach here");
         } catch (IOException ioe) {
             // expected
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 3fdfef0..e33cbfa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -135,10 +135,11 @@ public class NarClassLoader extends URLClassLoader {
 
     private static final String TMP_DIR_PREFIX = "pulsar-nar";
 
-    private static final File NAR_CACHE_DIR = new File(System.getProperty("java.io.tmpdir") + "/" + TMP_DIR_PREFIX);
+    public static final String DEFAULT_NAR_EXTRACTION_DIR = System.getProperty("java.io.tmpdir");
 
-    public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
-        File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
+    public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars,
+                                                String narExtractionDirectory) throws IOException {
+        File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
         try {
             return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
         } catch (ClassNotFoundException | NoClassDefFoundError e) {
@@ -146,9 +147,10 @@ public class NarClassLoader extends URLClassLoader {
         }
     }
 
-    public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent)
+    public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent,
+                                                String narExtractionDirectory)
         throws IOException {
-        File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
+        File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
         try {
             return new NarClassLoader(unpacked, additionalJars, parent);
         } catch (ClassNotFoundException | NoClassDefFoundError e) {
@@ -156,6 +158,10 @@ public class NarClassLoader extends URLClassLoader {
         }
     }
 
+    private static File getNarExtractionDirectory(String configuredDirectory) {
+        return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
+    }
+
     /**
      * Construct a nar class loader.
      *
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java
index 0dfa469..59bd652 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java
@@ -65,7 +65,7 @@ public class SearchBcNarUtils {
                 NarClassLoader ncl = NarClassLoader.getFromArchive(
                         new File(narPath),
                         Collections.emptySet(),
-                        BCLoader.class.getClassLoader());
+                        BCLoader.class.getClassLoader(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
                 String configStr = ncl.getServiceDefinition(BC_DEF_NAME);
 
                 BcNarDefinition nar = ObjectMapperFactory.getThreadLocalYaml()
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 66cdb60..0fa4fc6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -133,6 +133,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private final ClassLoader instanceClassLoader;
     private ClassLoader functionClassLoader;
+    private String narExtractionDirectory;
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
@@ -140,7 +141,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                                 PulsarClient pulsarClient,
                                 String stateStorageServiceUrl,
                                 SecretsProvider secretsProvider,
-                                CollectorRegistry collectorRegistry) {
+                                CollectorRegistry collectorRegistry,
+                                String narExtractionDirectory) {
         this.instanceConfig = instanceConfig;
         this.fnCache = fnCache;
         this.jarFile = jarFile;
@@ -148,6 +150,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
+        this.narExtractionDirectory = narExtractionDirectory;
         this.metricsLabels = new String[]{
                 instanceConfig.getFunctionDetails().getTenant(),
                 String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
@@ -304,7 +307,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             fnCache.registerFunctionInstanceWithArchive(
                 instanceConfig.getFunctionId(),
                 instanceConfig.getInstanceName(),
-                jarFile);
+                jarFile, narExtractionDirectory);
         } catch (FileNotFoundException e) {
             // create the function class loader
             fnCache.registerFunctionInstance(
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 56d80ae..691b1fe 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -59,7 +59,7 @@ public class JavaInstanceRunnableTest {
     private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception {
         InstanceConfig config = createInstanceConfig(outputSerde);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, null, null, null, null, null, null);
+                config, null, null, null, null, null, null, null);
         return javaInstanceRunnable;
     }
 
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 2d8b72f..57209de 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -67,6 +68,7 @@ public class LocalRunner {
 
     private final AtomicBoolean running = new AtomicBoolean(false);
     private final List<RuntimeSpawner> spawners = new LinkedList<>();
+    private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
 
     public enum RuntimeEnv {
         THREAD,
@@ -256,14 +258,14 @@ public class LocalRunner {
 
                 if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                     File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
 
                 } else {
                     File file = new File(userCodeFile);
                     if (!file.exists()) {
                         throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
                     }
-                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
                 }
             } else if (sinkConfig != null) {
                 inferMissingArguments(sinkConfig);
@@ -284,13 +286,13 @@ public class LocalRunner {
 
                 if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                     File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
                 } else {
                     File file = new File(userCodeFile);
                     if (!file.exists()) {
                         throw new RuntimeException("Sink archive does not exist");
                     }
-                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
                 }
             } else {
                 throw new IllegalArgumentException("Must specify Function, Source or Sink config");
@@ -346,6 +348,7 @@ public class LocalRunner {
                 null, /* python instance file */
                 null, /* log directory */
                 null, /* extra dependencies dir */
+                narExtractionDirectory, /* nar extraction dir */
                 new DefaultSecretsProviderConfigurator(), false, Optional.empty(), Optional.empty())) {
 
             for (int i = 0; i < parallelism; ++i) {
@@ -407,7 +410,7 @@ public class LocalRunner {
                 serviceUrl,
                 stateStorageServiceUrl,
                 authConfig,
-                new ClearTextSecretsProvider(), null, null);
+                new ClearTextSecretsProvider(), null, narExtractionDirectory, null);
         for (int i = 0; i < parallelism; ++i) {
             InstanceConfig instanceConfig = new InstanceConfig();
             instanceConfig.setFunctionDetails(functionDetails);
@@ -461,6 +464,6 @@ public class LocalRunner {
             pulsarHome = Paths.get("").toAbsolutePath().toString();
         }
         String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
-        return ConnectorUtils.searchForConnectors(connectorsDir);
+        return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 970047f..f7104df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -41,6 +41,7 @@ import io.prometheus.client.hotspot.VersionInfoExports;
 import io.prometheus.jmx.JmxCollector;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -125,6 +126,9 @@ public class JavaInstanceStarter implements AutoCloseable {
     @Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true)
     public String clusterName;
 
+    @Parameter(names = "--nar_extraction_directory", description = "The directory where extraction of nar packages happen", required = false)
+    public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
     @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", required = false)
     public int maxPendingAsyncRequests = 1000;
 
@@ -198,7 +202,7 @@ public class JavaInstanceStarter implements AutoCloseable {
                         .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
                         .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
                         .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                secretsProvider, collectorRegistry, rootClassLoader);
+                secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 351fb67..ca70b42 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -70,7 +70,8 @@ public class RuntimeUtils {
                                           Boolean installUserCodeDependencies,
                                           String pythonDependencyRepository,
                                           String pythonExtraDependencyRepository,
-                                          int metricsPort) throws Exception {
+                                          int metricsPort,
+                                          String narExtractionDirectory) throws Exception {
 
         final List<String> cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir);
 
@@ -79,7 +80,7 @@ public class RuntimeUtils {
                 authConfig, shardId, grpcPort, expectedHealthCheckInterval,
                 logConfigFile, secretsProviderClassName, secretsProviderConfig,
                 installUserCodeDependencies, pythonDependencyRepository,
-                pythonExtraDependencyRepository, metricsPort));
+                pythonExtraDependencyRepository, metricsPort, narExtractionDirectory));
         return cmd;
     }
 
@@ -244,7 +245,8 @@ public class RuntimeUtils {
                                       Boolean installUserCodeDependencies,
                                       String pythonDependencyRepository,
                                       String pythonExtraDependencyRepository,
-                                      int metricsPort) throws Exception {
+                                      int metricsPort,
+                                      String narExtractionDirectory) throws Exception {
         final List<String> args = new LinkedList<>();
 
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
@@ -386,6 +388,13 @@ public class RuntimeUtils {
 
         args.add("--cluster_name");
         args.add(instanceConfig.getClusterName());
+
+        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
+            if (!StringUtils.isEmpty(narExtractionDirectory)) {
+                args.add("--nar_extraction_directory");
+                args.add(narExtractionDirectory);
+            }
+        }
         return args;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index ca6c353..56f6aaf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -145,6 +145,7 @@ public class KubernetesRuntime implements Runtime {
     private final AuthenticationConfig authConfig;
     private Integer grpcPort;
     private Integer metricsPort;
+    private String narExtractionDirectory;
     private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
 
     KubernetesRuntime(AppsV1Api appsClient,
@@ -177,6 +178,7 @@ public class KubernetesRuntime implements Runtime {
                       boolean authenticationEnabled,
                       Integer grpcPort,
                       Integer metricsPort,
+                      String narExtractionDirectory,
                       Optional<KubernetesManifestCustomizer> manifestCustomizer) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
@@ -219,6 +221,7 @@ public class KubernetesRuntime implements Runtime {
 
         this.grpcPort = grpcPort;
         this.metricsPort = metricsPort;
+        this.narExtractionDirectory = narExtractionDirectory;
 
         this.processArgs = new LinkedList<>();
         this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir));
@@ -245,7 +248,8 @@ public class KubernetesRuntime implements Runtime {
                         installUserCodeDependencies,
                         pythonDependencyRepository,
                         pythonExtraDependencyRepository,
-                        metricsPort));
+                        metricsPort,
+                        narExtractionDirectory));
 
         doChecks(instanceConfig.getFunctionDetails());
     }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 46a3216..6414783 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -93,6 +93,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     private boolean authenticationEnabled;
     private Integer grpcPort;
     private Integer metricsPort;
+    private String narExtractionDirectory;
 
     @ToString.Exclude
     @EqualsAndHashCode.Exclude
@@ -229,6 +230,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
 
         this.grpcPort = factoryConfig.getGrpcPort();
         this.metricsPort = factoryConfig.getMetricsPort();
+        this.narExtractionDirectory = factoryConfig.getNarExtractionDirectory();
     }
 
     @Override
@@ -292,6 +294,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             authenticationEnabled,
             grpcPort,
             metricsPort,
+            narExtractionDirectory,
             manifestCustomizer);
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index f017342..61b1f10 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime.kubernetes;
 import lombok.Data;
 import lombok.experimental.Accessors;
 import org.apache.pulsar.common.configuration.FieldContext;
+import org.apache.pulsar.common.nar.NarClassLoader;
 
 import java.util.Map;
 
@@ -139,4 +140,10 @@ public class KubernetesRuntimeFactoryConfig {
       doc = "The port inside the function pod on which prometheus metrics are exposed"
     )
     private Integer metricsPort = 9094;
+
+    @FieldContext(
+       doc = "The directory inside the function pod where nar packages will be extracted"
+    )
+    private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 91e2e08..db2d8c1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -76,12 +76,14 @@ class ProcessRuntime implements Runtime {
     private final Long expectedHealthCheckInterval;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
     private final String extraDependenciesDir;
+    private final String narExtractionDirectory;
     private static final long GRPC_TIMEOUT_SECS = 5;
     private final String funcLogDir;
 
     ProcessRuntime(InstanceConfig instanceConfig,
                    String instanceFile,
                    String extraDependenciesDir,
+                   String narExtractionDirectory,
                    String logDirectory,
                    String codeFile,
                    String pulsarServiceUrl,
@@ -112,6 +114,7 @@ class ProcessRuntime implements Runtime {
                 break;
         }
         this.extraDependenciesDir = extraDependenciesDir;
+        this.narExtractionDirectory = narExtractionDirectory;
         this.processArgs = RuntimeUtils.composeCmd(
             instanceConfig,
             instanceFile,
@@ -133,7 +136,7 @@ class ProcessRuntime implements Runtime {
             false,
             null,
             null,
-                this.metricsPort);
+                this.metricsPort, narExtractionDirectory);
     }
 
     /**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index f0c0db2..6892f8f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -58,6 +58,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
     private String pythonInstanceFile;
     private String logDirectory;
     private String extraDependenciesDir;
+    private String narExtractionDirectory;
 
     @ToString.Exclude
     @EqualsAndHashCode.Exclude
@@ -78,13 +79,14 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
                                  String pythonInstanceFile,
                                  String logDirectory,
                                  String extraDependenciesDir,
+                                 String narExtractionDirectory,
                                  SecretsProviderConfigurator secretsProviderConfigurator,
                                  boolean authenticationEnabled,
                                  Optional<FunctionAuthProvider> functionAuthProvider,
                                  Optional<RuntimeCustomizer> runtimeCustomizer) {
 
         initialize(pulsarServiceUrl, stateStorageServiceUrl, authConfig, javaInstanceJarFile,
-                pythonInstanceFile, logDirectory, extraDependenciesDir,
+                pythonInstanceFile, logDirectory, extraDependenciesDir, narExtractionDirectory,
                 secretsProviderConfigurator, authenticationEnabled, functionAuthProvider, runtimeCustomizer);
     }
 
@@ -103,6 +105,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
                 factoryConfig.getPythonInstanceLocation(),
                 factoryConfig.getLogDirectory(),
                 factoryConfig.getExtraFunctionDependenciesDir(),
+                workerConfig.getNarExtractionDirectory(),
                 secretsProviderConfigurator,
                 workerConfig.isAuthenticationEnabled(),
                 authProvider,
@@ -116,6 +119,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
                             String pythonInstanceFile,
                             String logDirectory,
                             String extraDependenciesDir,
+                            String narExtractionDirectory,
                             SecretsProviderConfigurator secretsProviderConfigurator,
                             boolean authenticationEnabled,
                             Optional<FunctionAuthProvider> functionAuthProvider,
@@ -127,6 +131,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
         this.javaInstanceJarFile = javaInstanceJarFile;
         this.pythonInstanceFile = pythonInstanceFile;
         this.extraDependenciesDir = extraDependenciesDir;
+        this.narExtractionDirectory = narExtractionDirectory;
         this.logDirectory = logDirectory;
         this.authenticationEnabled = authenticationEnabled;
 
@@ -209,6 +214,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
             instanceConfig,
             instanceFile,
             extraDependenciesDir,
+            narExtractionDirectory,
             logDirectory,
             codeFile,
             pulsarServiceUrl,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 3dea623..ffbed6b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -57,6 +57,7 @@ public class ThreadRuntime implements Runtime {
     private String stateStorageServiceUrl;
     private SecretsProvider secretsProvider;
     private CollectorRegistry collectorRegistry;
+    private String narExtractionDirectory;
     ThreadRuntime(InstanceConfig instanceConfig,
                   FunctionCacheManager fnCache,
                   ThreadGroup threadGroup,
@@ -64,7 +65,8 @@ public class ThreadRuntime implements Runtime {
                   PulsarClient pulsarClient,
                   String stateStorageServiceUrl,
                   SecretsProvider secretsProvider,
-                  CollectorRegistry collectorRegistry) {
+                  CollectorRegistry collectorRegistry,
+                  String narExtractionDirectory) {
         this.instanceConfig = instanceConfig;
         if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
             throw new RuntimeException("Thread Container only supports Java Runtime");
@@ -77,6 +79,7 @@ public class ThreadRuntime implements Runtime {
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
+        this.narExtractionDirectory = narExtractionDirectory;
         this.javaInstanceRunnable = new JavaInstanceRunnable(
                 instanceConfig,
                 fnCache,
@@ -84,7 +87,8 @@ public class ThreadRuntime implements Runtime {
                 pulsarClient,
                 stateStorageServiceUrl,
                 secretsProvider,
-                collectorRegistry);
+                collectorRegistry,
+                narExtractionDirectory);
     }
 
     /**
@@ -100,7 +104,8 @@ public class ThreadRuntime implements Runtime {
                 pulsarClient,
                 stateStorageServiceUrl,
                 secretsProvider,
-                collectorRegistry);
+                collectorRegistry,
+                narExtractionDirectory);
 
         log.info("ThreadContainer starting function with instance config {}", instanceConfig);
         this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 5ba741b..c675418 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -59,22 +59,24 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private String storageServiceUrl;
     private SecretsProvider secretsProvider;
     private CollectorRegistry collectorRegistry;
+    private String narExtractionDirectory;
     private volatile boolean closed;
 
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
                                 AuthenticationConfig authConfig, SecretsProvider secretsProvider,
-                                CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception {
+                                CollectorRegistry collectorRegistry, String narExtractionDirectory,
+                                ClassLoader rootClassLoader) throws Exception {
         initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
-                storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader);
+                storageServiceUrl, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
     @VisibleForTesting
     public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
                                 SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
-                                ClassLoader rootClassLoader) {
+                                String narExtractionDirectory, ClassLoader rootClassLoader) {
 
         initialize(threadGroupName, pulsarClient, storageServiceUrl,
-                secretsProvider, collectorRegistry, rootClassLoader);
+                secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
@@ -100,7 +102,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
 
     private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
                             SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
-                            ClassLoader rootClassLoader) {
+                            String narExtractionDirectory, ClassLoader rootClassLoader) {
         if (rootClassLoader == null) {
             rootClassLoader = Thread.currentThread().getContextClassLoader();
         }
@@ -111,6 +113,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
         this.pulsarClient = pulsarClient;
         this.storageServiceUrl = storageServiceUrl;
         this.collectorRegistry = collectorRegistry;
+        this.narExtractionDirectory = narExtractionDirectory;
     }
 
     @Override
@@ -124,7 +127,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
         initialize(factoryConfig.getThreadGroupName(),
                 createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
                 workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(),
-                null, null);
+                null, workerConfig.getNarExtractionDirectory(), null);
     }
 
     @Override
@@ -139,7 +142,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
             pulsarClient,
             storageServiceUrl,
             secretsProvider,
-            collectorRegistry);
+            collectorRegistry,
+            narExtractionDirectory);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 7cd7186..4fd21f0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.functions.Resources;
 
 import lombok.Data;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
@@ -135,6 +136,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private String connectorsDirectory = "./connectors";
     @FieldContext(
+        category = CATEGORY_CONNECTORS,
+        doc = "The directory where nar packages are extracted"
+    )
+    private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+    @FieldContext(
         category = CATEGORY_FUNC_METADATA_MNG,
         doc = "The pulsar topic used for storing function metadata"
     )
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 7cd3819..ab6325e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -71,6 +71,7 @@ public class KubernetesRuntimeTest {
     private static final Map<String, ConsumerSpec> topicsToSchema = new HashMap<>();
     private static final Function.Resources RESOURCES = Function.Resources.newBuilder()
             .setRam(1000L).setCpu(1).setDisk(10000L).build();
+    private static final String narExtractionDirectory = "/tmp/foo";
 
     static {
         topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", "");
@@ -201,6 +202,7 @@ public class KubernetesRuntimeTest {
         kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
         kubernetesRuntimeFactoryConfig.setGrpcPort(4332);
         kubernetesRuntimeFactoryConfig.setMetricsPort(4331);
+        kubernetesRuntimeFactoryConfig.setNarExtractionDirectory(narExtractionDirectory);
         workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
         workerConfig.setFunctionRuntimeFactoryConfigs(
                 ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
@@ -358,14 +360,14 @@ public class KubernetesRuntimeTest {
         if (null != depsDir) {
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 35;
+            totalArgs = 37;
             portArg = 26;
             metricsPortArg = 28;
         } else {
             extraDepsEnv = "";
             portArg = 25;
             metricsPortArg = 27;
-            totalArgs = 34;
+            totalArgs = 36;
         }
         if (secretsAttached) {
             totalArgs += 4;
@@ -394,7 +396,7 @@ public class KubernetesRuntimeTest {
             expectedArgs += " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
                     + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'";
         }
-        expectedArgs += " --cluster_name standalone";
+        expectedArgs += " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory;
 
         assertEquals(String.join(" ", args), expectedArgs);
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index a7b63d8..83477c1 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -58,6 +58,7 @@ import org.testng.annotations.Test;
  * Unit test of {@link ThreadRuntime}.
  */
 public class ProcessRuntimeTest {
+    private String narExtractionDirectory = "/tmp/foo";
 
     class TestSecretsProviderConfigurator implements SecretsProviderConfigurator {
 
@@ -148,6 +149,7 @@ public class ProcessRuntimeTest {
         workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
         workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
         workerConfig.setAuthenticationEnabled(false);
+        workerConfig.setNarExtractionDirectory(narExtractionDirectory);
 
         ProcessRuntimeFactoryConfig processRuntimeFactoryConfig = new ProcessRuntimeFactoryConfig();
         processRuntimeFactoryConfig.setJavaInstanceJarLocation(javaInstanceJarFile);
@@ -274,13 +276,13 @@ public class ProcessRuntimeTest {
         int portArg;
         int metricsPortArg;
         if (null != depsDir) {
-            assertEquals(args.size(), 37);
+            assertEquals(args.size(), 39);
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir.toString();
             classpath = classpath + ":" + depsDir + "/*";
             portArg = 24;
             metricsPortArg = 26;
         } else {
-            assertEquals(args.size(), 36);
+            assertEquals(args.size(), 38);
             extraDepsEnv = "";
             portArg = 23;
             metricsPortArg = 25;
@@ -303,7 +305,7 @@ public class ProcessRuntimeTest {
                 + " --expected_healthcheck_interval 30"
                 + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
                 + " --secrets_provider_config '{\"Config\":\"Value\"}'"
-                + " --cluster_name standalone";
+                + " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory;
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 7c22518..1bd99b2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -298,11 +298,12 @@ public class FunctionCommon {
         return objectClass;
     }
 
-    public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile) {
+    public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile,
+                                                       String narExtractionDirectory) {
         if (archivePath != null) {
             try {
                 return NarClassLoader.getFromArchive(archivePath.toFile(),
-                            Collections.emptySet());
+                            Collections.emptySet(), narExtractionDirectory);
             } catch (IOException e) {
                 throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
             }
@@ -311,7 +312,7 @@ public class FunctionCommon {
         if (packageFile != null) {
             try {
                 return NarClassLoader.getFromArchive(packageFile,
-                        Collections.emptySet());
+                        Collections.emptySet(), narExtractionDirectory);
             } catch (IOException e) {
                 throw new IllegalArgumentException(e.getMessage());
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 0c50265..57aa8ec 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -302,7 +302,7 @@ public class SinkConfigUtils {
     }
 
     public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
-                                          File sinkPackageFile) {
+                                          File sinkPackageFile, String narExtractionDirectory) {
         if (isEmpty(sinkConfig.getTenant())) {
             throw new IllegalArgumentException("Sink tenant cannot be null");
         }
@@ -356,7 +356,7 @@ public class SinkConfigUtils {
             jarClassLoaderException = e;
         }
         try {
-            narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile);
+            narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile, narExtractionDirectory);
         } catch (Exception e) {
             narClassLoaderException = e;
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index c3d6d0a..b184ef0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -208,7 +208,8 @@ public class SourceConfigUtils {
         return sourceConfig;
     }
 
-    public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath, File sourcePackageFile) {
+    public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath,
+                                                  File sourcePackageFile, String narExtractionDirectory) {
         if (isEmpty(sourceConfig.getTenant())) {
             throw new IllegalArgumentException("Source tenant cannot be null");
         }
@@ -249,7 +250,7 @@ public class SourceConfigUtils {
             jarClassLoaderException = e;
         }
         try {
-            narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
+            narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile, narExtractionDirectory);
         } catch (Exception e) {
             narClassLoaderException = e;
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
index 5dbb804..81d39d9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
@@ -71,8 +71,10 @@ public class FunctionCacheEntry implements AutoCloseable {
         this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
     }
 
-    FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader) throws IOException {
-        this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), rootClassLoader);
+    FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader,
+                       String narExtractionDirectory) throws IOException {
+        this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(),
+                rootClassLoader, narExtractionDirectory);
         this.classpaths = Collections.emptySet();
         this.jarFiles = Collections.singleton(narArchive);
         this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java
index 427e263..58508ec 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java
@@ -60,7 +60,9 @@ public interface FunctionCacheManager extends AutoCloseable {
                                   List<URL> requiredClasspaths)
         throws IOException;
 
-    void registerFunctionInstanceWithArchive(String fid, String eid, String narArchive) throws IOException;
+    void registerFunctionInstanceWithArchive(String fid, String eid,
+                                             String narArchive,
+                                             String narExtractionDirectory) throws IOException;
 
     /**
      * Unregisters a job from the function cache manager.
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
index b5f7e42..73b7222 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
@@ -110,7 +110,8 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
     }
 
     @Override
-    public void registerFunctionInstanceWithArchive(String fid, String eid, String narArchive) throws IOException {
+    public void registerFunctionInstanceWithArchive(String fid, String eid,
+                                                    String narArchive, String narExtractionDirectory) throws IOException {
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
@@ -125,7 +126,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
 
             // Create new cache entry
             try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader));
+                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
             } catch (Throwable cause) {
                 Exceptions.rethrowIOException(cause);
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 2b78bf0..a78c24c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -98,15 +98,15 @@ public class ConnectorUtils {
         return conf.getSinkClass();
     }
 
-    public static ConnectorDefinition getConnectorDefinition(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+    public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
             String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
 
             return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
         }
     }
 
-    public static Connectors searchForConnectors(String connectorsDirectory) throws IOException {
+    public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
@@ -120,7 +120,7 @@ public class ConnectorUtils {
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
                 try {
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toString());
+                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toString(), narExtractionDirectory);
                     log.info("Found connector {} from {}", cntDef, archive);
 
                     if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 02e3fc4..40156eb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -31,7 +31,7 @@ public class ConnectorsManager {
     private Connectors connectors;
 
     public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
-        this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory());
+        this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
     }
 
     public List<ConnectorDefinition> getConnectors() {
@@ -47,6 +47,6 @@ public class ConnectorsManager {
     }
 
     public void reloadConnectors(WorkerConfig workerConfig) throws IOException {
-        this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory());
+        this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index eef95a1..0a95755 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -398,7 +398,7 @@ public class FunctionActioner {
             SourceSpec sourceSpec = functionDetails.getSource();
             if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
                 File archive = connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
-                String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass();
+                String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString(), workerConfig.getNarExtractionDirectory()).getSourceClass();
                 SourceSpec.Builder builder = SourceSpec.newBuilder(functionDetails.getSource());
                 builder.setClassName(sourceClass);
                 functionDetails.setSource(builder);
@@ -412,7 +412,7 @@ public class FunctionActioner {
             SinkSpec sinkSpec = functionDetails.getSink();
             if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
                 File archive = connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
-                String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass();
+                String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString(), workerConfig.getNarExtractionDirectory()).getSinkClass();
                 SinkSpec.Builder builder = SinkSpec.newBuilder(functionDetails.getSink());
                 builder.setClassName(sinkClass);
                 functionDetails.setSink(builder);
@@ -427,7 +427,7 @@ public class FunctionActioner {
 
     private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
             throws IOException, ClassNotFoundException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) {
             String typeArg = getSourceType(className, ncl).getName();
 
             SourceSpec.Builder sourceBuilder = SourceSpec.newBuilder(functionDetails.getSource());
@@ -445,7 +445,7 @@ public class FunctionActioner {
 
     private void fillSinkTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
             throws IOException, ClassNotFoundException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) {
             String typeArg = getSinkType(className, ncl).getName();
 
             SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(functionDetails.getSink());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index dd4b0ee..560c8cb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -688,7 +688,8 @@ public class SinksImpl extends ComponentImpl {
                 throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
             }
         }
-        SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
+        SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath,
+                componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
         return SinkConfigUtils.convert(sinkConfig, sinkDetails);
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 9f74a1d..4f434fa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -684,7 +684,8 @@ public class SourcesImpl extends ComponentImpl {
                 throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
             }
         }
-        SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePackageFile);
+        SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath,
+                sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
         return SourceConfigUtils.convert(sourceConfig, sourceDetails);
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 6ee14dc..243f127 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -148,7 +148,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -194,7 +194,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -241,7 +241,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -301,7 +301,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -367,7 +367,7 @@ public class SchedulerManagerTest {
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
         ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider
-                (), new CollectorRegistry(), null);
+                (), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -478,7 +478,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -605,7 +605,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
@@ -659,7 +659,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -792,7 +792,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index cd55b2a..bd65a0b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -162,7 +162,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null);
+                instanceConfig, null, null, null, null, null, null, null);
         CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
         metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
         Runtime runtime = mock(Runtime.class);
@@ -208,7 +208,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null);
+                instanceConfig, null, null, null, null, null, null, null);
         CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
         completableFuture.complete(javaInstanceRunnable.getMetrics());
         Runtime runtime = mock(Runtime.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index b9aeccb..bc117ab 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -817,7 +817,7 @@ public class SinkApiV3ResourceTest {
         FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(), any());
+        FunctionCommon.extractNarClassLoader(any(), any(), any());
 
         doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
         FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -888,7 +888,7 @@ public class SinkApiV3ResourceTest {
         FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(), any());
+        FunctionCommon.extractNarClassLoader(any(), any(), any());
 
         doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
         FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -986,7 +986,7 @@ public class SinkApiV3ResourceTest {
         PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(), any());
+        FunctionCommon.extractNarClassLoader(any(), any(), any());
 
         doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
         FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 88dcff4..d13fd3d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -837,7 +837,7 @@ public class SourceApiV3ResourceTest {
         FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(), any());
+        FunctionCommon.extractNarClassLoader(any(), any(), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -907,7 +907,7 @@ public class SourceApiV3ResourceTest {
         FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(), any(File.class));
+        FunctionCommon.extractNarClassLoader(any(), any(File.class), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -1001,7 +1001,7 @@ public class SourceApiV3ResourceTest {
         PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(), any());
+        FunctionCommon.extractNarClassLoader(any(), any(), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index a29aabd..b870888 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -83,7 +83,7 @@ public class PulsarConnectorCache {
 
         OffloadPolicies offloadPolicies = new OffloadPolicies();
         BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig);
-        this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies);
+        this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
     }
 
     public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
@@ -117,7 +117,8 @@ public class PulsarConnectorCache {
         return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig);
     }
 
-    public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies) {
+    public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies,
+                                                      PulsarConnectorConfig pulsarConnectorConfig) {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         if (offloadPolicies == null) {
             managedLedgerConfig.setLedgerOffloader(this.defaultOffloader);
@@ -130,7 +131,7 @@ public class PulsarConnectorCache {
                             if (offloader != null) {
                                 offloader.close();
                             }
-                            return initManagedLedgerOffloader(offloadPolicies);
+                            return initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
                         }
                     });
             managedLedgerConfig.setLedgerOffloader(ledgerOffloader);
@@ -147,14 +148,16 @@ public class PulsarConnectorCache {
         return this.offloaderScheduler;
     }
 
-    private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies) {
+    private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies,
+                                                       PulsarConnectorConfig pulsarConnectorConfig) {
 
         try {
             if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                         "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory());
+                this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(),
+                        pulsarConnectorConfig.getNarExtractionDirectory());
                 LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index b2b2755..49d2ae3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamedEntity;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.protocol.Commands;
 
 /**
@@ -74,6 +75,9 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors();
     private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();
 
+    // --- Nar extraction
+    private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
     @NotNull
     public String getBrokerServiceUrl() {
         return brokerServiceUrl;
@@ -358,6 +362,17 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
+    // --- Nar extraction config
+    public String getNarExtractionDirectory() {
+        return narExtractionDirectory;
+    }
+
+    @Config("pulsar.nar-extraction-directory")
+    public PulsarConnectorConfig setNarExtractionDirectory(String narExtractionDirectory) {
+        this.narExtractionDirectory = narExtractionDirectory;
+        return this;
+    }
+
     @NotNull
     public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
         if (this.pulsarAdmin == null) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 003c51f..d7e2e2f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -124,7 +124,8 @@ public class PulsarRecordCursor implements RecordCursor {
                 pulsarConnectorCache.getManagedLedgerFactory(),
                 pulsarConnectorCache.getManagedLedgerConfig(
                         TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()),
-                                pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies),
+                                pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies,
+                        pulsarConnectorConfig),
                 new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
     }