You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ay...@apache.org on 2022/06/24 20:13:42 UTC
[pulsar] branch master updated: [fix][functions] Handle uploading of builtin Functions to BK if uploadBuiltinSinksSources is true (#16111)
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b23689930e3 [fix][functions] Handle uploading of builtin Functions to BK if uploadBuiltinSinksSources is true (#16111)
b23689930e3 is described below
commit b23689930e3af65cfc913e2890164a071acda440
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Jun 24 22:13:31 2022 +0200
[fix][functions] Handle uploading of builtin Functions to BK if uploadBuiltinSinksSources is true (#16111)
* Don't upload builtin Functions to BK if uploadBuiltinSinksSources is false
* Fix builtin Function package location path
---
conf/functions_worker.yml | 2 +-
.../pulsar/functions/worker/WorkerConfig.java | 2 +-
pulsar-functions/worker/pom.xml | 18 +++
.../functions/worker/rest/api/ComponentImpl.java | 36 ++++--
.../rest/api/v3/FunctionApiV3ResourceTest.java | 131 +++++++++++++++++++++
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 2 -
6 files changed, 175 insertions(+), 16 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 7a01536117e..6929de9f1be 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -159,7 +159,7 @@ functionRuntimeFactoryConfigs:
#### Kubernetes Runtime ####
# Pulsar function are deployed to Kubernetes
-# Upload the builtin sources/sinks to BookKeeper.
+# Upload the builtin sources/sinks/functions to BookKeeper.
# True by default.
# uploadBuiltinSinksSources: true
#functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 438f1ab992d..8363f1313b9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -242,7 +242,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private Boolean validateConnectorConfig = false;
@FieldContext(
category = CATEGORY_FUNCTIONS,
- doc = "Should the builtin sources/sinks be uploaded for the externally managed runtimes?"
+ doc = "Should the builtin sources/sinks/functions be uploaded for the externally managed runtimes?"
)
private Boolean uploadBuiltinSinksSources = true;
@FieldContext(
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index cef4624a91e..bead0bd2f1b 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -175,6 +175,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
@@ -217,6 +225,15 @@
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.jar</destFileName>
</artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}</outputDirectory>
+ <destFileName>pulsar-functions-api-examples.nar</destFileName>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
@@ -230,6 +247,7 @@
<systemPropertyVariables>
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
+ <pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
<pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
<pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
<!-- valid jar file that is not a valid nar file -->
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index e1103214b32..999cae8ea09 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -312,24 +312,32 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
// if the function worker image does not include connectors
if (isBuiltin) {
if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
- File sinkOrSource;
- if (componentType == FunctionDetails.ComponentType.SOURCE) {
- String archiveName = functionDetails.getSource().getBuiltin();
- sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
- } else {
- String archiveName = functionDetails.getSink().getBuiltin();
- sinkOrSource = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
- }
+ File component;
+ String archiveName;
+ switch (componentType) {
+ case SOURCE:
+ archiveName = functionDetails.getSource().getBuiltin();
+ component = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
+ break;
+ case SINK:
+ archiveName = functionDetails.getSink().getBuiltin();
+ component = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
+ break;
+ default:
+ archiveName = functionDetails.getBuiltin();
+ component = worker().getFunctionsManager().getFunctionArchive(archiveName).toFile();
+ break;
+ }
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
- sinkOrSource.getName()));
- packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
+ component.getName()));
+ packageLocationMetaDataBuilder.setOriginalFileName(component.getName());
if (isPackageManagementEnabled) {
packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
worker().getBrokerAdmin().packages().upload(metadata,
- packageName.toString(), sinkOrSource.getAbsolutePath());
+ packageName.toString(), component.getAbsolutePath());
} else {
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
- sinkOrSource, worker().getDlogNamespace());
+ component, worker().getDlogNamespace());
}
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
packageLocationMetaDataBuilder.getPackagePath());
@@ -1611,6 +1619,10 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
}
}
+ if (!isEmpty(functionDetails.getBuiltin())) {
+ return functionDetails.getBuiltin();
+ }
+
return null;
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 2edf48c95a9..689e7ccbd48 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -42,6 +43,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
@@ -57,6 +59,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
@@ -72,11 +75,13 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.LeaderService;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -145,6 +150,16 @@ public class FunctionApiV3ResourceTest {
private PulsarFunctionTestTemporaryDirectory tempDirectory;
private static Map<String, MockedStatic> mockStaticContexts = new HashMap<>();
+ private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
+ "pulsar-functions-api-examples.nar.path";
+
+ public static File getPulsarApiExamplesNar() {
+ return new File(Objects.requireNonNull(
+ System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH)
+ , "pulsar-functions-api-examples.nar file location must be specified with "
+ + SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH + " system property"));
+ }
+
@BeforeMethod
public void setup() throws Exception {
this.mockedManager = mock(FunctionMetaDataManager.class);
@@ -730,6 +745,122 @@ public class FunctionApiV3ResourceTest {
}
}
+ /*
+ Externally managed runtime with uploadBuiltinSinksSources == false.
+ The mocked uploadFileToBookkeeper throws if invoked, so this test
+ passes only when the builtin function is registered without an upload.
+ */
+ @Test
+ public void testRegisterFunctionSuccessK8sNoUpload() throws Exception {
+ mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(false);
+
+ mockStatic(WorkerUtils.class, ctx -> {
+ ctx.when(() -> WorkerUtils.uploadFileToBookkeeper(
+ anyString(),
+ any(File.class),
+ any(Namespace.class)))
+ .thenThrow(new RuntimeException("uploadFileToBookkeeper triggered"));
+
+ });
+
+ NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+ mockStatic(FunctionCommon.class, ctx -> {
+ ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
+ ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
+ ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
+
+ });
+
+ doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());
+
+ FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+ FunctionArchive functionArchive = FunctionArchive.builder()
+ .classLoader(mockedClassLoader)
+ .build();
+ when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+
+ when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+
+ when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+ FunctionConfig functionConfig = createDefaultFunctionConfig();
+ functionConfig.setJar("builtin://exclamation");
+
+ try (FileInputStream inputStream = new FileInputStream(getPulsarApiExamplesNar())) {
+ resource.registerFunction(
+ tenant,
+ namespace,
+ function,
+ inputStream,
+ mockedFormData,
+ null,
+ functionConfig,
+ null, null);
+ }
+ }
+
+ /*
+ Externally managed runtime with uploadBuiltinSinksSources == true.
+ The mocked uploadFileToBookkeeper throws an injected error; the test
+ expects that error, which proves the upload was attempted.
+ */
+ @Test
+ public void testRegisterFunctionSuccessK8sWithUpload() throws Exception {
+ final String injectedErrMsg = "uploadFileToBookkeeper triggered";
+ mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(true);
+
+ mockStatic(WorkerUtils.class, ctx -> {
+ ctx.when(() -> WorkerUtils.uploadFileToBookkeeper(
+ anyString(),
+ any(File.class),
+ any(Namespace.class)))
+ .thenThrow(new RuntimeException(injectedErrMsg));
+
+ });
+
+ NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+ mockStatic(FunctionCommon.class, ctx -> {
+ ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
+ ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
+ ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
+ });
+
+ doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());
+
+ FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+ FunctionArchive functionArchive = FunctionArchive.builder()
+ .classLoader(mockedClassLoader)
+ .build();
+ when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+ when(mockedFunctionsManager.getFunctionArchive(any())).thenReturn(getPulsarApiExamplesNar().toPath());
+
+ when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+
+ when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+ FunctionConfig functionConfig = createDefaultFunctionConfig();
+ functionConfig.setJar("builtin://exclamation");
+
+ try (FileInputStream inputStream = new FileInputStream(getPulsarApiExamplesNar())) {
+ try {
+ resource.registerFunction(
+ tenant,
+ namespace,
+ function,
+ inputStream,
+ mockedFormData,
+ null,
+ functionConfig,
+ null, null);
+ Assert.fail();
+ } catch (RuntimeException e) {
+ Assert.assertEquals(e.getMessage(), injectedErrMsg);
+ }
+ }
+ }
+
//
// Update Functions
//
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 1e588e48016..791273cf537 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -1600,7 +1600,6 @@ public class SinkApiV3ResourceTest {
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
- ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
});
@@ -1655,7 +1654,6 @@ public class SinkApiV3ResourceTest {
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
- ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
});