You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/11 19:15:03 UTC
[pulsar] branch master updated: Make Nar Extraction Directory configurable (#6933)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 54f8d13 Make Nar Extraction Directory configurable (#6933)
54f8d13 is described below
commit 54f8d13ba2688797caf2f553062e97dfb1b66bec
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 11 12:14:43 2020 -0700
Make Nar Extraction Directory configurable (#6933)
* Make Nar Extraction Directory configurable
* Fixed unittests
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../bookkeeper/mledger/offload/OffloaderUtils.java | 15 ++++++++-------
.../org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
.../java/org/apache/pulsar/broker/PulsarService.java | 2 +-
.../pulsar/broker/protocol/ProtocolHandlerUtils.java | 14 ++++++++------
.../pulsar/broker/protocol/ProtocolHandlers.java | 4 ++--
.../broker/protocol/ProtocolHandlerUtilsTest.java | 15 +++++++++------
.../org/apache/pulsar/common/nar/NarClassLoader.java | 16 +++++++++++-----
.../apache/pulsar/common/util/SearchBcNarUtils.java | 2 +-
.../functions/instance/JavaInstanceRunnable.java | 7 +++++--
.../functions/instance/JavaInstanceRunnableTest.java | 2 +-
.../java/org/apache/pulsar/functions/LocalRunner.java | 15 +++++++++------
.../pulsar/functions/runtime/JavaInstanceStarter.java | 6 +++++-
.../apache/pulsar/functions/runtime/RuntimeUtils.java | 15 ++++++++++++---
.../runtime/kubernetes/KubernetesRuntime.java | 6 +++++-
.../runtime/kubernetes/KubernetesRuntimeFactory.java | 3 +++
.../kubernetes/KubernetesRuntimeFactoryConfig.java | 7 +++++++
.../functions/runtime/process/ProcessRuntime.java | 5 ++++-
.../runtime/process/ProcessRuntimeFactory.java | 8 +++++++-
.../pulsar/functions/runtime/thread/ThreadRuntime.java | 11 ++++++++---
.../functions/runtime/thread/ThreadRuntimeFactory.java | 18 +++++++++++-------
.../apache/pulsar/functions/worker/WorkerConfig.java | 6 ++++++
.../runtime/kubernetes/KubernetesRuntimeTest.java | 8 +++++---
.../functions/runtime/process/ProcessRuntimeTest.java | 8 +++++---
.../apache/pulsar/functions/utils/FunctionCommon.java | 7 ++++---
.../apache/pulsar/functions/utils/SinkConfigUtils.java | 4 ++--
.../pulsar/functions/utils/SourceConfigUtils.java | 5 +++--
.../utils/functioncache/FunctionCacheEntry.java | 6 ++++--
.../utils/functioncache/FunctionCacheManager.java | 4 +++-
.../utils/functioncache/FunctionCacheManagerImpl.java | 5 +++--
.../pulsar/functions/utils/io/ConnectorUtils.java | 8 ++++----
.../pulsar/functions/worker/ConnectorsManager.java | 4 ++--
.../pulsar/functions/worker/FunctionActioner.java | 8 ++++----
.../pulsar/functions/worker/rest/api/SinksImpl.java | 3 ++-
.../pulsar/functions/worker/rest/api/SourcesImpl.java | 3 ++-
.../pulsar/functions/worker/SchedulerManagerTest.java | 18 +++++++++---------
.../functions/worker/rest/api/FunctionsImplTest.java | 4 ++--
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 6 +++---
.../worker/rest/api/v3/SourceApiV3ResourceTest.java | 6 +++---
.../apache/pulsar/sql/presto/PulsarConnectorCache.java | 13 ++++++++-----
.../pulsar/sql/presto/PulsarConnectorConfig.java | 15 +++++++++++++++
.../apache/pulsar/sql/presto/PulsarRecordCursor.java | 3 ++-
41 files changed, 215 insertions(+), 107 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 845b53f..5243691 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -48,11 +48,12 @@ public class OffloaderUtils {
* @return the offloader class name
* @throws IOException when fail to retrieve the pulsar offloader class
*/
- static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath) throws IOException {
+ static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath, String narExtractionDirectory) throws IOException {
// need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case
// LedgerOffloaderFactory is loaded by a classloader that is not the default classloader
// as is the case for the pulsar presto plugin
- NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), LedgerOffloaderFactory.class.getClassLoader());
+ NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
+ LedgerOffloaderFactory.class.getClassLoader(), narExtractionDirectory);
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml()
@@ -105,15 +106,15 @@ public class OffloaderUtils {
}
}
- public static OffloaderDefinition getOffloaderDefinition(String narPath) throws IOException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+ public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory) throws IOException {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class);
}
}
- public static Offloaders searchForOffloaders(String connectorsDirectory) throws IOException {
+ public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for offloaders in {}", path);
@@ -127,13 +128,13 @@ public class OffloaderUtils {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
stream.forEach(archive -> {
try {
- OffloaderDefinition definition = getOffloaderDefinition(archive.toString());
+ OffloaderDefinition definition = getOffloaderDefinition(archive.toString(), narExtractionDirectory);
log.info("Found offloader {} from {}", definition, archive);
if (!StringUtils.isEmpty(definition.getOffloaderFactoryClass())) {
// Validate offloader factory class to be present and of the right type
Pair<NarClassLoader, LedgerOffloaderFactory> offloaderFactoryPair =
- getOffloaderFactory(archive.toString());
+ getOffloaderFactory(archive.toString(), narExtractionDirectory);
if (null != offloaderFactoryPair) {
offloaders.getOffloaders().add(offloaderFactoryPair);
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1d10b8d..34163b4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -34,6 +34,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
@@ -1564,6 +1565,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerOffloadMaxThreads = 2;
@FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "The directory where nar Extraction of offloaders happens"
+ )
+ private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
+ @FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Maximum prefetch rounds for ledger reading for offloading"
)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 30d1dde..21a73a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -838,7 +838,7 @@ public class PulsarService implements AutoCloseable {
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());
- this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory());
+ this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java
index 4a64ebf..6ccfff9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java
@@ -49,8 +49,8 @@ class ProtocolHandlerUtils {
* @return the protocol handler definition
* @throws IOException when fail to load the protocol handler or get the definition
*/
- public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath) throws IOException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+ public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory) throws IOException {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
return getProtocolHandlerDefinition(ncl);
}
}
@@ -70,7 +70,8 @@ class ProtocolHandlerUtils {
* @return a collection of protocol handlers
* @throws IOException when fail to load the available protocol handlers from the provided directory.
*/
- public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory) throws IOException {
+ public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory,
+ String narExtractionDirectory) throws IOException {
Path path = Paths.get(handlersDirectory).toAbsolutePath();
log.info("Searching for protocol handlers in {}", path);
@@ -84,7 +85,7 @@ class ProtocolHandlerUtils {
for (Path archive : stream) {
try {
ProtocolHandlerDefinition phDef =
- ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString());
+ ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory);
log.info("Found protocol handler from {} : {}", archive, phDef);
checkArgument(StringUtils.isNotBlank(phDef.getName()));
@@ -113,11 +114,12 @@ class ProtocolHandlerUtils {
* @param metadata the protocol handler definition.
* @return
*/
- static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata) throws IOException {
+ static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata,
+ String narExtractionDirectory) throws IOException {
NarClassLoader ncl = NarClassLoader.getFromArchive(
metadata.getArchivePath().toAbsolutePath().toFile(),
Collections.emptySet(),
- ProtocolHandler.class.getClassLoader());
+ ProtocolHandler.class.getClassLoader(), narExtractionDirectory);
ProtocolHandlerDefinition phDef = getProtocolHandlerDefinition(ncl);
if (StringUtils.isBlank(phDef.getHandlerClass())) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
index b09912b..af0d726 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
@@ -46,7 +46,7 @@ public class ProtocolHandlers implements AutoCloseable {
*/
public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException {
ProtocolHandlerDefinitions definitions =
- ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory());
+ ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory());
ImmutableMap.Builder<String, ProtocolHandlerWithClassLoader> handlersBuilder = ImmutableMap.builder();
@@ -60,7 +60,7 @@ public class ProtocolHandlers implements AutoCloseable {
ProtocolHandlerWithClassLoader handler;
try {
- handler = ProtocolHandlerUtils.load(definition);
+ handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory());
} catch (IOException e) {
log.error("Failed to load the protocol handler for protocol `" + protocol + "`", e);
throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java
index e7910dc..3fda508 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java
@@ -75,10 +75,11 @@ public class ProtocolHandlerUtilsTest {
PowerMockito.when(NarClassLoader.getFromArchive(
any(File.class),
any(Set.class),
- any(ClassLoader.class)
+ any(ClassLoader.class),
+ any(String.class)
)).thenReturn(mockLoader);
- ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata);
+ ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata, "");
ProtocolHandler returnedPh = returnedPhWithCL.getHandler();
assertSame(mockLoader, returnedPhWithCL.getClassLoader());
@@ -107,11 +108,12 @@ public class ProtocolHandlerUtilsTest {
PowerMockito.when(NarClassLoader.getFromArchive(
any(File.class),
any(Set.class),
- any(ClassLoader.class)
+ any(ClassLoader.class),
+ any(String.class)
)).thenReturn(mockLoader);
try {
- ProtocolHandlerUtils.load(metadata);
+ ProtocolHandlerUtils.load(metadata, "");
fail("Should not reach here");
} catch (IOException ioe) {
// expected
@@ -141,11 +143,12 @@ public class ProtocolHandlerUtilsTest {
PowerMockito.when(NarClassLoader.getFromArchive(
any(File.class),
any(Set.class),
- any(ClassLoader.class)
+ any(ClassLoader.class),
+ any(String.class)
)).thenReturn(mockLoader);
try {
- ProtocolHandlerUtils.load(metadata);
+ ProtocolHandlerUtils.load(metadata, "");
fail("Should not reach here");
} catch (IOException ioe) {
// expected
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 3fdfef0..e33cbfa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -135,10 +135,11 @@ public class NarClassLoader extends URLClassLoader {
private static final String TMP_DIR_PREFIX = "pulsar-nar";
- private static final File NAR_CACHE_DIR = new File(System.getProperty("java.io.tmpdir") + "/" + TMP_DIR_PREFIX);
+ public static final String DEFAULT_NAR_EXTRACTION_DIR = System.getProperty("java.io.tmpdir");
- public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
- File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
+ public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars,
+ String narExtractionDirectory) throws IOException {
+ File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
try {
return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
} catch (ClassNotFoundException | NoClassDefFoundError e) {
@@ -146,9 +147,10 @@ public class NarClassLoader extends URLClassLoader {
}
}
- public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent)
+ public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent,
+ String narExtractionDirectory)
throws IOException {
- File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
+ File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
try {
return new NarClassLoader(unpacked, additionalJars, parent);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
@@ -156,6 +158,10 @@ public class NarClassLoader extends URLClassLoader {
}
}
+ private static File getNarExtractionDirectory(String configuredDirectory) {
+ return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
+ }
+
/**
* Construct a nar class loader.
*
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java
index 0dfa469..59bd652 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java
@@ -65,7 +65,7 @@ public class SearchBcNarUtils {
NarClassLoader ncl = NarClassLoader.getFromArchive(
new File(narPath),
Collections.emptySet(),
- BCLoader.class.getClassLoader());
+ BCLoader.class.getClassLoader(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
String configStr = ncl.getServiceDefinition(BC_DEF_NAME);
BcNarDefinition nar = ObjectMapperFactory.getThreadLocalYaml()
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 66cdb60..0fa4fc6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -133,6 +133,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final ClassLoader instanceClassLoader;
private ClassLoader functionClassLoader;
+ private String narExtractionDirectory;
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
@@ -140,7 +141,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
PulsarClient pulsarClient,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
- CollectorRegistry collectorRegistry) {
+ CollectorRegistry collectorRegistry,
+ String narExtractionDirectory) {
this.instanceConfig = instanceConfig;
this.fnCache = fnCache;
this.jarFile = jarFile;
@@ -148,6 +150,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
+ this.narExtractionDirectory = narExtractionDirectory;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
@@ -304,7 +307,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName(),
- jarFile);
+ jarFile, narExtractionDirectory);
} catch (FileNotFoundException e) {
// create the function class loader
fnCache.registerFunctionInstance(
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 56d80ae..691b1fe 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -59,7 +59,7 @@ public class JavaInstanceRunnableTest {
private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(outputSerde);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, null, null, null, null, null, null);
+ config, null, null, null, null, null, null, null);
return javaInstanceRunnable;
}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 2d8b72f..57209de 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -67,6 +68,7 @@ public class LocalRunner {
private final AtomicBoolean running = new AtomicBoolean(false);
private final List<RuntimeSpawner> spawners = new LinkedList<>();
+ private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
public enum RuntimeEnv {
THREAD,
@@ -256,14 +258,14 @@ public class LocalRunner {
if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+ functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
}
- functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+ functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
}
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
@@ -284,13 +286,13 @@ public class LocalRunner {
if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+ functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Sink archive does not exist");
}
- functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+ functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
}
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
@@ -346,6 +348,7 @@ public class LocalRunner {
null, /* python instance file */
null, /* log directory */
null, /* extra dependencies dir */
+ narExtractionDirectory, /* nar extraction dir */
new DefaultSecretsProviderConfigurator(), false, Optional.empty(), Optional.empty())) {
for (int i = 0; i < parallelism; ++i) {
@@ -407,7 +410,7 @@ public class LocalRunner {
serviceUrl,
stateStorageServiceUrl,
authConfig,
- new ClearTextSecretsProvider(), null, null);
+ new ClearTextSecretsProvider(), null, narExtractionDirectory, null);
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
@@ -461,6 +464,6 @@ public class LocalRunner {
pulsarHome = Paths.get("").toAbsolutePath().toString();
}
String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
- return ConnectorUtils.searchForConnectors(connectorsDir);
+ return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 970047f..f7104df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -41,6 +41,7 @@ import io.prometheus.client.hotspot.VersionInfoExports;
import io.prometheus.jmx.JmxCollector;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -125,6 +126,9 @@ public class JavaInstanceStarter implements AutoCloseable {
@Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true)
public String clusterName;
+ @Parameter(names = "--nar_extraction_directory", description = "The directory where extraction of nar packages happen", required = false)
+ public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
@Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", required = false)
public int maxPendingAsyncRequests = 1000;
@@ -198,7 +202,7 @@ public class JavaInstanceStarter implements AutoCloseable {
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
- secretsProvider, collectorRegistry, rootClassLoader);
+ secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 351fb67..ca70b42 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -70,7 +70,8 @@ public class RuntimeUtils {
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
- int metricsPort) throws Exception {
+ int metricsPort,
+ String narExtractionDirectory) throws Exception {
final List<String> cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir);
@@ -79,7 +80,7 @@ public class RuntimeUtils {
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
- pythonExtraDependencyRepository, metricsPort));
+ pythonExtraDependencyRepository, metricsPort, narExtractionDirectory));
return cmd;
}
@@ -244,7 +245,8 @@ public class RuntimeUtils {
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
- int metricsPort) throws Exception {
+ int metricsPort,
+ String narExtractionDirectory) throws Exception {
final List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
@@ -386,6 +388,13 @@ public class RuntimeUtils {
args.add("--cluster_name");
args.add(instanceConfig.getClusterName());
+
+ if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
+ if (!StringUtils.isEmpty(narExtractionDirectory)) {
+ args.add("--nar_extraction_directory");
+ args.add(narExtractionDirectory);
+ }
+ }
return args;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index ca6c353..56f6aaf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -145,6 +145,7 @@ public class KubernetesRuntime implements Runtime {
private final AuthenticationConfig authConfig;
private Integer grpcPort;
private Integer metricsPort;
+ private String narExtractionDirectory;
private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
KubernetesRuntime(AppsV1Api appsClient,
@@ -177,6 +178,7 @@ public class KubernetesRuntime implements Runtime {
boolean authenticationEnabled,
Integer grpcPort,
Integer metricsPort,
+ String narExtractionDirectory,
Optional<KubernetesManifestCustomizer> manifestCustomizer) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
@@ -219,6 +221,7 @@ public class KubernetesRuntime implements Runtime {
this.grpcPort = grpcPort;
this.metricsPort = metricsPort;
+ this.narExtractionDirectory = narExtractionDirectory;
this.processArgs = new LinkedList<>();
this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir));
@@ -245,7 +248,8 @@ public class KubernetesRuntime implements Runtime {
installUserCodeDependencies,
pythonDependencyRepository,
pythonExtraDependencyRepository,
- metricsPort));
+ metricsPort,
+ narExtractionDirectory));
doChecks(instanceConfig.getFunctionDetails());
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 46a3216..6414783 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -93,6 +93,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private boolean authenticationEnabled;
private Integer grpcPort;
private Integer metricsPort;
+ private String narExtractionDirectory;
@ToString.Exclude
@EqualsAndHashCode.Exclude
@@ -229,6 +230,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.grpcPort = factoryConfig.getGrpcPort();
this.metricsPort = factoryConfig.getMetricsPort();
+ this.narExtractionDirectory = factoryConfig.getNarExtractionDirectory();
}
@Override
@@ -292,6 +294,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
authenticationEnabled,
grpcPort,
metricsPort,
+ narExtractionDirectory,
manifestCustomizer);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index f017342..61b1f10 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime.kubernetes;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.configuration.FieldContext;
+import org.apache.pulsar.common.nar.NarClassLoader;
import java.util.Map;
@@ -139,4 +140,10 @@ public class KubernetesRuntimeFactoryConfig {
doc = "The port inside the function pod on which prometheus metrics are exposed"
)
private Integer metricsPort = 9094;
+
+ @FieldContext(
+ doc = "The directory inside the function pod where nar packages will be extracted"
+ )
+ private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 91e2e08..db2d8c1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -76,12 +76,14 @@ class ProcessRuntime implements Runtime {
private final Long expectedHealthCheckInterval;
private final SecretsProviderConfigurator secretsProviderConfigurator;
private final String extraDependenciesDir;
+ private final String narExtractionDirectory;
private static final long GRPC_TIMEOUT_SECS = 5;
private final String funcLogDir;
ProcessRuntime(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir,
+ String narExtractionDirectory,
String logDirectory,
String codeFile,
String pulsarServiceUrl,
@@ -112,6 +114,7 @@ class ProcessRuntime implements Runtime {
break;
}
this.extraDependenciesDir = extraDependenciesDir;
+ this.narExtractionDirectory = narExtractionDirectory;
this.processArgs = RuntimeUtils.composeCmd(
instanceConfig,
instanceFile,
@@ -133,7 +136,7 @@ class ProcessRuntime implements Runtime {
false,
null,
null,
- this.metricsPort);
+ this.metricsPort, narExtractionDirectory);
}
/**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index f0c0db2..6892f8f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -58,6 +58,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
private String pythonInstanceFile;
private String logDirectory;
private String extraDependenciesDir;
+ private String narExtractionDirectory;
@ToString.Exclude
@EqualsAndHashCode.Exclude
@@ -78,13 +79,14 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
String pythonInstanceFile,
String logDirectory,
String extraDependenciesDir,
+ String narExtractionDirectory,
SecretsProviderConfigurator secretsProviderConfigurator,
boolean authenticationEnabled,
Optional<FunctionAuthProvider> functionAuthProvider,
Optional<RuntimeCustomizer> runtimeCustomizer) {
initialize(pulsarServiceUrl, stateStorageServiceUrl, authConfig, javaInstanceJarFile,
- pythonInstanceFile, logDirectory, extraDependenciesDir,
+ pythonInstanceFile, logDirectory, extraDependenciesDir, narExtractionDirectory,
secretsProviderConfigurator, authenticationEnabled, functionAuthProvider, runtimeCustomizer);
}
@@ -103,6 +105,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
factoryConfig.getPythonInstanceLocation(),
factoryConfig.getLogDirectory(),
factoryConfig.getExtraFunctionDependenciesDir(),
+ workerConfig.getNarExtractionDirectory(),
secretsProviderConfigurator,
workerConfig.isAuthenticationEnabled(),
authProvider,
@@ -116,6 +119,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
String pythonInstanceFile,
String logDirectory,
String extraDependenciesDir,
+ String narExtractionDirectory,
SecretsProviderConfigurator secretsProviderConfigurator,
boolean authenticationEnabled,
Optional<FunctionAuthProvider> functionAuthProvider,
@@ -127,6 +131,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
this.javaInstanceJarFile = javaInstanceJarFile;
this.pythonInstanceFile = pythonInstanceFile;
this.extraDependenciesDir = extraDependenciesDir;
+ this.narExtractionDirectory = narExtractionDirectory;
this.logDirectory = logDirectory;
this.authenticationEnabled = authenticationEnabled;
@@ -209,6 +214,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
instanceConfig,
instanceFile,
extraDependenciesDir,
+ narExtractionDirectory,
logDirectory,
codeFile,
pulsarServiceUrl,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 3dea623..ffbed6b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -57,6 +57,7 @@ public class ThreadRuntime implements Runtime {
private String stateStorageServiceUrl;
private SecretsProvider secretsProvider;
private CollectorRegistry collectorRegistry;
+ private String narExtractionDirectory;
ThreadRuntime(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
@@ -64,7 +65,8 @@ public class ThreadRuntime implements Runtime {
PulsarClient pulsarClient,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
- CollectorRegistry collectorRegistry) {
+ CollectorRegistry collectorRegistry,
+ String narExtractionDirectory) {
this.instanceConfig = instanceConfig;
if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
throw new RuntimeException("Thread Container only supports Java Runtime");
@@ -77,6 +79,7 @@ public class ThreadRuntime implements Runtime {
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
+ this.narExtractionDirectory = narExtractionDirectory;
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
fnCache,
@@ -84,7 +87,8 @@ public class ThreadRuntime implements Runtime {
pulsarClient,
stateStorageServiceUrl,
secretsProvider,
- collectorRegistry);
+ collectorRegistry,
+ narExtractionDirectory);
}
/**
@@ -100,7 +104,8 @@ public class ThreadRuntime implements Runtime {
pulsarClient,
stateStorageServiceUrl,
secretsProvider,
- collectorRegistry);
+ collectorRegistry,
+ narExtractionDirectory);
log.info("ThreadContainer starting function with instance config {}", instanceConfig);
this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 5ba741b..c675418 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -59,22 +59,24 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private String storageServiceUrl;
private SecretsProvider secretsProvider;
private CollectorRegistry collectorRegistry;
+ private String narExtractionDirectory;
private volatile boolean closed;
public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
AuthenticationConfig authConfig, SecretsProvider secretsProvider,
- CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception {
+ CollectorRegistry collectorRegistry, String narExtractionDirectory,
+ ClassLoader rootClassLoader) throws Exception {
initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
- storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader);
+ storageServiceUrl, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
}
@VisibleForTesting
public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
- ClassLoader rootClassLoader) {
+ String narExtractionDirectory, ClassLoader rootClassLoader) {
initialize(threadGroupName, pulsarClient, storageServiceUrl,
- secretsProvider, collectorRegistry, rootClassLoader);
+ secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
}
private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
@@ -100,7 +102,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
- ClassLoader rootClassLoader) {
+ String narExtractionDirectory, ClassLoader rootClassLoader) {
if (rootClassLoader == null) {
rootClassLoader = Thread.currentThread().getContextClassLoader();
}
@@ -111,6 +113,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
this.pulsarClient = pulsarClient;
this.storageServiceUrl = storageServiceUrl;
this.collectorRegistry = collectorRegistry;
+ this.narExtractionDirectory = narExtractionDirectory;
}
@Override
@@ -124,7 +127,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
initialize(factoryConfig.getThreadGroupName(),
createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(),
- null, null);
+ null, workerConfig.getNarExtractionDirectory(), null);
}
@Override
@@ -139,7 +142,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
pulsarClient,
storageServiceUrl,
secretsProvider,
- collectorRegistry);
+ collectorRegistry,
+ narExtractionDirectory);
}
@Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 7cd7186..4fd21f0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.functions.Resources;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
@@ -135,6 +136,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private String connectorsDirectory = "./connectors";
@FieldContext(
+ category = CATEGORY_CONNECTORS,
+ doc = "The directory where nar packages are extracted"
+ )
+ private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+ @FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
)
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 7cd3819..ab6325e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -71,6 +71,7 @@ public class KubernetesRuntimeTest {
private static final Map<String, ConsumerSpec> topicsToSchema = new HashMap<>();
private static final Function.Resources RESOURCES = Function.Resources.newBuilder()
.setRam(1000L).setCpu(1).setDisk(10000L).build();
+ private static final String narExtractionDirectory = "/tmp/foo";
static {
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", "");
@@ -201,6 +202,7 @@ public class KubernetesRuntimeTest {
kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
kubernetesRuntimeFactoryConfig.setGrpcPort(4332);
kubernetesRuntimeFactoryConfig.setMetricsPort(4331);
+ kubernetesRuntimeFactoryConfig.setNarExtractionDirectory(narExtractionDirectory);
workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
@@ -358,14 +360,14 @@ public class KubernetesRuntimeTest {
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 35;
+ totalArgs = 37;
portArg = 26;
metricsPortArg = 28;
} else {
extraDepsEnv = "";
portArg = 25;
metricsPortArg = 27;
- totalArgs = 34;
+ totalArgs = 36;
}
if (secretsAttached) {
totalArgs += 4;
@@ -394,7 +396,7 @@ public class KubernetesRuntimeTest {
expectedArgs += " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+ " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'";
}
- expectedArgs += " --cluster_name standalone";
+ expectedArgs += " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory;
assertEquals(String.join(" ", args), expectedArgs);
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index a7b63d8..83477c1 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -58,6 +58,7 @@ import org.testng.annotations.Test;
* Unit test of {@link ThreadRuntime}.
*/
public class ProcessRuntimeTest {
+ private String narExtractionDirectory = "/tmp/foo";
class TestSecretsProviderConfigurator implements SecretsProviderConfigurator {
@@ -148,6 +149,7 @@ public class ProcessRuntimeTest {
workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
workerConfig.setAuthenticationEnabled(false);
+ workerConfig.setNarExtractionDirectory(narExtractionDirectory);
ProcessRuntimeFactoryConfig processRuntimeFactoryConfig = new ProcessRuntimeFactoryConfig();
processRuntimeFactoryConfig.setJavaInstanceJarLocation(javaInstanceJarFile);
@@ -274,13 +276,13 @@ public class ProcessRuntimeTest {
int portArg;
int metricsPortArg;
if (null != depsDir) {
- assertEquals(args.size(), 37);
+ assertEquals(args.size(), 39);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir.toString();
classpath = classpath + ":" + depsDir + "/*";
portArg = 24;
metricsPortArg = 26;
} else {
- assertEquals(args.size(), 36);
+ assertEquals(args.size(), 38);
extraDepsEnv = "";
portArg = 23;
metricsPortArg = 25;
@@ -303,7 +305,7 @@ public class ProcessRuntimeTest {
+ " --expected_healthcheck_interval 30"
+ " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+ " --secrets_provider_config '{\"Config\":\"Value\"}'"
- + " --cluster_name standalone";
+ + " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory;
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 7c22518..1bd99b2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -298,11 +298,12 @@ public class FunctionCommon {
return objectClass;
}
- public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile) {
+ public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile,
+ String narExtractionDirectory) {
if (archivePath != null) {
try {
return NarClassLoader.getFromArchive(archivePath.toFile(),
- Collections.emptySet());
+ Collections.emptySet(), narExtractionDirectory);
} catch (IOException e) {
throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
}
@@ -311,7 +312,7 @@ public class FunctionCommon {
if (packageFile != null) {
try {
return NarClassLoader.getFromArchive(packageFile,
- Collections.emptySet());
+ Collections.emptySet(), narExtractionDirectory);
} catch (IOException e) {
throw new IllegalArgumentException(e.getMessage());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 0c50265..57aa8ec 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -302,7 +302,7 @@ public class SinkConfigUtils {
}
public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
- File sinkPackageFile) {
+ File sinkPackageFile, String narExtractionDirectory) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
}
@@ -356,7 +356,7 @@ public class SinkConfigUtils {
jarClassLoaderException = e;
}
try {
- narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile);
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile, narExtractionDirectory);
} catch (Exception e) {
narClassLoaderException = e;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index c3d6d0a..b184ef0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -208,7 +208,8 @@ public class SourceConfigUtils {
return sourceConfig;
}
- public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath, File sourcePackageFile) {
+ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath,
+ File sourcePackageFile, String narExtractionDirectory) {
if (isEmpty(sourceConfig.getTenant())) {
throw new IllegalArgumentException("Source tenant cannot be null");
}
@@ -249,7 +250,7 @@ public class SourceConfigUtils {
jarClassLoaderException = e;
}
try {
- narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile, narExtractionDirectory);
} catch (Exception e) {
narClassLoaderException = e;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
index 5dbb804..81d39d9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
@@ -71,8 +71,10 @@ public class FunctionCacheEntry implements AutoCloseable {
this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
}
- FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader) throws IOException {
- this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), rootClassLoader);
+ FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader,
+ String narExtractionDirectory) throws IOException {
+ this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(),
+ rootClassLoader, narExtractionDirectory);
this.classpaths = Collections.emptySet();
this.jarFiles = Collections.singleton(narArchive);
this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java
index 427e263..58508ec 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java
@@ -60,7 +60,9 @@ public interface FunctionCacheManager extends AutoCloseable {
List<URL> requiredClasspaths)
throws IOException;
- void registerFunctionInstanceWithArchive(String fid, String eid, String narArchive) throws IOException;
+ void registerFunctionInstanceWithArchive(String fid, String eid,
+ String narArchive,
+ String narExtractionDirectory) throws IOException;
/**
* Unregisters a job from the function cache manager.
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
index b5f7e42..73b7222 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
@@ -110,7 +110,8 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
}
@Override
- public void registerFunctionInstanceWithArchive(String fid, String eid, String narArchive) throws IOException {
+ public void registerFunctionInstanceWithArchive(String fid, String eid,
+ String narArchive, String narExtractionDirectory) throws IOException {
if (fid == null) {
throw new NullPointerException("FunctionID not set");
}
@@ -125,7 +126,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
// Create new cache entry
try {
- cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader));
+ cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
} catch (Throwable cause) {
Exceptions.rethrowIOException(cause);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 2b78bf0..a78c24c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -98,15 +98,15 @@ public class ConnectorUtils {
return conf.getSinkClass();
}
- public static ConnectorDefinition getConnectorDefinition(String narPath) throws IOException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+ public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
}
}
- public static Connectors searchForConnectors(String connectorsDirectory) throws IOException {
+ public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
@@ -120,7 +120,7 @@ public class ConnectorUtils {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toString());
+ ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toString(), narExtractionDirectory);
log.info("Found connector {} from {}", cntDef, archive);
if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 02e3fc4..40156eb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -31,7 +31,7 @@ public class ConnectorsManager {
private Connectors connectors;
public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
- this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory());
+ this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
}
public List<ConnectorDefinition> getConnectors() {
@@ -47,6 +47,6 @@ public class ConnectorsManager {
}
public void reloadConnectors(WorkerConfig workerConfig) throws IOException {
- this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory());
+ this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index eef95a1..0a95755 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -398,7 +398,7 @@ public class FunctionActioner {
SourceSpec sourceSpec = functionDetails.getSource();
if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
File archive = connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
- String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass();
+ String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString(), workerConfig.getNarExtractionDirectory()).getSourceClass();
SourceSpec.Builder builder = SourceSpec.newBuilder(functionDetails.getSource());
builder.setClassName(sourceClass);
functionDetails.setSource(builder);
@@ -412,7 +412,7 @@ public class FunctionActioner {
SinkSpec sinkSpec = functionDetails.getSink();
if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
File archive = connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
- String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass();
+ String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString(), workerConfig.getNarExtractionDirectory()).getSinkClass();
SinkSpec.Builder builder = SinkSpec.newBuilder(functionDetails.getSink());
builder.setClassName(sinkClass);
functionDetails.setSink(builder);
@@ -427,7 +427,7 @@ public class FunctionActioner {
private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
throws IOException, ClassNotFoundException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) {
String typeArg = getSourceType(className, ncl).getName();
SourceSpec.Builder sourceBuilder = SourceSpec.newBuilder(functionDetails.getSource());
@@ -445,7 +445,7 @@ public class FunctionActioner {
private void fillSinkTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
throws IOException, ClassNotFoundException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) {
String typeArg = getSinkType(className, ncl).getName();
SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(functionDetails.getSink());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index dd4b0ee..560c8cb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -688,7 +688,8 @@ public class SinksImpl extends ComponentImpl {
throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
}
}
- SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
+ SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath,
+ componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 9f74a1d..4f434fa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -684,7 +684,8 @@ public class SourcesImpl extends ComponentImpl {
throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
}
}
- SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePackageFile);
+ SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath,
+ sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
return SourceConfigUtils.convert(sourceConfig, sourceDetails);
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 6ee14dc..243f127 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -148,7 +148,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -194,7 +194,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -241,7 +241,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -301,7 +301,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -367,7 +367,7 @@ public class SchedulerManagerTest {
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider
- (), new CollectorRegistry(), null);
+ (), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -478,7 +478,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -605,7 +605,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
@@ -659,7 +659,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -792,7 +792,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index cd55b2a..bd65a0b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -162,7 +162,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
@@ -208,7 +208,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
completableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index b9aeccb..bc117ab 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -817,7 +817,7 @@ public class SinkApiV3ResourceTest {
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(), any());
+ FunctionCommon.extractNarClassLoader(any(), any(), any());
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -888,7 +888,7 @@ public class SinkApiV3ResourceTest {
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(), any());
+ FunctionCommon.extractNarClassLoader(any(), any(), any());
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -986,7 +986,7 @@ public class SinkApiV3ResourceTest {
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(), any());
+ FunctionCommon.extractNarClassLoader(any(), any(), any());
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 88dcff4..d13fd3d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -837,7 +837,7 @@ public class SourceApiV3ResourceTest {
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(), any());
+ FunctionCommon.extractNarClassLoader(any(), any(), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -907,7 +907,7 @@ public class SourceApiV3ResourceTest {
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(), any(File.class));
+ FunctionCommon.extractNarClassLoader(any(), any(File.class), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -1001,7 +1001,7 @@ public class SourceApiV3ResourceTest {
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(), any());
+ FunctionCommon.extractNarClassLoader(any(), any(), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index a29aabd..b870888 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -83,7 +83,7 @@ public class PulsarConnectorCache {
OffloadPolicies offloadPolicies = new OffloadPolicies();
BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig);
- this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies);
+ this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
}
public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
@@ -117,7 +117,8 @@ public class PulsarConnectorCache {
return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig);
}
- public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies) {
+ public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies,
+ PulsarConnectorConfig pulsarConnectorConfig) {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
if (offloadPolicies == null) {
managedLedgerConfig.setLedgerOffloader(this.defaultOffloader);
@@ -130,7 +131,7 @@ public class PulsarConnectorCache {
if (offloader != null) {
offloader.close();
}
- return initManagedLedgerOffloader(offloadPolicies);
+ return initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
}
});
managedLedgerConfig.setLedgerOffloader(ledgerOffloader);
@@ -147,14 +148,16 @@ public class PulsarConnectorCache {
return this.offloaderScheduler;
}
- private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies) {
+ private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies,
+ PulsarConnectorConfig pulsarConnectorConfig) {
try {
if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());
- this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory());
+ this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(),
+ pulsarConnectorConfig.getNarExtractionDirectory());
LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index b2b2755..49d2ae3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamedEntity;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.Commands;
/**
@@ -74,6 +75,9 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors();
private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();
+ // --- Nar extraction
+ private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+
@NotNull
public String getBrokerServiceUrl() {
return brokerServiceUrl;
@@ -358,6 +362,17 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
+ // --- Nar extraction config
+ public String getNarExtractionDirectory() {
+ return narExtractionDirectory;
+ }
+
+ @Config("pulsar.nar-extraction-directory")
+ public PulsarConnectorConfig setNarExtractionDirectory(String narExtractionDirectory) {
+ this.narExtractionDirectory = narExtractionDirectory;
+ return this;
+ }
+
@NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 003c51f..d7e2e2f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -124,7 +124,8 @@ public class PulsarRecordCursor implements RecordCursor {
pulsarConnectorCache.getManagedLedgerFactory(),
pulsarConnectorCache.getManagedLedgerConfig(
TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()),
- pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies),
+ pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies,
+ pulsarConnectorConfig),
new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
}