You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/05 00:08:42 UTC
[pulsar] branch master updated: [#6003][pulsar-functions]
Possibility to add builtin Functions (#6895)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9b0098a [#6003][pulsar-functions] Possibility to add builtin Functions (#6895)
9b0098a is described below
commit 9b0098a13ac67d5e342b9b6b15db1f680fc1d6d6
Author: oncode <in...@oncode.it>
AuthorDate: Fri Jun 5 02:08:25 2020 +0200
[#6003][pulsar-functions] Possibility to add builtin Functions (#6895)
Master Issue: #6003
### Motivation
This pull request implements the possibility to add builtin functions (in the same way of the build in connectors).
The builtin function must include a `pulsar-io.yml` file with the following content
```yml
name: <function-name>
description: <function-desciption>
functionClass: <function-class>
```
e.g.
```yml
name: test-function
description: test function description
functionClass: it.oncode.pulsar.functions.TestFunction
```
it is possible to create a builtin function in the same way of the builtin sinks/sources.
Example in scala
```scala
val functionConfigBuilder: FunctionConfigBuilder = FunctionConfig.builder()
val function =
functionConfigBuilder
.tenant("public")
.namespace("default")
.jar("builtin://test-function")
.name("test-function-name")
.className("it.oncode.pulsar.functions.TestFunction")
.inputs(Seq("channel_in").asJava)
.output("channel_out")
.runtime(FunctionConfig.Runtime.JAVA)
.build()
Pulsar.admin.functions
.createFunction(function, null)
```
Function folder to be specified in the `conf/functions_worker.yml` conf file
e.g.
`functionsDirectory: ./functions`
Function package must be in `*.nar` format like for source/sink connectors
### Modifications
I modified the `pulsar-function-utils`, `pulsar-functions-worker` and `pulsar-common` modules on the basis of the built in connectors implementation.
Also `Function.proto` has been modified in order to include the `builtin` property
#### What this MR does not include
- modification of pulsar-admin to fetch the available buildin functions
- the related documentation
This is a feature that is critical for us, I think we could open an issue for the remaining points and consider to merge this PR.
---
conf/functions_worker.yml | 1 +
.../common/functions/FunctionDefinition.java | 47 +++++++++
.../apache/pulsar/common/nar/NarClassLoader.java | 12 +--
pulsar-functions/localrun-shaded/pom.xml | 24 +++--
.../org/apache/pulsar/functions/LocalRunner.java | 21 +++-
.../proto/src/main/proto/Function.proto | 3 +
.../pulsar/functions/worker/WorkerConfig.java | 7 ++
.../functions/utils/FunctionConfigUtils.java | 17 +--
.../functions/utils/functions/FunctionUtils.java | 115 +++++++++++++++++++++
.../functions/utils/functions/Functions.java | 35 +++++++
.../pulsar/functions/worker/FunctionActioner.java | 8 +-
.../functions/worker/FunctionRuntimeManager.java | 4 +-
.../pulsar/functions/worker/FunctionsManager.java | 49 +++++++++
.../pulsar/functions/worker/WorkerService.java | 4 +-
.../pulsar/functions/worker/WorkerUtils.java | 4 +
.../functions/worker/rest/api/FunctionsImpl.java | 23 ++++-
.../functions/worker/FunctionActionerTest.java | 6 +-
.../worker/FunctionRuntimeManagerTest.java | 15 ++-
.../functions/worker/MembershipManagerTest.java | 4 +
19 files changed, 364 insertions(+), 35 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index a2071f1..4ecf1e0 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -214,3 +214,4 @@ brokerClientTrustCertsFilePath:
########################
connectorsDirectory: ./connectors
+functionsDirectory: ./functions
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
new file mode 100644
index 0000000..44163cb
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.functions;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Basic information about a Pulsar function.
+ */
+@Data
+@NoArgsConstructor
+public class FunctionDefinition {
+
+ /**
+ * The name of the function type.
+ */
+ private String name;
+
+ /**
+ * Description to be used for user help.
+ */
+ private String description;
+
+ /**
+ * The class name for the function implementation.
+ *
+ * <p>If not defined, it will be assumed this function cannot act as a data.
+ */
+ private String functionClass;
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index e33cbfa..bc6af79 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -139,12 +139,12 @@ public class NarClassLoader extends URLClassLoader {
public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars,
String narExtractionDirectory) throws IOException {
- File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
- try {
- return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IOException(e);
- }
+ return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.class.getClassLoader(),
+ NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
+ }
+
+ public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
+ return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
}
public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent,
diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
index fce2f24..f27c4b5 100644
--- a/pulsar-functions/localrun-shaded/pom.xml
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -20,7 +20,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -71,10 +71,11 @@
</filter>
</filters>
<relocations>
- <relocation>
+ <!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
+ <!-- <relocation>
<pattern>com.typesafe.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
- </relocation>
+ </relocation> -->
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
@@ -179,10 +180,11 @@
<pattern>org.apache.distributedlog</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
</relocation>
- <relocation>
+ <!-- Jackson cannot be shaded, this is causing java.lang.NoSuchMethodError when calling getThreadLocalYaml-->
+ <!-- <relocation>
<pattern>com.fasterxml</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
- </relocation>
+ </relocation> -->
<relocation>
<pattern>org.inferred</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
@@ -199,10 +201,11 @@
<pattern>dlshade</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
</relocation>
- <relocation>
+ <!-- This refers to an older version of Jackson -->
+ <!-- <relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
- </relocation>
+ </relocation> -->
<relocation>
<pattern>net.java.dev.jna</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
@@ -315,10 +318,11 @@
<pattern>com.beust</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
</relocation>
- <relocation>
+ <!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
+ <!-- <relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
- </relocation>
+ </relocation> -->
<relocation>
<pattern>org.hamcrest</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
@@ -357,4 +361,4 @@
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 8229de2..2eb6c97 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -44,8 +44,11 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import java.io.File;
import java.io.IOException;
@@ -212,6 +215,14 @@ public class LocalRunner {
.getProtectionDomain().getCodeSource().getLocation().getFile();
}
+ boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
+ if(isBuiltin){
+ WorkerConfig workerConfig = WorkerConfig.load(System.getenv("PULSAR_HOME") + "/conf/functions_worker.yml");
+ Functions functions = FunctionUtils.searchForFunctions(System.getenv("PULSAR_HOME") + workerConfig.getFunctionsDirectory().replaceFirst("^.", ""));
+ String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
+ userCodeFile = functions.getFunctions().get(functionType).toString();
+ }
+
if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
classLoader = FunctionConfigUtils.validate(functionConfig, file);
@@ -440,9 +451,10 @@ public class LocalRunner {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();
- if (connectors.getSources().containsKey(sourceType)) {
+ String source = sourceType.replaceFirst("^builtin://", "");
+ if (connectors.getSources().containsKey(source)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
- return connectors.getSources().get(sourceType).toString();
+ return connectors.getSources().get(source).toString();
} else {
return null;
}
@@ -452,9 +464,10 @@ public class LocalRunner {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();
- if (connectors.getSinks().containsKey(sinkType)) {
+ String sink = sinkType.replaceFirst("^builtin://", "");
+ if (connectors.getSinks().containsKey(sink)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
- return connectors.getSinks().get(sinkType).toString();
+ return connectors.getSinks().get(sink).toString();
} else {
return null;
}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 97215bd..f01d63c 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -81,6 +81,9 @@ message FunctionDetails {
string runtimeFlags = 17;
ComponentType componentType = 18;
string customRuntimeOptions = 19;
+ /* If specified, this will refer to an archive that is
+ * already present in the server */
+ string builtin = 20;
}
message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index b1cf7cb..8e256a6 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -79,6 +79,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private static final String CATEGORY_STATE = "State Management";
@Category
private static final String CATEGORY_CONNECTORS = "Connectors";
+ @Category
+ private static final String CATEGORY_FUNCTIONS = "Functions";
@FieldContext(
category = CATEGORY_WORKER,
@@ -146,6 +148,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private Boolean validateConnectorConfig = false;
@FieldContext(
+ category = CATEGORY_FUNCTIONS,
+ doc = "The path to the location to locate builtin functions"
+ )
+ private String functionsDirectory = "./functions";
+ @FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 4e8f2d9..c6615e1 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -54,6 +54,8 @@ public class FunctionConfigUtils {
public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
throws IllegalArgumentException {
+
+ boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
Class<?>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
@@ -249,6 +251,11 @@ public class FunctionConfigUtils {
functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
}
+ if (isBuiltin) {
+ String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
+ functionDetailsBuilder.setBuiltin(builtin);
+ }
+
return functionDetailsBuilder.build();
}
@@ -596,12 +603,6 @@ public class FunctionConfigUtils {
throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
}
- if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
- && functionConfig.getJar().startsWith(BUILTIN)) {
- if (!new File(functionConfig.getJar()).exists()) {
- throw new IllegalArgumentException("The supplied jar file does not exist");
- }
- }
if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
&& functionConfig.getPy().startsWith(BUILTIN)) {
if (!new File(functionConfig.getPy()).exists()) {
@@ -698,6 +699,10 @@ public class FunctionConfigUtils {
mergedConfig.setClassName(newConfig.getClassName());
}
+ if (!StringUtils.isEmpty(newConfig.getJar())) {
+ mergedConfig.setJar(newConfig.getJar());
+ }
+
if (newConfig.getInputSpecs() == null) {
newConfig.setInputSpecs(new HashMap<>());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
new file mode 100644
index 0000000..1551421
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.functions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.functions.api.Function;
+
+
+@UtilityClass
+@Slf4j
+public class FunctionUtils {
+
+ private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";
+
+ /**
+ * Extract the Pulsar Function class from a functionctor archive.
+ */
+ public static String getFunctionClass(ClassLoader classLoader) throws IOException {
+ NarClassLoader ncl = (NarClassLoader) classLoader;
+ String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+ FunctionDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+ FunctionDefinition.class);
+ if (StringUtils.isEmpty(conf.getFunctionClass())) {
+ throw new IOException(
+ String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
+ }
+
+ try {
+ // Try to load source class and check it implements Function interface
+ Class functionClass = ncl.loadClass(conf.getFunctionClass());
+ if (!(Function.class.isAssignableFrom(functionClass))) {
+ throw new IOException(
+ "Class " + conf.getFunctionClass() + " does not implement interface " + Function.class.getName());
+ }
+ } catch (Throwable t) {
+ Exceptions.rethrowIOException(t);
+ }
+
+ return conf.getFunctionClass();
+ }
+
+ public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+ String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+ return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
+ }
+ }
+
+ public static Functions searchForFunctions(String functionsDirectory) throws IOException {
+ Path path = Paths.get(functionsDirectory).toAbsolutePath();
+ log.info("Searching for functions in {}", path);
+
+ Functions functions = new Functions();
+
+ if (!path.toFile().exists()) {
+ log.warn("Functions archive directory not found");
+ return functions;
+ }
+
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
+ for (Path archive : stream) {
+ try {
+ FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString());
+ log.info("Found function {} from {}", cntDef, archive);
+ log.error(cntDef.getName());
+ log.error(cntDef.getFunctionClass());
+ if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
+ functions.functions.put(cntDef.getName(), archive);
+ }
+
+ functions.functionsDefinitions.add(cntDef);
+ } catch (Throwable t) {
+ log.warn("Failed to load function from {}", archive, t);
+ }
+ }
+ }
+
+ Collections.sort(functions.functionsDefinitions,
+ (c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName()));
+
+ return functions;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
new file mode 100644
index 0000000..a7538a8
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.functions;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import lombok.Data;
+
+import org.apache.pulsar.common.functions.FunctionDefinition;
+
+@Data
+public class Functions {
+ final List<FunctionDefinition> functionsDefinitions = new ArrayList<>();
+ final Map<String, Path> functions = new TreeMap<>();
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ad93ba2..ccf00d5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -78,16 +78,18 @@ public class FunctionActioner {
private final RuntimeFactory runtimeFactory;
private final Namespace dlogNamespace;
private final ConnectorsManager connectorsManager;
+ private final FunctionsManager functionsManager;
private final PulsarAdmin pulsarAdmin;
public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
- ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
+ ConnectorsManager connectorsManager,FunctionsManager functionsManager,PulsarAdmin pulsarAdmin) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.connectorsManager = connectorsManager;
+ this.functionsManager = functionsManager;
this.pulsarAdmin = pulsarAdmin;
}
@@ -439,6 +441,10 @@ public class FunctionActioner {
}
}
+ if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+ return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
+ }
+
throw new IOException("Could not find built in archive definition");
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5ac73d9..075146e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -131,7 +131,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
- MembershipManager membershipManager, ConnectorsManager connectorsManager,
+ MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
FunctionMetaDataManager functionMetaDataManager) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
@@ -196,7 +196,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer);
this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
- dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
+ dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());
this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
new file mode 100644
index 0000000..427cc1e
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.functions.Functions;
+@Slf4j
+public class FunctionsManager {
+
+ private Functions functions;
+
+ public FunctionsManager(WorkerConfig workerConfig) throws IOException {
+ this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ }
+
+ public List<FunctionDefinition> getFunctions() {
+ return functions.getFunctionsDefinitions();
+ }
+
+ public Path getFunctionArchive(String functionType) {
+ return functions.getFunctions().get(functionType);
+ }
+
+ public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
+ this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 6fe7500..edaade6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -66,6 +66,7 @@ public class WorkerService {
private AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private ConnectorsManager connectorsManager;
+ private FunctionsManager functionsManager;
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private final MetricsGenerator metricsGenerator;
@@ -170,6 +171,7 @@ public class WorkerService {
this.workerConfig, this.schedulerManager, this.client);
this.connectorsManager = new ConnectorsManager(workerConfig);
+ this.functionsManager = new FunctionsManager(workerConfig);
//create membership manager
String coordinationTopic = workerConfig.getClusterCoordinationTopic();
@@ -180,7 +182,7 @@ public class WorkerService {
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
- this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionMetaDataManager);
+ this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager);
// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index a4e46a8..5cad320 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -309,6 +309,10 @@ public final class WorkerUtils {
}
}
+ if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+ return true;
+ }
+
return false;
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d806c69..3ed7f38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
@@ -48,6 +49,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -651,11 +653,30 @@ public class FunctionsImpl extends ComponentImpl {
final File componentPackageFile) throws IOException {
// The rest end points take precedence over whatever is there in function config
+ Path archivePath = null;
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(componentName);
FunctionConfigUtils.inferMissingArguments(functionConfig);
- ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+
+ if (!StringUtils.isEmpty(functionConfig.getJar())) {
+ String builtinArchive = functionConfig.getJar();
+ if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+ builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+ }
+ try {
+ archivePath = this.worker().getFunctionsManager().getFunctionArchive(builtinArchive);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("No Function archive %s found", archivePath));
+ }
+ }
+ ClassLoader clsLoader = null;
+ if(archivePath != null){
+ clsLoader = FunctionConfigUtils.validate(functionConfig, archivePath.toFile());
+ }
+ else{
+ clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+ }
return FunctionConfigUtils.convert(functionConfig, clsLoader);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index e066404..dc49036 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -78,7 +78,7 @@ public class FunctionActionerTest {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
- new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+ new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
Runtime runtime = mock(Runtime.class);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
@@ -124,7 +124,7 @@ public class FunctionActionerTest {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
- new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+ new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
// (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
// RuntimeSpawner
@@ -186,7 +186,7 @@ public class FunctionActionerTest {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
- new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+ new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
String pkgPathLocation = "http://invalid/my-file.jar";
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index a6569b3..2e6ed5b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -101,6 +101,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -183,6 +184,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -268,6 +270,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -397,6 +400,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -574,6 +578,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -632,7 +637,7 @@ public class FunctionRuntimeManagerTest {
FunctionActioner functionActioner = spy(new FunctionActioner(
workerConfig,
- kubernetesRuntimeFactory, null, null, null));
+ kubernetesRuntimeFactory, null, null, null, null));
// test new assignment update functions
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
@@ -641,6 +646,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
@@ -744,6 +750,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
fail();
@@ -767,6 +774,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
fail();
@@ -790,6 +798,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
fail();
@@ -813,6 +822,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
@@ -841,6 +851,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class);
@@ -868,6 +879,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
@@ -891,6 +903,7 @@ public class FunctionRuntimeManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2d0460b..14d1044 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -154,6 +154,7 @@ public class MembershipManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient, pulsarAdmin));
@@ -226,6 +227,7 @@ public class MembershipManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -313,6 +315,7 @@ public class MembershipManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -392,6 +395,7 @@ public class MembershipManagerTest {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));