You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ay...@apache.org on 2022/06/24 20:13:42 UTC

[pulsar] branch master updated: [fix][functions] Handle uploading of builtin Functions to BK if uploadBuiltinSinksSources is true (#16111)

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b23689930e3 [fix][functions] Handle uploading of builtin Functions to BK if uploadBuiltinSinksSources is true (#16111)
b23689930e3 is described below

commit b23689930e3af65cfc913e2890164a071acda440
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Jun 24 22:13:31 2022 +0200

    [fix][functions] Handle uploading of builtin Functions to BK if uploadBuiltinSinksSources is true (#16111)
    
    * Don't upload builtin Functions to BK if uploadBuiltinSinksSources is false
    
    * Fix builtin Function package location path
---
 conf/functions_worker.yml                          |   2 +-
 .../pulsar/functions/worker/WorkerConfig.java      |   2 +-
 pulsar-functions/worker/pom.xml                    |  18 +++
 .../functions/worker/rest/api/ComponentImpl.java   |  36 ++++--
 .../rest/api/v3/FunctionApiV3ResourceTest.java     | 131 +++++++++++++++++++++
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |   2 -
 6 files changed, 175 insertions(+), 16 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 7a01536117e..6929de9f1be 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -159,7 +159,7 @@ functionRuntimeFactoryConfigs:
 #### Kubernetes Runtime ####
 # Pulsar function are deployed to Kubernetes
 
-# Upload the builtin sources/sinks to BookKeeper.
+# Upload the builtin sources/sinks/functions to BookKeeper.
 # True by default.
 # uploadBuiltinSinksSources: true
 #functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 438f1ab992d..8363f1313b9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -242,7 +242,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     private Boolean validateConnectorConfig = false;
     @FieldContext(
         category = CATEGORY_FUNCTIONS,
-        doc = "Should the builtin sources/sinks be uploaded for the externally managed runtimes?"
+        doc = "Should the builtin sources/sinks/functions be uploaded for the externally managed runtimes?"
     )
     private Boolean uploadBuiltinSinksSources = true;
     @FieldContext(
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index cef4624a91e..bead0bd2f1b 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -175,6 +175,14 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+      <version>${project.version}</version>
+      <type>pom</type>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
@@ -217,6 +225,15 @@
                   <outputDirectory>${project.build.directory}</outputDirectory>
                   <destFileName>pulsar-functions-api-examples.jar</destFileName>
                 </artifactItem>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>pulsar-functions-api-examples-builtin</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${project.build.directory}</outputDirectory>
+                  <destFileName>pulsar-functions-api-examples.nar</destFileName>
+                </artifactItem>
               </artifactItems>
             </configuration>
           </execution>
@@ -230,6 +247,7 @@
           <systemPropertyVariables>
             <pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
             <pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
+            <pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
             <pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
             <pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
             <!-- valid jar file that is not a valid nar file -->
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index e1103214b32..999cae8ea09 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -312,24 +312,32 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
             // if the function worker image does not include connectors
             if (isBuiltin) {
                 if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
-                    File sinkOrSource;
-                    if (componentType == FunctionDetails.ComponentType.SOURCE) {
-                        String archiveName = functionDetails.getSource().getBuiltin();
-                        sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
-                    } else {
-                        String archiveName = functionDetails.getSink().getBuiltin();
-                        sinkOrSource = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
-                    }
+                    File component;
+                    String archiveName;
+                    switch (componentType) {
+                        case SOURCE:
+                            archiveName = functionDetails.getSource().getBuiltin();
+                            component = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
+                            break;
+                        case SINK:
+                            archiveName = functionDetails.getSink().getBuiltin();
+                            component = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
+                            break;
+                        default:
+                            archiveName = functionDetails.getBuiltin();
+                            component = worker().getFunctionsManager().getFunctionArchive(archiveName).toFile();
+                            break;
+                        }
                     packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
-                            sinkOrSource.getName()));
-                    packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
+                            component.getName()));
+                    packageLocationMetaDataBuilder.setOriginalFileName(component.getName());
                     if (isPackageManagementEnabled) {
                         packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
                         worker().getBrokerAdmin().packages().upload(metadata,
-                                packageName.toString(), sinkOrSource.getAbsolutePath());
+                                packageName.toString(), component.getAbsolutePath());
                     } else {
                         WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
-                                sinkOrSource, worker().getDlogNamespace());
+                                component, worker().getDlogNamespace());
                     }
                     log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
                             packageLocationMetaDataBuilder.getPackagePath());
@@ -1611,6 +1619,10 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
             }
         }
 
+        if (!isEmpty(functionDetails.getBuiltin())) {
+            return functionDetails.getBuiltin();
+        }
+
         return null;
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 2edf48c95a9..689e7ccbd48 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -42,6 +43,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Consumer;
@@ -57,6 +59,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
@@ -72,11 +75,13 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.functions.FunctionArchive;
 import org.apache.pulsar.functions.utils.functions.FunctionUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.FunctionsManager;
 import org.apache.pulsar.functions.worker.LeaderService;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -145,6 +150,16 @@ public class FunctionApiV3ResourceTest {
     private PulsarFunctionTestTemporaryDirectory tempDirectory;
     private static Map<String, MockedStatic> mockStaticContexts = new HashMap<>();
 
+    private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
+            "pulsar-functions-api-examples.nar.path";
+
+    public static File getPulsarApiExamplesNar() {
+        return new File(Objects.requireNonNull(
+                System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH)
+                , "pulsar-functions-api-examples.nar file location must be specified with "
+                        + SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH + " system property"));
+    }
+
     @BeforeMethod
     public void setup() throws Exception {
         this.mockedManager = mock(FunctionMetaDataManager.class);
@@ -730,6 +745,122 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
+    /*
+    Externally managed runtime,
+    uploadBuiltinSinksSources == false
+    Make sure uploadFileToBookkeeper is not called
+    */
+    @Test
+    public void testRegisterFunctionSuccessK8sNoUpload() throws Exception {
+        mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(false);
+
+        mockStatic(WorkerUtils.class, ctx -> {
+            ctx.when(() -> WorkerUtils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class)))
+                    .thenThrow(new RuntimeException("uploadFileToBookkeeper triggered"));
+
+        });
+
+        NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
+            ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
+
+        });
+
+        doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());
+
+        FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder()
+                .classLoader(mockedClassLoader)
+                .build();
+        when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+
+        when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
+        functionConfig.setJar("builtin://exclamation");
+
+        try (FileInputStream inputStream = new FileInputStream(getPulsarApiExamplesNar())) {
+            resource.registerFunction(
+                    tenant,
+                    namespace,
+                    function,
+                    inputStream,
+                    mockedFormData,
+                    null,
+                    functionConfig,
+                    null, null);
+        }
+    }
+
+    /*
+    Externally managed runtime,
+    uploadBuiltinSinksSources == true
+    Make sure uploadFileToBookkeeper is called
+    */
+    @Test
+    public void testRegisterFunctionSuccessK8sWithUpload() throws Exception {
+        final String injectedErrMsg = "uploadFileToBookkeeper triggered";
+        mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(true);
+
+        mockStatic(WorkerUtils.class, ctx -> {
+            ctx.when(() -> WorkerUtils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class)))
+                    .thenThrow(new RuntimeException(injectedErrMsg));
+
+        });
+
+        NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
+            ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
+        });
+
+        doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());
+
+        FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder()
+                .classLoader(mockedClassLoader)
+                .build();
+        when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+        when(mockedFunctionsManager.getFunctionArchive(any())).thenReturn(getPulsarApiExamplesNar().toPath());
+
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+
+        when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
+        functionConfig.setJar("builtin://exclamation");
+
+        try (FileInputStream inputStream = new FileInputStream(getPulsarApiExamplesNar())) {
+            try {
+                resource.registerFunction(
+                        tenant,
+                        namespace,
+                        function,
+                        inputStream,
+                        mockedFormData,
+                        null,
+                        functionConfig,
+                        null, null);
+                Assert.fail();
+            } catch (RuntimeException e) {
+                Assert.assertEquals(e.getMessage(), injectedErrMsg);
+            }
+        }
+    }
+
     //
     // Update Functions
     //
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 1e588e48016..791273cf537 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -1600,7 +1600,6 @@ public class SinkApiV3ResourceTest {
             ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
             ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
             ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
-            ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
             ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
 
         });
@@ -1655,7 +1654,6 @@ public class SinkApiV3ResourceTest {
             ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
             ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
             ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
-            ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
             ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
         });