You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 12:08:12 UTC
[pulsar] branch master updated: [improve][function] Get function class name from the NAR when using built-in functions (#15693)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b3cd139335e [improve][function] Get function class name from the NAR when using built-in functions (#15693)
b3cd139335e is described below
commit b3cd139335e65c6161abd7653e2c4c0cb61c983f
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Thu Jun 9 14:08:04 2022 +0200
[improve][function] Get function class name from the NAR when using built-in functions (#15693)
* Get function class name from the NAR manifest when using built-in functions
* Exclude pulsar-io.yaml from examples JAR
* Build a NAR separately from the JAR for the function examples
* Improve admin cli tes for built-in functions
---
pulsar-broker/pom.xml | 18 ++++
.../worker/PulsarFunctionLocalRunTest.java | 31 ++++++-
.../PulsarFunctionTestTemporaryDirectory.java | 8 +-
.../apache/pulsar/io/AbstractPulsarE2ETest.java | 6 ++
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 47 ++++++++---
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 1 -
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 15 ++--
.../org/apache/pulsar/common/functions/Utils.java | 13 +--
pulsar-functions/java-examples-builtin/pom.xml | 50 +++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 22 +++++
.../org/apache/pulsar/functions/LocalRunner.java | 82 ++++++++++--------
pulsar-functions/pom.xml | 2 +
.../pulsar/functions/utils/FunctionCommon.java | 6 +-
.../functions/utils/FunctionConfigUtils.java | 96 ++++++++++++++++------
.../{Functions.java => FunctionArchive.java} | 13 ++-
.../functions/utils/functions/FunctionUtils.java | 46 ++++++-----
.../functions/utils/FunctionConfigUtilsTest.java | 8 +-
.../pulsar/functions/worker/FunctionsManager.java | 13 ++-
.../functions/worker/rest/api/ComponentImpl.java | 8 +-
.../functions/worker/rest/api/FunctionsImpl.java | 45 +++++-----
.../worker/rest/api/FunctionsImplTest.java | 2 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 16 ++--
.../rest/api/v3/FunctionApiV3ResourceTest.java | 14 ++--
23 files changed, 397 insertions(+), 165 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index c6d55b5f5f9..94702e2427c 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -320,6 +320,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-data-generator</artifactId>
@@ -460,6 +468,15 @@
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.jar</destFileName>
</artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}</outputDirectory>
+ <destFileName>pulsar-functions-api-examples.nar</destFileName>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
@@ -473,6 +490,7 @@
<systemPropertyVariables>
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
+ <pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
<pulsar-io-batch-data-generator.nar.path>${project.build.directory}/pulsar-io-batch-data-generator.nar</pulsar-io-batch-data-generator.nar.path>
<!-- workaround issue #13750 which gets triggered if org.apache.bookkeeper.meta.MetadataDrivers class gets loaded before org.apache.pulsar.metadata.bookkeeper.BKCluster constructor is called -->
<bookkeeper.metadata.bookie.drivers>org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver</bookkeeper.metadata.bookie.drivers>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 8cb067762da..388cd687b90 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -147,6 +147,16 @@ public class PulsarFunctionLocalRunTest {
+ SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_JAR_FILE_PATH + " system property"));
}
+ private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
+ "pulsar-functions-api-examples.nar.path";
+
+ public static File getPulsarApiExamplesNar() {
+ return new File(Objects.requireNonNull(
+ System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH)
+ , "pulsar-functions-api-examples.nar file location must be specified with "
+ + SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH + " system property"));
+ }
+
private static final String SYSTEM_PROPERTY_NAME_BATCH_NAR_FILE_PATH = "pulsar-io-batch-data-generator.nar.path";
public static File getPulsarIOBatchDataGeneratorNar() {
@@ -235,6 +245,11 @@ public class PulsarFunctionLocalRunTest {
File file = getPulsarIODataGeneratorNar();
Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
+
+ File functionsDir = new File(workerConfig.getFunctionsDirectory());
+
+ file = getPulsarApiExamplesNar();
+ Files.copy(file.toPath(), new File(functionsDir, file.getName()).toPath());
}
Optional<WorkerService> functionWorkerService = Optional.empty();
@@ -350,6 +365,7 @@ public class PulsarFunctionLocalRunTest {
protected static FunctionConfig createFunctionConfig(String tenant,
String namespace,
String functionName,
+ boolean isBuiltin,
String sourceTopic,
String sinkTopic,
String subscriptionName) {
@@ -363,7 +379,9 @@ public class PulsarFunctionLocalRunTest {
functionConfig.setSubName(subscriptionName);
functionConfig.setInputs(Collections.singleton(sourceTopic));
functionConfig.setAutoAck(true);
- functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+ if (!isBuiltin) {
+ functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+ }
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput(sinkTopic);
functionConfig.setCleanupSubscription(true);
@@ -425,7 +443,7 @@ public class PulsarFunctionLocalRunTest {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
- sourceTopic, sinkTopic, subscriptionName);
+ jarFilePathUrl != null && jarFilePathUrl.startsWith(Utils.BUILTIN), sourceTopic, sinkTopic, subscriptionName);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setJar(jarFilePathUrl);
@@ -441,6 +459,7 @@ public class PulsarFunctionLocalRunTest {
.tlsAllowInsecureConnection(true)
.tlsHostNameVerificationEnabled(false)
.metricsPortStart(metricsPort)
+ .functionsDirectory(workerConfig.getFunctionsDirectory())
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
localRunner.start(false);
@@ -594,7 +613,7 @@ public class PulsarFunctionLocalRunTest {
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(sinkTopic).subscriptionName("sub").subscribe();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
- sourceTopic, sinkTopic, subscriptionName);
+ jarFilePathUrl != null && jarFilePathUrl.startsWith(Utils.BUILTIN), sourceTopic, sinkTopic, subscriptionName);
//set jsr310ConversionEnabledăalwaysAllowNull
Map<String,String> schemaInput = new HashMap<>();
schemaInput.put(sourceTopic, "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"true\",\"__alwaysAllowNull\":\"true\"}}");
@@ -708,6 +727,12 @@ public class PulsarFunctionLocalRunTest {
testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
}
+ @Test(timeOut = 20000, groups = "builtin")
+ public void testE2EPulsarFunctionLocalRunBuiltin() throws Exception {
+ String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
+ testE2EPulsarFunctionLocalRun(jarFilePathUrl);
+ }
+
@Test(timeOut = 40000)
public void testE2EPulsarFunctionLocalRunMultipleInstances() throws Throwable {
runWithPulsarFunctionsClassLoader(() -> testE2EPulsarFunctionLocalRun(null, 2));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
index b97d2a1203d..156a4d945e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.FileUtils;
import org.testng.Assert;
/**
- * Creates a temporary directory that contains 3 subdirectories,
- * "narExtractionDirectory", "downloadDirectory" and "connectorsDirectory",
+ * Creates a temporary directory that contains 4 subdirectories,
+ * "narExtractionDirectory", "downloadDirectory", "connectorsDirectory" and "functionsDirectory",
* which are assigned to the provided workerConfig's respective settings with
* the {@link #useTemporaryDirectoriesForWorkerConfig(WorkerConfig)} method
*/
@@ -37,6 +37,7 @@ public class PulsarFunctionTestTemporaryDirectory {
private final File narExtractionDirectory;
private final File downloadDirectory;
private final File connectorsDirectory;
+ private final File functionsDirectory;
private PulsarFunctionTestTemporaryDirectory(String tempDirectoryNamePrefix) throws IOException {
tempDirectory = Files.createTempDirectory(tempDirectoryNamePrefix).toFile();
@@ -46,6 +47,8 @@ public class PulsarFunctionTestTemporaryDirectory {
downloadDirectory.mkdir();
connectorsDirectory = new File(tempDirectory, "connectorsDirectory");
connectorsDirectory.mkdir();
+ functionsDirectory = new File(tempDirectory, "functionsDirectory");
+ functionsDirectory.mkdir();
}
public static PulsarFunctionTestTemporaryDirectory create(String tempDirectoryNamePrefix) {
@@ -60,6 +63,7 @@ public class PulsarFunctionTestTemporaryDirectory {
workerConfig.setNarExtractionDirectory(narExtractionDirectory.getAbsolutePath());
workerConfig.setDownloadDirectory(downloadDirectory.getAbsolutePath());
workerConfig.setConnectorsDirectory(connectorsDirectory.getAbsolutePath());
+ workerConfig.setFunctionsDirectory(functionsDirectory.getAbsolutePath());
}
public void delete() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index 7c0f2f0bbe1..1ac908c6ec2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesJar;
+import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesNar;
import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIOBatchDataGeneratorNar;
import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar;
import static org.mockito.Mockito.spy;
@@ -158,6 +159,11 @@ public abstract class AbstractPulsarE2ETest {
file = getPulsarIOBatchDataGeneratorNar();
Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
+
+ File functionsDir = new File(workerConfig.getFunctionsDirectory());
+
+ file = getPulsarApiExamplesNar();
+ Files.copy(file.toPath(), new File(functionsDir, file.getName()).toPath());
}
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 564fa269524..e68a556524c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -76,7 +77,7 @@ import org.testng.annotations.Test;
@Test(groups = "broker-io")
public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
- protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+ protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, boolean isBuiltin, String sourceTopic, String sinkTopic, String subscriptionName) {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
@@ -89,7 +90,9 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
functionConfig.setTopicsPattern(sourceTopicPattern);
}
functionConfig.setAutoAck(true);
- functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+ if (!isBuiltin) {
+ functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+ }
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput(sinkTopic);
functionConfig.setCleanupSubscription(true);
@@ -121,14 +124,26 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
- "my.*", sinkTopic, subscriptionName);
+ jarFilePathUrl.startsWith(Utils.BUILTIN), "my.*", sinkTopic, subscriptionName);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
+ functionConfig.setJar(jarFilePathUrl);
+ admin.functions().createFunction(functionConfig, null);
+ } else {
+ admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+ }
// try to update function to test: update-function functionality
functionConfig.setParallelism(2);
functionConfig.setOutput(sinkTopic2);
- admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
+ functionConfig.setJar(jarFilePathUrl);
+ admin.functions().updateFunction(functionConfig, null);
+ } else {
+ admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+ }
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
TopicStats topicStats = admin.topics().getStats(sinkTopic2);
@@ -186,6 +201,18 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
testE2EPulsarFunction(fileServer.getUrl("/pulsar-functions-api-examples.jar"));
}
+ @Test(timeOut = 20000, groups = "builtin")
+ public void testPulsarFunctionBuiltin() throws Exception {
+ String jarFilePathUrl = String.format("%s://exclamation", Utils.BUILTIN);
+ testE2EPulsarFunction(jarFilePathUrl);
+ }
+
+ @Test(timeOut = 20000, groups = "builtin", expectedExceptions = {PulsarAdminException.class}, expectedExceptionsMessageRegExp = "No Function foo found")
+ public void testPulsarFunctionBuiltinDoesNotExist() throws Exception {
+ String jarFilePathUrl = String.format("%s://foo", Utils.BUILTIN);
+ testE2EPulsarFunction(jarFilePathUrl);
+ }
+
@Test(timeOut = 30000)
public void testReadCompactedFunction() throws Exception {
final String namespacePortion = "io";
@@ -225,7 +252,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
// 4 Setup function
// set source topic to null because we are setting the topic information separately
- FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+ FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
null, sinkTopic, subscriptionName);
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = new ConsumerConfig();
@@ -275,7 +302,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
- FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+ FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
@@ -611,7 +638,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
- FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+ FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
@@ -693,7 +720,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
admin.tenants().updateTenant(tenant, propAdmin);
String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
- FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+ FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
"my.*", sinkTopic, subscriptionName);
if (!validRoleName) {
// create a non-superuser admin to test the api
@@ -735,7 +762,7 @@ public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();
- FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+ FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, false,
sourceTopicName, sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index efb5ae8ad64..faa94ebef5a 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -436,7 +436,6 @@ public class CmdFunctionsTest {
"--jar", BUILTIN_NAR,
"--tenant", "sample",
"--namespace", "ns1",
- "--className", DummyFunction.class.getName(),
});
CreateFunction creater = cmd.getCreater();
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a9c83a2cd09..61bf9c438d4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -643,7 +643,8 @@ public class CmdFunctions extends CmdBase {
protected void validateFunctionConfigs(FunctionConfig functionConfig) {
// go doesn't need className
- if (functionConfig.getPy() != null || functionConfig.getJar() != null) {
+ if (functionConfig.getPy() != null
+ || (functionConfig.getJar() != null && !functionConfig.getJar().startsWith("builtin://"))) {
if (StringUtils.isEmpty(functionConfig.getClassName())) {
throw new ParameterException("No Function Classname specified");
}
@@ -651,6 +652,9 @@ public class CmdFunctions extends CmdBase {
if (StringUtils.isEmpty(functionConfig.getName())) {
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
}
+ if (StringUtils.isEmpty(functionConfig.getName())) {
+ throw new IllegalArgumentException("No Function name specified");
+ }
if (StringUtils.isEmpty(functionConfig.getTenant())) {
org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
}
@@ -958,13 +962,12 @@ public class CmdFunctions extends CmdBase {
@Override
protected void validateFunctionConfigs(FunctionConfig functionConfig) {
- if (StringUtils.isEmpty(functionConfig.getClassName())) {
- if (StringUtils.isEmpty(functionConfig.getName())) {
- throw new ParameterException("Function Name not provided");
- }
- } else if (StringUtils.isEmpty(functionConfig.getName())) {
+ if (StringUtils.isEmpty(functionConfig.getName())) {
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
}
+ if (StringUtils.isEmpty(functionConfig.getName())) {
+ throw new ParameterException("Function Name not provided");
+ }
if (StringUtils.isEmpty(functionConfig.getTenant())) {
org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
index 2de38bbe02f..e4c9b9daeb5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
@@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import java.util.Arrays;
+import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.packages.management.core.common.PackageType;
@@ -46,11 +47,13 @@ public class Utils {
}
public static void inferMissingFunctionName(FunctionConfig functionConfig) {
- String[] domains = functionConfig.getClassName().split("\\.");
- if (domains.length == 0) {
- functionConfig.setName(functionConfig.getClassName());
- } else {
- functionConfig.setName(domains[domains.length - 1]);
+ if (!StringUtils.isEmpty(functionConfig.getClassName())) {
+ String[] domains = functionConfig.getClassName().split("\\.");
+ if (domains.length == 0) {
+ functionConfig.setName(functionConfig.getClassName());
+ } else {
+ functionConfig.setName(domains[domains.length - 1]);
+ }
}
}
diff --git a/pulsar-functions/java-examples-builtin/pom.xml b/pulsar-functions/java-examples-builtin/pom.xml
new file mode 100644
index 00000000000..7822aa9e83a
--- /dev/null
+++ b/pulsar-functions/java-examples-builtin/pom.xml
@@ -0,0 +1,50 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions</artifactId>
+ <version>2.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+ <name>Pulsar Functions :: API Examples (NAR)</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-api-examples</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/pulsar-functions/java-examples-builtin/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-functions/java-examples-builtin/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000000..32cc0fb8ac0
--- /dev/null
+++ b/pulsar-functions/java-examples-builtin/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: exclamation
+description: Exclamation function
+functionClass: org.apache.pulsar.functions.api.examples.ExclamationFunction
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 5b3422693b1..e83653100ad 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -48,7 +48,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
@@ -75,11 +74,10 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.WorkerConfig;
@Slf4j
public class LocalRunner implements AutoCloseable {
@@ -89,6 +87,7 @@ public class LocalRunner implements AutoCloseable {
private final String narExtractionDirectory;
private final File narExtractionDirectoryCreated;
private final String connectorsDir;
+ private final String functionsDir;
private final Thread shutdownHook;
private final int instanceLivenessCheck;
private ClassLoader userCodeClassLoader;
@@ -218,7 +217,8 @@ public class LocalRunner implements AutoCloseable {
boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled,
String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv,
String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory,
- String connectorsDirectory, Integer metricsPortStart, boolean exitOnError) {
+ String connectorsDirectory, String functionsDirectory, Integer metricsPortStart,
+ boolean exitOnError) {
this.functionConfig = functionConfig;
this.sourceConfig = sourceConfig;
this.sinkConfig = sinkConfig;
@@ -242,15 +242,8 @@ public class LocalRunner implements AutoCloseable {
this.narExtractionDirectoryCreated = createNarExtractionTempDirectory();
this.narExtractionDirectory = this.narExtractionDirectoryCreated.getAbsolutePath();
}
- if (connectorsDirectory != null) {
- this.connectorsDir = connectorsDirectory;
- } else {
- String pulsarHome = System.getenv("PULSAR_HOME");
- if (pulsarHome == null) {
- pulsarHome = Paths.get("").toAbsolutePath().toString();
- }
- this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
- }
+ this.connectorsDir = connectorsDirectory != null ? connectorsDirectory : getPulsarDirectory("connectors");
+ this.functionsDir = functionsDirectory != null ? functionsDirectory : getPulsarDirectory("functions");
this.metricsPortStart = metricsPortStart;
this.exitOnError = exitOnError;
this.instanceLivenessCheck = exitOnError ? 0 : 30000;
@@ -263,6 +256,14 @@ public class LocalRunner implements AutoCloseable {
});
}
+ private static String getPulsarDirectory(String directory) {
+ String pulsarHome = System.getenv("PULSAR_HOME");
+ if (pulsarHome == null) {
+ pulsarHome = Paths.get("").toAbsolutePath().toString();
+ }
+ return Paths.get(pulsarHome, directory).toString();
+ }
+
private static File createNarExtractionTempDirectory() {
try {
return Files.createTempDirectory("pulsar_localrunner_nars_").toFile();
@@ -328,28 +329,23 @@ public class LocalRunner implements AutoCloseable {
throw new IllegalArgumentException("Pulsar Function local run already started!");
}
Runtime.getRuntime().addShutdownHook(shutdownHook);
- Function.FunctionDetails functionDetails;
- String userCodeFile = null;
+ Function.FunctionDetails functionDetails = null;
+ String userCodeFile;
int parallelism;
if (functionConfig != null) {
FunctionConfigUtils.inferMissingArguments(functionConfig, true);
parallelism = functionConfig.getParallelism();
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
userCodeFile = functionConfig.getJar();
-
- boolean isBuiltin = !StringUtils.isEmpty(functionConfig.getJar())
- && functionConfig.getJar().startsWith(Utils.BUILTIN);
- if (isBuiltin) {
- WorkerConfig workerConfig =
- WorkerConfig.load(System.getenv("PULSAR_HOME") + "/conf/functions_worker.yml");
- Functions functions = FunctionUtils
- .searchForFunctions(System.getenv("PULSAR_HOME")
- + workerConfig.getFunctionsDirectory().replaceFirst("^.", ""));
- String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
- userCodeFile = functions.getFunctions().get(functionType).toString();
- }
-
- if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+ ClassLoader builtInFunctionClassLoader = userCodeFile != null
+ ? isBuiltInFunction(userCodeFile)
+ : null;
+ if (builtInFunctionClassLoader != null) {
+ userCodeClassLoader = builtInFunctionClassLoader;
+ functionDetails = FunctionConfigUtils.convert(
+ functionConfig,
+ FunctionConfigUtils.validateJavaFunction(functionConfig, builtInFunctionClassLoader));
+ } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
userCodeClassLoaderCreated = true;
@@ -375,9 +371,11 @@ public class LocalRunner implements AutoCloseable {
throw new UnsupportedOperationException();
}
- functionDetails = FunctionConfigUtils.convert(functionConfig,
- userCodeClassLoader != null ? userCodeClassLoader :
- Thread.currentThread().getContextClassLoader());
+ if (functionDetails == null) {
+ functionDetails = FunctionConfigUtils.convert(functionConfig,
+ userCodeClassLoader != null ? userCodeClassLoader :
+ Thread.currentThread().getContextClassLoader());
+ }
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
userCodeFile = sourceConfig.getArchive();
@@ -590,7 +588,7 @@ public class LocalRunner implements AutoCloseable {
}
}
}, 30000, 30000);
- java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> statusCheckTimer.cancel()));
+ java.lang.Runtime.getRuntime().addShutdownHook(new Thread(statusCheckTimer::cancel));
}
@@ -679,6 +677,20 @@ public class LocalRunner implements AutoCloseable {
}
}
+ private ClassLoader isBuiltInFunction(String functionType) throws IOException {
+ // Validate the connector type from the locally available connectors
+ TreeMap<String, FunctionArchive> functions = getFunctions();
+
+ String functionName = functionType.replaceFirst("^builtin://", "");
+ FunctionArchive function = functions.get(functionName);
+ if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
+ // Function type is a valid built-in type.
+ return function.getClassLoader();
+ } else {
+ return null;
+ }
+ }
+
private ClassLoader isBuiltInSource(String sourceType) throws IOException {
// Validate the connector type from the locally available connectors
TreeMap<String, Connector> connectors = getConnectors();
@@ -707,6 +719,10 @@ public class LocalRunner implements AutoCloseable {
}
}
+ private TreeMap<String, FunctionArchive> getFunctions() throws IOException {
+ return FunctionUtils.searchForFunctions(functionsDir);
+ }
+
private TreeMap<String, Connector> getConnectors() throws IOException {
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
}
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 2bff9d42938..d60692163c8 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -45,6 +45,7 @@
<module>proto</module>
<module>api-java</module>
<module>java-examples</module>
+ <module>java-examples-builtin</module>
<module>utils</module>
<module>instance</module>
<module>runtime</module>
@@ -62,6 +63,7 @@
<module>proto</module>
<module>api-java</module>
<module>java-examples</module>
+ <module>java-examples-builtin</module>
<module>utils</module>
<module>instance</module>
<module>runtime</module>
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index d6f948a0fe7..02202ae7978 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -92,8 +92,12 @@ public class FunctionCommon {
public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader)
throws ClassNotFoundException {
+ return getFunctionTypes(functionConfig, classLoader.loadClass(functionConfig.getClassName()));
+ }
+
+ public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, Class functionClass)
+ throws ClassNotFoundException {
boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null;
- Class functionClass = classLoader.loadClass(functionConfig.getClassName());
return getFunctionTypes(functionClass, isWindowConfigPresent);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 99bca74ce90..08dddf3dbfa 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.util.Collection;
@@ -37,6 +38,9 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -49,10 +53,20 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@Slf4j
public class FunctionConfigUtils {
+ @Getter
+ @Setter
+ @AllArgsConstructor
+ public static class ExtractedFunctionDetails {
+ private String functionClassName;
+ private String typeArg0;
+ private String typeArg1;
+ }
+
static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
@@ -61,33 +75,42 @@ public class FunctionConfigUtils {
public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
throws IllegalArgumentException {
- boolean isBuiltin =
- !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar()
- .startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
-
- Class<?>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
if (classLoader != null) {
try {
- typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
+ Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
+ return convert(
+ functionConfig,
+ new ExtractedFunctionDetails(
+ functionConfig.getClassName(),
+ typeArgs[0].getName(),
+ typeArgs[1].getName()));
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
}
}
}
+ return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
+ }
+
+ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
+ throws IllegalArgumentException {
+
+ boolean isBuiltin = !StringUtils.isEmpty(functionConfig.getJar())
+ && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
// Setup source
Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
if (functionConfig.getInputs() != null) {
- functionConfig.getInputs().forEach((topicName -> {
+ functionConfig.getInputs().forEach((topicName ->
sourceSpecBuilder.putInputSpecs(topicName,
Function.ConsumerSpec.newBuilder()
.setIsRegexPattern(false)
- .build());
- }));
+ .build())
+ ));
}
if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
@@ -166,7 +189,7 @@ public class FunctionConfigUtils {
// Set subscription position
if (functionConfig.getSubscriptionPosition() != null) {
- Function.SubscriptionPosition subPosition = null;
+ Function.SubscriptionPosition subPosition;
if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) {
subPosition = Function.SubscriptionPosition.EARLIEST;
} else {
@@ -175,8 +198,8 @@ public class FunctionConfigUtils {
sourceSpecBuilder.setSubscriptionPosition(subPosition);
}
- if (typeArgs != null) {
- sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
+ if (extractedDetails.getTypeArg0() != null) {
+ sourceSpecBuilder.setTypeClassName(extractedDetails.getTypeArg0());
}
if (functionConfig.getTimeoutMs() != null) {
sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
@@ -217,8 +240,8 @@ public class FunctionConfigUtils {
String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
}
}
- if (typeArgs != null) {
- sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
+ if (extractedDetails.getTypeArg1() != null) {
+ sinkSpecBuilder.setTypeClassName(extractedDetails.getTypeArg1());
}
if (functionConfig.getProducerConfig() != null) {
ProducerConfig producerConf = functionConfig.getProducerConfig();
@@ -285,14 +308,14 @@ public class FunctionConfigUtils {
// windowing related
WindowConfig windowConfig = functionConfig.getWindowConfig();
if (windowConfig != null) {
- windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName());
+ windowConfig.setActualWindowFunctionClassName(extractedDetails.getFunctionClassName());
configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
// set class name to window function executor
functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor");
} else {
- if (functionConfig.getClassName() != null) {
- functionDetailsBuilder.setClassName(functionConfig.getClassName());
+ if (extractedDetails.getFunctionClassName() != null) {
+ functionDetailsBuilder.setClassName(extractedDetails.getFunctionClassName());
}
}
if (!configs.isEmpty()) {
@@ -517,10 +540,21 @@ public class FunctionConfigUtils {
}
}
- private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+ private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+ String functionClassName = functionConfig.getClassName();
+ Class functionClass;
try {
- Class functionClass = clsLoader.loadClass(functionConfig.getClassName());
+ // if class name in function config is not set, this should be a built-in function
+ // thus we should try to find its class name in the NAR service definition
+ if (functionClassName == null) {
+ try {
+ functionClassName = FunctionUtils.getFunctionClass(clsLoader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to extract source class from archive", e);
+ }
+ }
+ functionClass = clsLoader.loadClass(functionClassName);
if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
&& !java.util.function.Function.class.isAssignableFrom(functionClass)
@@ -536,7 +570,7 @@ public class FunctionConfigUtils {
Class<?>[] typeArgs;
try {
- typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
+ typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
@@ -590,7 +624,10 @@ public class FunctionConfigUtils {
}
if (Void.class.equals(typeArgs[1])) {
- return;
+ return new FunctionConfigUtils.ExtractedFunctionDetails(
+ functionClassName,
+ typeArgs[0].getName(),
+ typeArgs[1].getName());
}
// One and only one of outputSchemaType and outputSerdeClassName should be set
@@ -612,6 +649,10 @@ public class FunctionConfigUtils {
ValidatorUtils
.validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(), clsLoader, true);
}
+ return new FunctionConfigUtils.ExtractedFunctionDetails(
+ functionClassName,
+ typeArgs[0].getName(),
+ typeArgs[1].getName());
}
private static void doPythonChecks(FunctionConfig functionConfig) {
@@ -660,6 +701,10 @@ public class FunctionConfigUtils {
}
}
+ private static boolean isBuiltin(FunctionConfig functionConfig) {
+ return functionConfig.getJar() != null && functionConfig.getJar().startsWith("builtin://");
+ }
+
private static void doCommonChecks(FunctionConfig functionConfig) {
if (isEmpty(functionConfig.getTenant())) {
throw new IllegalArgumentException("Function tenant cannot be null");
@@ -672,7 +717,7 @@ public class FunctionConfigUtils {
}
// go doesn't need className
if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON
- || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+ || (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && !isBuiltin(functionConfig))) {
if (isEmpty(functionConfig.getClassName())) {
throw new IllegalArgumentException("Function classname cannot be null");
}
@@ -826,7 +871,7 @@ public class FunctionConfigUtils {
public static ClassLoader validate(FunctionConfig functionConfig, File functionPackageFile) {
doCommonChecks(functionConfig);
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- ClassLoader classLoader = null;
+ ClassLoader classLoader;
if (functionPackageFile != null) {
try {
classLoader = loadJar(functionPackageFile);
@@ -860,9 +905,10 @@ public class FunctionConfigUtils {
}
}
- public static void validateJavaFunction(FunctionConfig functionConfig, ClassLoader classLoader) {
+ public static ExtractedFunctionDetails validateJavaFunction(FunctionConfig functionConfig,
+ ClassLoader classLoader) {
doCommonChecks(functionConfig);
- doJavaChecks(functionConfig, classLoader);
+ return doJavaChecks(functionConfig, classLoader);
}
public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
similarity index 79%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
index 206d1a52d55..8d621cc965f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
@@ -19,15 +19,14 @@
package org.apache.pulsar.functions.utils.functions;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import lombok.Builder;
import lombok.Data;
import org.apache.pulsar.common.functions.FunctionDefinition;
+@Builder
@Data
-public class Functions {
- final List<FunctionDefinition> functionsDefinitions = new ArrayList<>();
- final Map<String, Path> functions = new TreeMap<>();
+public class FunctionArchive {
+ private Path archivePath;
+ private ClassLoader classLoader;
+ private FunctionDefinition functionDefinition;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
index 68c1fdd9be4..8b70a4b6508 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -24,7 +24,7 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Collections;
+import java.util.TreeMap;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -43,7 +43,7 @@ public class FunctionUtils {
private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";
/**
- * Extract the Pulsar Function class from a functionctor archive.
+ * Extract the Pulsar Function class from a function or archive.
*/
public static String getFunctionClass(ClassLoader classLoader) throws IOException {
NarClassLoader ncl = (NarClassLoader) classLoader;
@@ -71,24 +71,21 @@ public class FunctionUtils {
return conf.getFunctionClass();
}
- public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException {
- try (NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(narPath))
- .build();) {
- String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
- return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
- }
+ public static FunctionDefinition getFunctionDefinition(NarClassLoader narClassLoader) throws IOException {
+ String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+ return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
}
- public static Functions searchForFunctions(String functionsDirectory) throws IOException {
+
+ public static TreeMap<String, FunctionArchive> searchForFunctions(String functionsDirectory) throws IOException {
return searchForFunctions(functionsDirectory, false);
}
- public static Functions searchForFunctions(String functionsDirectory, boolean alwaysPopulatePath)
- throws IOException {
+ public static TreeMap<String, FunctionArchive> searchForFunctions(String functionsDirectory,
+ boolean alwaysPopulatePath) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath();
log.info("Searching for functions in {}", path);
- Functions functions = new Functions();
+ TreeMap<String, FunctionArchive> functions = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Functions archive directory not found");
@@ -98,24 +95,29 @@ public class FunctionUtils {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
- FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString());
+
+ NarClassLoader ncl = NarClassLoaderBuilder.builder()
+ .narFile(new File(archive.toString()))
+ .build();
+
+ FunctionArchive.FunctionArchiveBuilder functionArchiveBuilder = FunctionArchive.builder();
+ FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(ncl);
log.info("Found function {} from {}", cntDef, archive);
- log.error(cntDef.getName());
- log.error(cntDef.getFunctionClass());
+
+ functionArchiveBuilder.archivePath(archive);
+
+ functionArchiveBuilder.classLoader(ncl);
+ functionArchiveBuilder.functionDefinition(cntDef);
+
if (alwaysPopulatePath || !StringUtils.isEmpty(cntDef.getFunctionClass())) {
- functions.functions.put(cntDef.getName(), archive);
+ functions.put(cntDef.getName(), functionArchiveBuilder.build());
}
-
- functions.functionsDefinitions.add(cntDef);
} catch (Throwable t) {
log.warn("Failed to load function from {}", archive, t);
}
}
}
- Collections.sort(functions.functionsDefinitions,
- (c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName()));
-
return functions;
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index a6ebde14983..a2ee418a11c 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -84,7 +84,7 @@ public class FunctionConfigUtilsTest {
producerConfig.setUseThreadLocalProducers(true);
producerConfig.setBatchBuilder("DEFAULT");
functionConfig.setProducerConfig(producerConfig);
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
// add default resources
@@ -125,7 +125,7 @@ public class FunctionConfigUtilsTest {
producerConfig.setUseThreadLocalProducers(true);
producerConfig.setBatchBuilder("KEY_BASED");
functionConfig.setProducerConfig(producerConfig);
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
// add default resources
@@ -584,7 +584,7 @@ public class FunctionConfigUtilsTest {
@Test
public void testPoolMessages() {
FunctionConfig functionConfig = createFunctionConfig();
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
assertFalse(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
assertFalse(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -594,7 +594,7 @@ public class FunctionConfigUtilsTest {
.poolMessages(true).build());
functionConfig.setInputSpecs(inputSpecs);
- functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+ functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
assertTrue(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
index f643915c2b3..0f1c0fcc835 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -20,27 +20,26 @@ package org.apache.pulsar.functions.worker;
import java.io.IOException;
import java.nio.file.Path;
-import java.util.List;
+import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.functions.Functions;
@Slf4j
public class FunctionsManager {
- private Functions functions;
+ private TreeMap<String, FunctionArchive> functions;
public FunctionsManager(WorkerConfig workerConfig) throws IOException {
this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
}
- public List<FunctionDefinition> getFunctions() {
- return functions.getFunctionsDefinitions();
+ public FunctionArchive getFunction(String functionType) {
+ return functions.get(functionType);
}
public Path getFunctionArchive(String functionType) {
- return functions.getFunctions().get(functionType);
+ return functions.get(functionType).getArchivePath();
}
public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 801bf520911..e1103214b32 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -46,6 +46,7 @@ import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -99,8 +100,8 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -1457,8 +1458,9 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
String sType = pkgPath.replaceFirst("^builtin://", "");
final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
- Functions sinksOrSources = FunctionUtils.searchForFunctions(connectorsDir, true);
- Path narPath = sinksOrSources.getFunctions().get(sType);
+ TreeMap<String, FunctionArchive> sinksOrSources =
+ FunctionUtils.searchForFunctions(connectorsDir, true);
+ Path narPath = sinksOrSources.get(sType).getArchivePath();
if (narPath == null) {
throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index bc14f7deb9e..4a765cc4056 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -57,7 +58,9 @@ import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.service.api.Functions;
@@ -118,7 +121,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
- if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+ if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, functionName, namespace);
throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
}
@@ -144,7 +147,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
ComponentTypeUtils.toString(componentType), functionName));
}
- Function.FunctionDetails functionDetails = null;
+ Function.FunctionDetails functionDetails;
boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
File componentPackageFile = null;
try {
@@ -329,7 +332,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
- Function.FunctionDetails functionDetails = null;
+ Function.FunctionDetails functionDetails;
File componentPackageFile = null;
try {
@@ -775,35 +778,39 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
final String namespace,
final String componentName,
final FunctionConfig functionConfig,
- final File componentPackageFile) throws IOException {
+ final File componentPackageFile) {
// The rest end points take precedence over whatever is there in function config
- Path archivePath = null;
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(componentName);
FunctionConfigUtils.inferMissingArguments(
functionConfig, worker().getWorkerConfig().isForwardSourceMessageProperty());
- if (!StringUtils.isEmpty(functionConfig.getJar())) {
- String builtinArchive = functionConfig.getJar();
- if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
- builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
- }
- try {
- archivePath = this.worker().getFunctionsManager().getFunctionArchive(builtinArchive);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("No Function archive %s found", archivePath));
+ String archive = functionConfig.getJar();
+ if (!StringUtils.isEmpty(archive)) {
+ if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+ archive = archive.replaceFirst("^builtin://", "");
+
+ FunctionsManager functionsManager = this.worker().getFunctionsManager();
+ FunctionArchive function = functionsManager.getFunction(archive);
+
+ // check if builtin function exists
+ if (function == null) {
+ throw new IllegalArgumentException(String.format("No Function %s found", archive));
+ }
+ return FunctionConfigUtils.convert(
+ functionConfig,
+ FunctionConfigUtils.validateJavaFunction(functionConfig, function.getClassLoader()));
}
}
ClassLoader clsLoader = null;
- if (archivePath != null) {
- clsLoader = FunctionConfigUtils.validate(functionConfig, archivePath.toFile());
- } else {
+ try {
clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+ return FunctionConfigUtils.convert(functionConfig, clsLoader);
+ } finally {
+ ClassLoaderUtils.closeClassLoader(clsLoader);
}
- return FunctionConfigUtils.convert(functionConfig, clsLoader);
-
}
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 3a98416fe81..4b7e9940230 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -373,6 +373,6 @@ public class FunctionsImplTest {
public static Function.FunctionDetails createDefaultFunctionDetails() {
FunctionConfig functionConfig = createDefaultFunctionConfig();
- return FunctionConfigUtils.convert(functionConfig, null);
+ return FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 2bad2bd266e..ca405c5736a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -539,7 +539,7 @@ public class FunctionApiV2ResourceTest {
inputStream,
details,
functionPkgUrl,
- JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+ JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
@@ -557,7 +557,7 @@ public class FunctionApiV2ResourceTest {
mockedInputStream,
mockedFormData,
null,
- JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+ JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
@@ -940,7 +940,7 @@ public class FunctionApiV2ResourceTest {
inputStream,
details,
null,
- JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+ JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
@@ -968,7 +968,7 @@ public class FunctionApiV2ResourceTest {
mockedInputStream,
mockedFormData,
null,
- JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+ JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
@@ -1045,7 +1045,7 @@ public class FunctionApiV2ResourceTest {
null,
null,
filePackageUrl,
- JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
+ JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
@@ -1437,7 +1437,7 @@ public class FunctionApiV2ResourceTest {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
try {
resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
- JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)), null);
+ JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)), null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
@@ -1471,7 +1471,7 @@ public class FunctionApiV2ResourceTest {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
try {
resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
- JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)), null);
+ JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, (ClassLoader) null)), null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
@@ -1493,6 +1493,6 @@ public class FunctionApiV2ResourceTest {
public static FunctionDetails createDefaultFunctionDetails() {
FunctionConfig functionConfig = createDefaultFunctionConfig();
- return FunctionConfigUtils.convert(functionConfig, null);
+ return FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index fae1ae6d975..288001a2964 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
-import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedList;
@@ -74,6 +73,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -1529,14 +1529,12 @@ public class FunctionApiV3ResourceTest {
when(worker.getWorkerConfig()).thenReturn(config);
FunctionsImpl function = new FunctionsImpl(() -> worker);
- Map<String, Path> functionsMap = new TreeMap<>();
- functionsMap.put("cassandra", file.toPath());
- org.apache.pulsar.functions.utils.functions.Functions mockedFunctions =
- mock(org.apache.pulsar.functions.utils.functions.Functions.class);
- when(mockedFunctions.getFunctions()).thenReturn(functionsMap);
+ TreeMap<String, FunctionArchive> functions = new TreeMap<>();
+ FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+ functions.put("cassandra", functionArchive);
mockStatic(FunctionUtils.class, ctx -> {
- ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(mockedFunctions);
+ ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions);
});
@@ -1648,6 +1646,6 @@ public class FunctionApiV3ResourceTest {
public static FunctionDetails createDefaultFunctionDetails() {
FunctionConfig functionConfig = createDefaultFunctionConfig();
- return FunctionConfigUtils.convert(functionConfig, null);
+ return FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
}
}