You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/19 01:04:38 UTC
[pulsar] 01/02: [pulsar-admin] allow create functions with package
URL (#11666)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d30c5d16525d7cb34a303a9f36915f262714e7f7
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Aug 17 12:59:08 2021 +0800
[pulsar-admin] allow create functions with package URL (#11666)
Fix https://github.com/apache/pulsar/issues/11665
### Motivation
Allow user to create function with package URL with pulsar-admin.
### Modifications
- allow passing valid package URL from pulsar-admin functions
- added tests
(cherry picked from commit de86f4f615adbe0ce8ac1f5a0d9077153bc38bdb)
---
.../management/core/common/PackageType.java | 0
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 93 +++++++++++++++++++++-
.../org/apache/pulsar/common/functions/Utils.java | 10 ++-
.../functions/worker/rest/api/FunctionsImpl.java | 8 +-
.../functions/worker/rest/api/SinksImpl.java | 8 +-
.../functions/worker/rest/api/SourcesImpl.java | 8 +-
.../tests/integration/cli/PackagesCliTest.java | 12 +--
7 files changed, 111 insertions(+), 28 deletions(-)
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
similarity index 100%
rename from pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
rename to pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index d7f68a4..3e32961 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -82,14 +82,18 @@ public class CmdFunctionsTest {
private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile();
private static final String GO_EXEC_FILE_NAME = "test-go-function-with-url";
private static final String PYTHON_FILE_NAME = "test-go-function-with-url";
- private static final String URL ="file:" + JAR_NAME;
- private static final String URL_WITH_GO ="file:" + GO_EXEC_FILE_NAME;
- private static final String URL_WITH_PY ="file:" + PYTHON_FILE_NAME;
+ private static final String URL = "file:" + JAR_NAME;
+ private static final String URL_WITH_GO = "file:" + GO_EXEC_FILE_NAME;
+ private static final String URL_WITH_PY = "file:" + PYTHON_FILE_NAME;
private static final String FN_NAME = TEST_NAME + "-function";
private static final String INPUT_TOPIC_NAME = TEST_NAME + "-input-topic";
private static final String OUTPUT_TOPIC_NAME = TEST_NAME + "-output-topic";
private static final String TENANT = TEST_NAME + "-tenant";
private static final String NAMESPACE = TEST_NAME + "-namespace";
+ private static final String PACKAGE_URL = "function://sample/ns1/jardummyexamples@1";
+ private static final String PACKAGE_GO_URL = "function://sample/ns1/godummyexamples@1";
+ private static final String PACKAGE_PY_URL = "function://sample/ns1/pydummyexamples@1";
+ private static final String PACKAGE_INVALID_URL = "functionsample.jar";
private PulsarAdmin admin;
private Functions functions;
@@ -362,6 +366,89 @@ public class CmdFunctionsTest {
}
@Test
+ public void testCreateFunctionWithPackageUrl() throws Exception {
+ cmd.run(new String[] {
+ "create",
+ "--name", FN_NAME,
+ "--inputs", INPUT_TOPIC_NAME,
+ "--output", OUTPUT_TOPIC_NAME,
+ "--jar", PACKAGE_URL,
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ });
+
+ CreateFunction creater = cmd.getCreater();
+
+ assertEquals(FN_NAME, creater.getFunctionName());
+ assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+ assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+ verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+ }
+
+ @Test
+ public void testCreateGoFunctionWithPackageUrl() throws Exception {
+ cmd.run(new String[] {
+ "create",
+ "--name", "test-go-function",
+ "--inputs", INPUT_TOPIC_NAME,
+ "--output", OUTPUT_TOPIC_NAME,
+ "--go", PACKAGE_GO_URL,
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ });
+
+ CreateFunction creater = cmd.getCreater();
+
+ assertEquals("test-go-function", creater.getFunctionName());
+ assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+ assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+ verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+ }
+
+ @Test
+ public void testCreatePyFunctionWithPackageUrl() throws Exception {
+ cmd.run(new String[] {
+ "create",
+ "--name", "test-py-function",
+ "--inputs", INPUT_TOPIC_NAME,
+ "--output", OUTPUT_TOPIC_NAME,
+ "--py", PACKAGE_PY_URL,
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", "process_python_function",
+ });
+
+ CreateFunction creater = cmd.getCreater();
+
+ assertEquals("test-py-function", creater.getFunctionName());
+ assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+ assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+ verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+ }
+
+ @Test
+ public void testCreateFunctionWithInvalidPackageUrl() throws Exception {
+ cmd.run(new String[] {
+ "create",
+ "--name", FN_NAME,
+ "--inputs", INPUT_TOPIC_NAME,
+ "--output", OUTPUT_TOPIC_NAME,
+ "--jar", PACKAGE_INVALID_URL,
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ });
+
+ CreateFunction creater = cmd.getCreater();
+
+ assertEquals(FN_NAME, creater.getFunctionName());
+ assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+ assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+ verify(functions, times(0)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+ }
+
+ @Test
public void testCreateFunctionWithoutBasicArguments() throws Exception {
cmd.run(new String[] {
"create",
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
index 9629aa8..abc601a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.common.functions;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import java.util.Arrays;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.packages.management.core.common.PackageType;
/**
* Helper class to work with configuration.
@@ -34,7 +36,13 @@ public class Utils {
public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) {
return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP)
- || functionPkgUrl.startsWith(FILE));
+ || functionPkgUrl.startsWith(FILE)
+ || hasPackageTypePrefix(functionPkgUrl));
+ }
+
+ public static boolean hasPackageTypePrefix(String destPkgUrl) {
+ return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())
+ && destPkgUrl.contains("://"));
}
public static void inferMissingFunctionName(FunctionConfig functionConfig) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c656954..dba3645 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -154,7 +154,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
// validate parameters
try {
if (isPkgUrlProvided) {
- if (hasPackageTypePrefix(functionPkgUrl)) {
+ if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
componentPackageFile = downloadPackageFile(functionPkgUrl);
} else {
if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
@@ -323,7 +323,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
// validate parameters
try {
if (isNotBlank(functionPkgUrl)) {
- if (hasPackageTypePrefix(functionPkgUrl)) {
+ if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
componentPackageFile = downloadPackageFile(functionName);
} else {
try {
@@ -759,10 +759,6 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
}
- private static boolean hasPackageTypePrefix(String destPkgUrl) {
- return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
- }
-
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
return downloadPackageFile(worker(), packageName);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index d0d2ed3..dd9e990 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -152,7 +152,7 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
// validate parameters
try {
if (isPkgUrlProvided) {
- if (hasPackageTypePrefix(sinkPkgUrl)) {
+ if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
componentPackageFile = downloadPackageFile(sinkPkgUrl);
} else {
if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
@@ -322,7 +322,7 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
// validate parameters
try {
if (isNotBlank(sinkPkgUrl)) {
- if (hasPackageTypePrefix(sinkPkgUrl)) {
+ if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
componentPackageFile = downloadPackageFile(sinkPkgUrl);
} else {
try {
@@ -742,10 +742,6 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
}
- private static boolean hasPackageTypePrefix(String destPkgUrl) {
- return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
- }
-
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
return FunctionsImpl.downloadPackageFile(worker(), packageName);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 47c379e..2e3295d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -152,7 +152,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
// validate parameters
try {
if (isPkgUrlProvided) {
- if (hasPackageTypePrefix(sourcePkgUrl)) {
+ if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
componentPackageFile = downloadPackageFile(sourcePkgUrl);
} else {
if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
@@ -320,7 +320,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
// validate parameters
try {
if (isNotBlank(sourcePkgUrl)) {
- if (hasPackageTypePrefix(sourcePkgUrl)) {
+ if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
componentPackageFile = downloadPackageFile(sourcePkgUrl);
} else {
try {
@@ -739,10 +739,6 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
return SourceConfigUtils.convert(sourceConfig, sourceDetails);
}
- private static boolean hasPackageTypePrefix(String destPkgUrl) {
- return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
- }
-
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
return FunctionsImpl.downloadPackageFile(worker(), packageName);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
index 3d8c6f1..df4b48c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
@@ -45,9 +45,9 @@ public class PackagesCliTest extends TestRetrySupport {
public final void setup() throws Exception {
incrementSetupNumber();
PulsarClusterSpec spec = PulsarClusterSpec.builder()
- .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
- .brokerEnvs(getPackagesManagementServiceEnvs())
- .build();
+ .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
+ .brokerEnvs(getPackagesManagementServiceEnvs())
+ .build();
pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
}
@@ -88,13 +88,13 @@ public class PackagesCliTest extends TestRetrySupport {
public void testPackagesOperationsWithUploadingPackages() throws Exception {
String testPackageName = "function://public/default/test@v1";
ContainerExecResult result = runPackagesCommand("upload", "--description", "a test package",
- "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
+ "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
assertEquals(result.getExitCode(), 0);
BrokerContainer container = pulsarCluster.getBroker(0);
String downloadFile = "tmp-file-" + RandomStringUtils.randomAlphabetic(8);
String[] downloadCmd = new String[]{PulsarCluster.ADMIN_SCRIPT, "packages", "download",
- "--path", downloadFile, testPackageName};
+ "--path", downloadFile, testPackageName};
result = container.execCmd(downloadCmd);
assertEquals(result.getExitCode(), 0);
@@ -119,7 +119,7 @@ public class PackagesCliTest extends TestRetrySupport {
String contact = "test@apache.org";
result = runPackagesCommand("update-metadata", "--description", "a test package",
- "--contact", contact, "-PpropertyA=A", testPackageName);
+ "--contact", contact, "-PpropertyA=A", testPackageName);
assertEquals(result.getExitCode(), 0);
result = runPackagesCommand("get-metadata", testPackageName);