You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/27 03:51:58 UTC
[pulsar] branch branch-2.8 updated: allow download package from package management service (#14814)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 7837ecb36a4 allow download package from package management service (#14814)
7837ecb36a4 is described below
commit 7837ecb36a49ee8b48a290065e3795362b1b8b7e
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Apr 5 03:48:05 2022 +0800
allow download package from package management service (#14814)
(cherry picked from commit 7cc5cf1fadf8835fcc3166494f0e05632894da43)
---
.../pulsar/functions/worker/FunctionActioner.java | 7 +++-
.../functions/worker/FunctionActionerTest.java | 49 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 6497e154789..e069be74569 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -68,6 +68,7 @@ import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.apache.pulsar.common.functions.Utils.HTTP;
+import static org.apache.pulsar.common.functions.Utils.hasPackageTypePrefix;
import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
@@ -192,7 +193,8 @@ public class FunctionActioner {
return instanceConfig;
}
- private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException {
+ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData,
+ int instanceId) throws FileNotFoundException, IOException, PulsarAdminException {
FunctionDetails details = functionMetaData.getFunctionDetails();
File pkgDir = pkgFile.getParentFile();
@@ -214,12 +216,15 @@ public class FunctionActioner {
}
String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP);
+ boolean downloadFromPackageManagementService = isPkgUrlProvided && hasPackageTypePrefix(pkgLocationPath);
log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(),
details.getNamespace(), details.getName(),
downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
if(downloadFromHttp) {
FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
+ } else if (downloadFromPackageManagementService) {
+ getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath());
} else {
FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile);
WorkerUtils.downloadFromBookkeeper(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index dc49036a5be..7502e247d99 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
@@ -220,4 +221,52 @@ public class FunctionActionerTest {
verify(functionAuthProvider.get(), times(0)).cleanUpAuthData(any(), any());
}
+ @Test
+ public void testStartFunctionWithPackageUrl() throws Exception {
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+ String downloadDir = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+ workerConfig.setDownloadDirectory(downloadDir);
+
+ RuntimeFactory factory = mock(RuntimeFactory.class);
+ Runtime runtime = mock(Runtime.class);
+ doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+ doNothing().when(runtime).start();
+ Namespace dlogNamespace = mock(Namespace.class);
+ final String exceptionMsg = "dl namespace not-found";
+ doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
+ PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+ Packages packages = mock(Packages.class);
+ doReturn(packages).when(pulsarAdmin).packages();
+ doNothing().when(packages).download(any(), any());
+
+ @SuppressWarnings("resource")
+ FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
+ new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), pulsarAdmin);
+
+ // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
+ // RuntimeSpawner
+ String pkgPathLocation = "function://public/default/test-function@latest";
+ Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
+ .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
+ .setNamespace("test-namespace").setName("func-1"))
+ .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
+ .build();
+ Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
+ .build();
+ FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+ doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+
+ actioner.startFunction(functionRuntimeInfo);
+ verify(runtime, times(1)).start();
+ }
+
}