You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/27 03:51:58 UTC

[pulsar] branch branch-2.8 updated: allow download package from package management service (#14814)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 7837ecb36a4 allow download package from package management service (#14814)
7837ecb36a4 is described below

commit 7837ecb36a49ee8b48a290065e3795362b1b8b7e
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Apr 5 03:48:05 2022 +0800

    allow download package from package management service (#14814)
    
    (cherry picked from commit 7cc5cf1fadf8835fcc3166494f0e05632894da43)
---
 .../pulsar/functions/worker/FunctionActioner.java  |  7 +++-
 .../functions/worker/FunctionActionerTest.java     | 49 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 6497e154789..e069be74569 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -68,6 +68,7 @@ import java.util.stream.Collectors;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.common.functions.Utils.FILE;
 import static org.apache.pulsar.common.functions.Utils.HTTP;
+import static org.apache.pulsar.common.functions.Utils.hasPackageTypePrefix;
 import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
 import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
@@ -192,7 +193,8 @@ public class FunctionActioner {
         return instanceConfig;
     }
 
-    private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException {
+    private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData,
+                              int instanceId) throws FileNotFoundException, IOException, PulsarAdminException {
 
         FunctionDetails details = functionMetaData.getFunctionDetails();
         File pkgDir = pkgFile.getParentFile();
@@ -214,12 +216,15 @@ public class FunctionActioner {
         }
         String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
         boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP);
+        boolean downloadFromPackageManagementService = isPkgUrlProvided && hasPackageTypePrefix(pkgLocationPath);
         log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(),
                 details.getNamespace(), details.getName(),
                 downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
 
         if(downloadFromHttp) {
             FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
+        } else if (downloadFromPackageManagementService) {
+            getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath());
         } else {
             FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile);
             WorkerUtils.downloadFromBookkeeper(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index dc49036a5be..7502e247d99 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.Packages;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
@@ -220,4 +221,52 @@ public class FunctionActionerTest {
         verify(functionAuthProvider.get(), times(0)).cleanUpAuthData(any(), any());
     }
 
+    @Test
+    public void testStartFunctionWithPackageUrl() throws Exception {
+        // Starts a function whose package path is a "function://" package URL, with runtime and admin mocked.
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+        String downloadDir = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+        workerConfig.setDownloadDirectory(downloadDir);
+
+        RuntimeFactory factory = mock(RuntimeFactory.class);
+        Runtime runtime = mock(Runtime.class);
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+        doNothing().when(runtime).start();
+        Namespace dlogNamespace = mock(Namespace.class);
+        final String exceptionMsg = "dl namespace not-found";
+        doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+        Packages packages = mock(Packages.class);
+        doReturn(packages).when(pulsarAdmin).packages();
+        doNothing().when(packages).download(any(), any());
+
+        @SuppressWarnings("resource")
+        FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
+                new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), pulsarAdmin);
+
+        // Test with a package-management-service URL ("function://..."). The mocked Packages client stubs
+        // download() as a no-op, and the test verifies that the runtime is started exactly once.
+        String pkgPathLocation = "function://public/default/test-function@latest";
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
+                        .setNamespace("test-namespace").setName("func-1"))
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
+                .build();
+        Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
+                .build();
+        FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+
+        actioner.startFunction(functionRuntimeInfo);
+        verify(runtime, times(1)).start();
+    }
+
 }