You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/19 01:04:38 UTC

[pulsar] 01/02: [pulsar-admin] allow create functions with package URL (#11666)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d30c5d16525d7cb34a303a9f36915f262714e7f7
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Aug 17 12:59:08 2021 +0800

    [pulsar-admin] allow create functions with package URL (#11666)
    
    Fix https://github.com/apache/pulsar/issues/11665
    
    ### Motivation
    
    Allow users to create a function from a package URL using pulsar-admin.
    
    ### Modifications
    
    - allow passing a valid package URL (e.g. `function://sample/ns1/jardummyexamples@1`) from `pulsar-admin functions`
    - add tests
    
    (cherry picked from commit de86f4f615adbe0ce8ac1f5a0d9077153bc38bdb)
---
 .../management/core/common/PackageType.java        |  0
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 93 +++++++++++++++++++++-
 .../org/apache/pulsar/common/functions/Utils.java  | 10 ++-
 .../functions/worker/rest/api/FunctionsImpl.java   |  8 +-
 .../functions/worker/rest/api/SinksImpl.java       |  8 +-
 .../functions/worker/rest/api/SourcesImpl.java     |  8 +-
 .../tests/integration/cli/PackagesCliTest.java     | 12 +--
 7 files changed, 111 insertions(+), 28 deletions(-)

diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
similarity index 100%
rename from pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
rename to pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index d7f68a4..3e32961 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -82,14 +82,18 @@ public class CmdFunctionsTest {
     private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile();
     private static final String GO_EXEC_FILE_NAME = "test-go-function-with-url";
     private static final String PYTHON_FILE_NAME = "test-go-function-with-url";
-    private static final String URL ="file:" + JAR_NAME;
-    private static final String URL_WITH_GO ="file:" + GO_EXEC_FILE_NAME;
-    private static final String URL_WITH_PY ="file:" + PYTHON_FILE_NAME;
+    private static final String URL = "file:" + JAR_NAME;
+    private static final String URL_WITH_GO = "file:" + GO_EXEC_FILE_NAME;
+    private static final String URL_WITH_PY = "file:" + PYTHON_FILE_NAME;
     private static final String FN_NAME = TEST_NAME + "-function";
     private static final String INPUT_TOPIC_NAME = TEST_NAME + "-input-topic";
     private static final String OUTPUT_TOPIC_NAME = TEST_NAME + "-output-topic";
     private static final String TENANT = TEST_NAME + "-tenant";
     private static final String NAMESPACE = TEST_NAME + "-namespace";
+    private static final String PACKAGE_URL = "function://sample/ns1/jardummyexamples@1";
+    private static final String PACKAGE_GO_URL = "function://sample/ns1/godummyexamples@1";
+    private static final String PACKAGE_PY_URL = "function://sample/ns1/pydummyexamples@1";
+    private static final String PACKAGE_INVALID_URL = "functionsample.jar";
 
     private PulsarAdmin admin;
     private Functions functions;
@@ -362,6 +366,89 @@ public class CmdFunctionsTest {
     }
 
     @Test
+    public void testCreateFunctionWithPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", FN_NAME,
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--jar", PACKAGE_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals(FN_NAME, creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
+    public void testCreateGoFunctionWithPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", "test-go-function",
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--go", PACKAGE_GO_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals("test-go-function", creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
+    public void testCreatePyFunctionWithPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", "test-py-function",
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--py", PACKAGE_PY_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", "process_python_function",
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals("test-py-function", creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
+    public void testCreateFunctionWithInvalidPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", FN_NAME,
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--jar", PACKAGE_INVALID_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals(FN_NAME, creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(0)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
     public void testCreateFunctionWithoutBasicArguments() throws Exception {
         cmd.run(new String[] {
                 "create",
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
index 9629aa8..abc601a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.common.functions;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import java.util.Arrays;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.packages.management.core.common.PackageType;
 
 /**
  * Helper class to work with configuration.
@@ -34,7 +36,13 @@ public class Utils {
 
     public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) {
         return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP)
-                || functionPkgUrl.startsWith(FILE));
+                || functionPkgUrl.startsWith(FILE)
+                || hasPackageTypePrefix(functionPkgUrl));
+    }
+
+    public static boolean hasPackageTypePrefix(String destPkgUrl) {
+        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())
+                && destPkgUrl.contains("://"));
     }
 
     public static void inferMissingFunctionName(FunctionConfig functionConfig) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c656954..dba3645 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -154,7 +154,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             // validate parameters
             try {
                 if (isPkgUrlProvided) {
-                    if (hasPackageTypePrefix(functionPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
                         componentPackageFile = downloadPackageFile(functionPkgUrl);
                     } else {
                         if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
@@ -323,7 +323,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             // validate parameters
             try {
                 if (isNotBlank(functionPkgUrl)) {
-                    if (hasPackageTypePrefix(functionPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
                         componentPackageFile = downloadPackageFile(functionName);
                     } else {
                         try {
@@ -759,10 +759,6 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
 
     }
 
-    private static boolean hasPackageTypePrefix(String destPkgUrl) {
-        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
-    }
-
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
         return downloadPackageFile(worker(), packageName);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index d0d2ed3..dd9e990 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -152,7 +152,7 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             // validate parameters
             try {
                 if (isPkgUrlProvided) {
-                    if (hasPackageTypePrefix(sinkPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
                         componentPackageFile = downloadPackageFile(sinkPkgUrl);
                     } else {
                         if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
@@ -322,7 +322,7 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             // validate parameters
             try {
                 if (isNotBlank(sinkPkgUrl)) {
-                    if (hasPackageTypePrefix(sinkPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
                         componentPackageFile = downloadPackageFile(sinkPkgUrl);
                     } else {
                         try {
@@ -742,10 +742,6 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
         return SinkConfigUtils.convert(sinkConfig, sinkDetails);
     }
 
-    private static boolean hasPackageTypePrefix(String destPkgUrl) {
-        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
-    }
-
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
         return FunctionsImpl.downloadPackageFile(worker(), packageName);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 47c379e..2e3295d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -152,7 +152,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             // validate parameters
             try {
                 if (isPkgUrlProvided) {
-                    if (hasPackageTypePrefix(sourcePkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
                         componentPackageFile = downloadPackageFile(sourcePkgUrl);
                     } else {
                         if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
@@ -320,7 +320,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             // validate parameters
             try {
                 if (isNotBlank(sourcePkgUrl)) {
-                    if (hasPackageTypePrefix(sourcePkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
                         componentPackageFile = downloadPackageFile(sourcePkgUrl);
                     } else {
                         try {
@@ -739,10 +739,6 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
         return SourceConfigUtils.convert(sourceConfig, sourceDetails);
     }
 
-    private static boolean hasPackageTypePrefix(String destPkgUrl) {
-        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
-    }
-
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
         return FunctionsImpl.downloadPackageFile(worker(), packageName);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
index 3d8c6f1..df4b48c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
@@ -45,9 +45,9 @@ public class PackagesCliTest extends TestRetrySupport {
     public final void setup() throws Exception {
         incrementSetupNumber();
         PulsarClusterSpec spec = PulsarClusterSpec.builder()
-            .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
-            .brokerEnvs(getPackagesManagementServiceEnvs())
-            .build();
+                .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
+                .brokerEnvs(getPackagesManagementServiceEnvs())
+                .build();
         pulsarCluster = PulsarCluster.forSpec(spec);
         pulsarCluster.start();
     }
@@ -88,13 +88,13 @@ public class PackagesCliTest extends TestRetrySupport {
     public void testPackagesOperationsWithUploadingPackages() throws Exception {
         String testPackageName = "function://public/default/test@v1";
         ContainerExecResult result = runPackagesCommand("upload", "--description", "a test package",
-            "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
+                "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
         assertEquals(result.getExitCode(), 0);
 
         BrokerContainer container = pulsarCluster.getBroker(0);
         String downloadFile = "tmp-file-" + RandomStringUtils.randomAlphabetic(8);
         String[] downloadCmd = new String[]{PulsarCluster.ADMIN_SCRIPT, "packages", "download",
-            "--path", downloadFile, testPackageName};
+                "--path", downloadFile, testPackageName};
         result = container.execCmd(downloadCmd);
         assertEquals(result.getExitCode(), 0);
 
@@ -119,7 +119,7 @@ public class PackagesCliTest extends TestRetrySupport {
 
         String contact = "test@apache.org";
         result = runPackagesCommand("update-metadata", "--description", "a test package",
-            "--contact", contact, "-PpropertyA=A", testPackageName);
+                "--contact", contact, "-PpropertyA=A", testPackageName);
         assertEquals(result.getExitCode(), 0);
 
         result = runPackagesCommand("get-metadata", testPackageName);