You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/09/30 20:05:49 UTC
[pulsar] branch master updated: [fix][functions] Fix the download of builtin Functions (#17877)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6651bbbab5b [fix][functions] Fix the download of builtin Functions (#17877)
6651bbbab5b is described below
commit 6651bbbab5b33f09cdde83de048d8116b2835de6
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Sep 30 22:05:35 2022 +0200
[fix][functions] Fix the download of builtin Functions (#17877)
---
.../functions/worker/rest/api/ComponentImpl.java | 46 ++++--
.../rest/api/v3/FunctionApiV3ResourceTest.java | 171 +++++++++++++--------
2 files changed, 136 insertions(+), 81 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 132641c8f01..7cd35352bd2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -46,7 +46,6 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
-import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -102,7 +101,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -1473,10 +1472,18 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
? functionMetaData.getTransformFunctionPackageLocation().getPackagePath()
: functionMetaData.getPackageLocation().getPackagePath();
- return getStreamingOutput(pkgPath);
+ FunctionDetails.ComponentType componentType = transformFunction
+ ? FunctionDetails.ComponentType.FUNCTION
+ : InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails());
+
+ return getStreamingOutput(pkgPath, componentType);
}
private StreamingOutput getStreamingOutput(String pkgPath) {
+ return getStreamingOutput(pkgPath, null);
+ }
+
+ private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) {
return output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = URI.create(pkgPath).toURL();
@@ -1489,15 +1496,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
Files.copy(file.toPath(), output);
} else if (pkgPath.startsWith(Utils.BUILTIN)
&& !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
- String sType = pkgPath.replaceFirst("^builtin://", "");
- final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
- log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
- TreeMap<String, FunctionArchive> sinksOrSources =
- FunctionUtils.searchForFunctions(connectorsDir, true);
- Path narPath = sinksOrSources.get(sType).getArchivePath();
- if (narPath == null) {
- throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
- }
+ Path narPath = getBuiltinArchivePath(pkgPath, componentType);
log.info("Loading {} from {}", pkgPath, narPath);
try (InputStream in = new FileInputStream(narPath.toString())) {
IOUtils.copy(in, output, 1024);
@@ -1511,7 +1510,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
output.flush();
}
} catch (Exception e) {
- log.error("Failed download package {} from packageMangment Service", pkgPath, e);
+ log.error("Failed download package {} from packageManagement Service", pkgPath, e);
}
} else {
@@ -1520,6 +1519,27 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
};
}
+ private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) {
+ String type = pkgPath.replaceFirst("^builtin://", "");
+ if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) {
+ Connector connector = worker().getConnectorsManager().getConnector(type);
+ if (connector != null) {
+ return connector.getArchivePath();
+ }
+ if (componentType != null) {
+ throw new IllegalStateException("Didn't find " + type + " in built-in connectors");
+ }
+ }
+ FunctionArchive function = worker().getFunctionsManager().getFunction(type);
+ if (function != null) {
+ return function.getArchivePath();
+ }
+ if (componentType != null) {
+ throw new IllegalStateException("Didn't find " + type + " in built-in functions");
+ }
+ throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions");
+ }
+
@Override
public StreamingOutput downloadFunction(
final String path, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 8d19869b473..fb09a4026a5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
@@ -29,7 +28,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
@@ -44,7 +42,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
import javax.ws.rs.core.Response;
@@ -78,7 +75,8 @@ import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.Connector;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
@@ -1604,20 +1602,13 @@ public class FunctionApiV3ResourceTest {
String jarHttpUrl =
"https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- PulsarWorkerService worker = mock(PulsarWorkerService.class);
- doReturn(true).when(worker).isInitialized();
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
- when(worker.getWorkerConfig()).thenReturn(config);
- FunctionsImpl function = new FunctionsImpl(() -> worker);
- StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl, null, null);
+
+ StreamingOutput streamOutput = resource.downloadFunction(jarHttpUrl, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- pkgFile.delete();
- }
+ pkgFile.delete();
}
@Test
@@ -1626,53 +1617,61 @@ public class FunctionApiV3ResourceTest {
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- PulsarWorkerService worker = mock(PulsarWorkerService.class);
- doReturn(true).when(worker).isInitialized();
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
- when(worker.getWorkerConfig()).thenReturn(config);
- FunctionsImpl function = new FunctionsImpl(() -> worker);
- StreamingOutput streamOutput = function.downloadFunction("file:///" + fileLocation, null, null);
+
+ StreamingOutput streamOutput = resource.downloadFunction("file:///" + fileLocation, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- pkgFile.delete();
- }
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
}
@Test
- public void testDownloadFunctionBuiltin() throws Exception {
- mockStatic(WorkerUtils.class, ctx -> {
- });
-
+ public void testDownloadFunctionBuiltinConnector() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- PulsarWorkerService worker = mock(PulsarWorkerService.class);
- doReturn(true).when(worker).isInitialized();
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
+ when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
- when(config.getUploadBuiltinSinksSources()).thenReturn(false);
- when(config.getConnectorsDirectory()).thenReturn("/connectors");
+ Connector connector = Connector.builder().archivePath(file.toPath()).build();
+ ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
+ when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
+ when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);
- when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class));
- when(worker.getWorkerConfig()).thenReturn(config);
- FunctionsImpl function = new FunctionsImpl(() -> worker);
+ StreamingOutput streamOutput = resource.downloadFunction("builtin://cassandra", null, null);
- TreeMap<String, FunctionArchive> functions = new TreeMap<>();
- FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
- functions.put("cassandra", functionArchive);
+ File pkgFile = new File(testDir, UUID.randomUUID().toString());
+ OutputStream output = new FileOutputStream(pkgFile);
+ streamOutput.write(output);
+ output.flush();
+ output.close();
+ Assert.assertTrue(pkgFile.exists());
+ Assert.assertTrue(pkgFile.exists());
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
+ }
- mockStatic(FunctionUtils.class, ctx -> {
- ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions);
+ @Test
+ public void testDownloadFunctionBuiltinFunction() throws Exception {
+ URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+ File file = Paths.get(fileUrl.toURI()).toFile();
+ String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- });
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
+ when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+ FunctionsManager functionsManager = mock(FunctionsManager.class);
+ FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+ when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+ when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+ when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
- StreamingOutput streamOutput = function.downloadFunction("builtin://cassandra", null, null);
+ StreamingOutput streamOutput = resource.downloadFunction("builtin://exclamation", null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
@@ -1680,71 +1679,107 @@ public class FunctionApiV3ResourceTest {
output.flush();
output.close();
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- Assert.assertEquals(file.length(), pkgFile.length());
- pkgFile.delete();
- } else {
- fail("expected file");
- }
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
}
@Test
- public void testDownloadFunctionByName() throws Exception {
+ public void testDownloadFunctionBuiltinConnectorByName() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
- String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- doReturn(true).when(mockedWorkerService).isInitialized();
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
FunctionMetaData metaData = FunctionMetaData.newBuilder()
- .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("file:///" + fileLocation))
+ .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://cassandra"))
.setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
+ .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.SINK))
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+ Connector connector = Connector.builder().archivePath(file.toPath()).build();
+ ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
+ when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
+ when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);
+
StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- pkgFile.delete();
- }
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
}
@Test
- public void testDownloadTransformFunctionByName() throws Exception {
+ public void testDownloadFunctionBuiltinFunctionByName() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
- String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- doReturn(true).when(mockedWorkerService).isInitialized();
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+ FunctionMetaData metaData = FunctionMetaData.newBuilder()
+ .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://exclamation"))
+ .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
+ .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.FUNCTION))
+ .build();
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+
+ FunctionsManager functionsManager = mock(FunctionsManager.class);
+ FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+ when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+ when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+ when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
+
+ StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false);
+ File pkgFile = new File(testDir, UUID.randomUUID().toString());
+ OutputStream output = new FileOutputStream(pkgFile);
+ streamOutput.write(output);
+ Assert.assertTrue(pkgFile.exists());
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
+ }
+
+ @Test
+ public void testDownloadTransformFunctionByName() throws Exception {
+ URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+ File file = Paths.get(fileUrl.toURI()).toFile();
+ String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+ WorkerConfig workerConfig = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
+ when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
.setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder()
- .setPackagePath("file:///" + fileLocation))
+ .setPackagePath("builtin://exclamation"))
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+ FunctionsManager functionsManager = mock(FunctionsManager.class);
+ FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
+ when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+ when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+ when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
+
StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, true);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- pkgFile.delete();
- }
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
}