You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/09/30 20:05:49 UTC

[pulsar] branch master updated: [fix][functions] Fix the download of builtin Functions (#17877)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6651bbbab5b [fix][functions] Fix the download of builtin Functions (#17877)
6651bbbab5b is described below

commit 6651bbbab5b33f09cdde83de048d8116b2835de6
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Sep 30 22:05:35 2022 +0200

    [fix][functions] Fix the download of builtin Functions (#17877)
---
 .../functions/worker/rest/api/ComponentImpl.java   |  46 ++++--
 .../rest/api/v3/FunctionApiV3ResourceTest.java     | 171 +++++++++++++--------
 2 files changed, 136 insertions(+), 81 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 132641c8f01..7cd35352bd2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -46,7 +46,6 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -102,7 +101,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
 import org.apache.pulsar.functions.utils.functions.FunctionArchive;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -1473,10 +1472,18 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                 ? functionMetaData.getTransformFunctionPackageLocation().getPackagePath()
                 : functionMetaData.getPackageLocation().getPackagePath();
 
-        return getStreamingOutput(pkgPath);
+        FunctionDetails.ComponentType componentType = transformFunction
+                ? FunctionDetails.ComponentType.FUNCTION
+                : InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails());
+
+        return getStreamingOutput(pkgPath, componentType);
     }
 
     private StreamingOutput getStreamingOutput(String pkgPath) {
+        return getStreamingOutput(pkgPath, null);
+    }
+
+    private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) {
         return output -> {
             if (pkgPath.startsWith(Utils.HTTP)) {
                 URL url = URI.create(pkgPath).toURL();
@@ -1489,15 +1496,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                 Files.copy(file.toPath(), output);
             } else if (pkgPath.startsWith(Utils.BUILTIN)
                     && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
-                String sType = pkgPath.replaceFirst("^builtin://", "");
-                final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
-                log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
-                TreeMap<String, FunctionArchive> sinksOrSources =
-                        FunctionUtils.searchForFunctions(connectorsDir, true);
-                Path narPath = sinksOrSources.get(sType).getArchivePath();
-                if (narPath == null) {
-                    throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
-                }
+                Path narPath = getBuiltinArchivePath(pkgPath, componentType);
                 log.info("Loading {} from {}", pkgPath, narPath);
                 try (InputStream in = new FileInputStream(narPath.toString())) {
                     IOUtils.copy(in, output, 1024);
@@ -1511,7 +1510,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                         output.flush();
                     }
                 } catch (Exception e) {
-                    log.error("Failed download package {} from packageMangment Service", pkgPath, e);
+                    log.error("Failed download package {} from packageManagement Service", pkgPath, e);
 
                 }
             } else {
@@ -1520,6 +1519,27 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
         };
     }
 
+    private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) {
+        String type = pkgPath.replaceFirst("^builtin://", "");
+        if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) {
+            Connector connector = worker().getConnectorsManager().getConnector(type);
+            if (connector != null) {
+                return connector.getArchivePath();
+            }
+            if (componentType != null) {
+                throw new IllegalStateException("Didn't find " + type + " in built-in connectors");
+            }
+        }
+        FunctionArchive function = worker().getFunctionsManager().getFunction(type);
+        if (function != null) {
+            return function.getArchivePath();
+        }
+        if (componentType != null) {
+            throw new IllegalStateException("Didn't find " + type + " in built-in functions");
+        }
+        throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions");
+    }
+
     @Override
     public StreamingOutput downloadFunction(
             final String path, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 8d19869b473..fb09a4026a5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
@@ -29,7 +28,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileInputStream;
@@ -44,7 +42,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Consumer;
 import javax.ws.rs.core.Response;
@@ -78,7 +75,8 @@ import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.functions.FunctionArchive;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.Connector;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.FunctionsManager;
@@ -1604,20 +1602,13 @@ public class FunctionApiV3ResourceTest {
         String jarHttpUrl =
                 "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        PulsarWorkerService worker = mock(PulsarWorkerService.class);
-        doReturn(true).when(worker).isInitialized();
-        WorkerConfig config = mock(WorkerConfig.class);
-        when(config.isAuthorizationEnabled()).thenReturn(false);
-        when(worker.getWorkerConfig()).thenReturn(config);
-        FunctionsImpl function = new FunctionsImpl(() -> worker);
-        StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl, null, null);
+
+        StreamingOutput streamOutput = resource.downloadFunction(jarHttpUrl, null, null);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
         Assert.assertTrue(pkgFile.exists());
-        if (pkgFile.exists()) {
-            pkgFile.delete();
-        }
+        pkgFile.delete();
     }
 
     @Test
@@ -1626,53 +1617,61 @@ public class FunctionApiV3ResourceTest {
         File file = Paths.get(fileUrl.toURI()).toFile();
         String fileLocation = file.getAbsolutePath().replace('\\', '/');
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        PulsarWorkerService worker = mock(PulsarWorkerService.class);
-        doReturn(true).when(worker).isInitialized();
-        WorkerConfig config = mock(WorkerConfig.class);
-        when(config.isAuthorizationEnabled()).thenReturn(false);
-        when(worker.getWorkerConfig()).thenReturn(config);
-        FunctionsImpl function = new FunctionsImpl(() -> worker);
-        StreamingOutput streamOutput = function.downloadFunction("file:///" + fileLocation, null, null);
+
+        StreamingOutput streamOutput = resource.downloadFunction("file:///" + fileLocation, null, null);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
         Assert.assertTrue(pkgFile.exists());
-        if (pkgFile.exists()) {
-            pkgFile.delete();
-        }
+        Assert.assertEquals(file.length(), pkgFile.length());
+        pkgFile.delete();
     }
 
     @Test
-    public void testDownloadFunctionBuiltin() throws Exception {
-        mockStatic(WorkerUtils.class, ctx -> {
-        });
-
+    public void testDownloadFunctionBuiltinConnector() throws Exception {
         URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
         File file = Paths.get(fileUrl.toURI()).toFile();
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
 
-        PulsarWorkerService worker = mock(PulsarWorkerService.class);
-        doReturn(true).when(worker).isInitialized();
+        WorkerConfig config = new WorkerConfig()
+            .setUploadBuiltinSinksSources(false);
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
 
-        WorkerConfig config = mock(WorkerConfig.class);
-        when(config.isAuthorizationEnabled()).thenReturn(false);
-        when(config.getUploadBuiltinSinksSources()).thenReturn(false);
-        when(config.getConnectorsDirectory()).thenReturn("/connectors");
+        Connector connector = Connector.builder().archivePath(file.toPath()).build();
+        ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
+        when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
+        when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);
 
-        when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class));
-        when(worker.getWorkerConfig()).thenReturn(config);
-        FunctionsImpl function = new FunctionsImpl(() -> worker);
+        StreamingOutput streamOutput = resource.downloadFunction("builtin://cassandra", null, null);
 
-        TreeMap<String, FunctionArchive> functions = new TreeMap<>();
-        FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
-        functions.put("cassandra", functionArchive);
+        File pkgFile = new File(testDir, UUID.randomUUID().toString());
+        OutputStream output = new FileOutputStream(pkgFile);
+        streamOutput.write(output);
+        output.flush();
+        output.close();
+        Assert.assertTrue(pkgFile.exists());
+        Assert.assertTrue(pkgFile.exists());
+        Assert.assertEquals(file.length(), pkgFile.length());
+        pkgFile.delete();
+    }
 
-        mockStatic(FunctionUtils.class, ctx -> {
-            ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions);
+    @Test
+    public void testDownloadFunctionBuiltinFunction() throws Exception {
+        URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+        File file = Paths.get(fileUrl.toURI()).toFile();
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
 
-        });
+        WorkerConfig config = new WorkerConfig()
+            .setUploadBuiltinSinksSources(false);
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+        FunctionsManager functionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+        when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+        when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
 
-        StreamingOutput streamOutput = function.downloadFunction("builtin://cassandra", null, null);
+        StreamingOutput streamOutput = resource.downloadFunction("builtin://exclamation", null, null);
 
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
@@ -1680,71 +1679,107 @@ public class FunctionApiV3ResourceTest {
         output.flush();
         output.close();
         Assert.assertTrue(pkgFile.exists());
-        if (pkgFile.exists()) {
-            Assert.assertEquals(file.length(), pkgFile.length());
-            pkgFile.delete();
-        } else {
-            fail("expected file");
-        }
+        Assert.assertEquals(file.length(), pkgFile.length());
+        pkgFile.delete();
     }
 
     @Test
-    public void testDownloadFunctionByName() throws Exception {
+    public void testDownloadFunctionBuiltinConnectorByName() throws Exception {
         URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
         File file = Paths.get(fileUrl.toURI()).toFile();
-        String fileLocation = file.getAbsolutePath().replace('\\', '/');
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        doReturn(true).when(mockedWorkerService).isInitialized();
-        WorkerConfig config = mock(WorkerConfig.class);
-        when(config.isAuthorizationEnabled()).thenReturn(false);
+        WorkerConfig config = new WorkerConfig()
+            .setUploadBuiltinSinksSources(false);
         when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
-                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("file:///" + fileLocation))
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://cassandra"))
                 .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
+                .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.SINK))
                 .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
 
+        Connector connector = Connector.builder().archivePath(file.toPath()).build();
+        ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
+        when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
+        when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);
+
         StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
         Assert.assertTrue(pkgFile.exists());
-        if (pkgFile.exists()) {
-            pkgFile.delete();
-        }
+        Assert.assertEquals(file.length(), pkgFile.length());
+        pkgFile.delete();
     }
 
     @Test
-    public void testDownloadTransformFunctionByName() throws Exception {
+    public void testDownloadFunctionBuiltinFunctionByName() throws Exception {
         URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
         File file = Paths.get(fileUrl.toURI()).toFile();
-        String fileLocation = file.getAbsolutePath().replace('\\', '/');
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        doReturn(true).when(mockedWorkerService).isInitialized();
-        WorkerConfig config = mock(WorkerConfig.class);
-        when(config.isAuthorizationEnabled()).thenReturn(false);
+        WorkerConfig config = new WorkerConfig()
+            .setUploadBuiltinSinksSources(false);
         when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
+        FunctionMetaData metaData = FunctionMetaData.newBuilder()
+            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://exclamation"))
+            .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
+            .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.FUNCTION))
+            .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+
+        FunctionsManager functionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+        when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+        when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
+
+        StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false);
+        File pkgFile = new File(testDir, UUID.randomUUID().toString());
+        OutputStream output = new FileOutputStream(pkgFile);
+        streamOutput.write(output);
+        Assert.assertTrue(pkgFile.exists());
+        Assert.assertEquals(file.length(), pkgFile.length());
+        pkgFile.delete();
+    }
+
+    @Test
+    public void testDownloadTransformFunctionByName() throws Exception {
+        URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+        File file = Paths.get(fileUrl.toURI()).toFile();
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+        WorkerConfig workerConfig = new WorkerConfig()
+            .setUploadBuiltinSinksSources(false);
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
                 .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
                 .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder()
-                        .setPackagePath("file:///" + fileLocation))
+                        .setPackagePath("builtin://exclamation"))
                 .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
 
+        FunctionsManager functionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+        when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+        when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
+
         StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, true);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
         Assert.assertTrue(pkgFile.exists());
-        if (pkgFile.exists()) {
-            pkgFile.delete();
-        }
+        Assert.assertEquals(file.length(), pkgFile.length());
+        pkgFile.delete();
     }