You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:17:26 UTC
[pulsar] 01/04: Fix: don't attempt to clean up packages when
Source/Sink is builtin (#9289)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c3bbc0913bddd188305017b710700e84afe6763
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jan 24 18:42:12 2021 -0800
Fix: don't attempt to clean up packages when Source/Sink is builtin (#9289)
### Motivation
For Pulsar Functions / IO, don't attempt to clean up packages when the Source/Sink is builtin. Attempting the cleanup doesn't really cause any problems, but a nasty exception gets logged.
(cherry picked from commit 47b05c45000371ad26f5a197023e34ade7834fb2)
---
.../functions/worker/rest/api/ComponentImpl.java | 5 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 77 ++++++++++++++++++++++
.../rest/api/v3/SourceApiV3ResourceTest.java | 77 ++++++++++++++++++++++
3 files changed, 158 insertions(+), 1 deletion(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 29766e6..56771f6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -400,7 +400,10 @@ public abstract class ComponentImpl {
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
// clean up component files stored in BK
- if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
+ String functionPackagePath = functionMetaData.getPackageLocation().getPackagePath();
+ if (!functionPackagePath.startsWith(Utils.HTTP)
+ && !functionPackagePath.startsWith(Utils.FILE)
+ && !functionPackagePath.startsWith(Utils.BUILTIN)) {
try {
WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionMetaData.getPackageLocation().getPackagePath());
} catch (IOException e) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 6b55106..ed1c116 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -73,6 +74,7 @@ import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.doReturn;
@@ -1152,6 +1154,81 @@ public class SinkApiV3ResourceTest {
}
}
+ @Test
+ public void testDeregisterSinkBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the sink is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+ // A package path that carries no http/file/builtin prefix.
+ String packagePath = "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSink();
+ // Deregistering must delete this package from BookKeeper exactly once.
+ PowerMockito.verifyStatic(WorkerUtils.class, times(1));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
+ @Test
+ public void testDeregisterBuiltinSinkBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the sink is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+ // Package path of the form "<Utils.BUILTIN>://data-generator".
+ String packagePath = String.format("%s://data-generator", Utils.BUILTIN);
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSink();
+
+ // if the sink is a builtin sink we shouldn't try to clean it up
+ PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
+ @Test
+ public void testDeregisterHTTPSinkBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the sink is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+ // Plain literal: the URL contains no format specifiers, so String.format is not needed.
+ String packagePath = "http://foo.com/connector.jar";
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSink();
+
+ // if the sink package is downloaded from an http url, we shouldn't try to clean it up
+ PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
+ @Test
+ public void testDeregisterFileSinkBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the sink is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+ // Plain literal: the URL contains no format specifiers, so String.format is not needed.
+ String packagePath = "file://foo/connector.jar";
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSink();
+
+ // if the sink package has a file url, we shouldn't try to clean it up
+ PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
//
// Get Sink Info
//
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cf6945f..3569e6a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.doReturn;
@@ -50,6 +51,7 @@ import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1182,6 +1184,81 @@ public class SourceApiV3ResourceTest {
}
}
+ @Test
+ public void testDeregisterSourceBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the source is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+ // A package path that carries no http/file/builtin prefix.
+ String packagePath = "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSource();
+ // Deregistering must delete this package from BookKeeper exactly once.
+ PowerMockito.verifyStatic(WorkerUtils.class, times(1));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
+ @Test
+ public void testDeregisterBuiltinSourceBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the source is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+ // Package path of the form "<Utils.BUILTIN>://data-generator".
+ String packagePath = String.format("%s://data-generator", Utils.BUILTIN);
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSource();
+
+ // if the source is a builtin source we shouldn't try to clean it up
+ PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
+ @Test
+ public void testDeregisterHTTPSourceBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the source is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+ // Plain literal: the URL contains no format specifiers, so String.format is not needed.
+ String packagePath = "http://foo.com/connector.jar";
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSource();
+
+ // if the source package is downloaded from an http url, we shouldn't try to clean it up
+ PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
+ @Test
+ public void testDeregisterFileSourceBKPackageCleanup() throws IOException {
+ // Mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below.
+ mockStatic(WorkerUtils.class);
+ // Stub the manager so the source is reported as registered.
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+ // Plain literal: the URL contains no format specifiers, so String.format is not needed.
+ String packagePath = "file://foo/connector.jar";
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+ .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+ PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+ deregisterDefaultSource();
+
+ // if the source has a file url, we shouldn't try to clean it up
+ PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+ WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+ }
+
//
// Get Source Info
//