You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:17:26 UTC

[pulsar] 01/04: Fix: don't attempt to clean up packages when Source/Sink is builtin (#9289)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c3bbc0913bddd188305017b710700e84afe6763
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jan 24 18:42:12 2021 -0800

    Fix: don't attempt to clean up packages when Source/Sink is builtin (#9289)
    
    ### Motivation
    
    For Pulsar Functions / IO, don't attempt to clean up packages when a Source/Sink is builtin. Attempting the cleanup doesn't really cause any problems; it just results in a nasty exception being logged.
    
    
    
    (cherry picked from commit 47b05c45000371ad26f5a197023e34ade7834fb2)
---
 .../functions/worker/rest/api/ComponentImpl.java   |  5 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  | 77 ++++++++++++++++++++++
 .../rest/api/v3/SourceApiV3ResourceTest.java       | 77 ++++++++++++++++++++++
 3 files changed, 158 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 29766e6..56771f6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -400,7 +400,10 @@ public abstract class ComponentImpl {
                         ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
 
         // clean up component files stored in BK
-        if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
+        String functionPackagePath = functionMetaData.getPackageLocation().getPackagePath();
+        if (!functionPackagePath.startsWith(Utils.HTTP)
+                && !functionPackagePath.startsWith(Utils.FILE)
+                && !functionPackagePath.startsWith(Utils.BUILTIN)) {
             try {
                 WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionMetaData.getPackageLocation().getPackagePath());
             } catch (IOException e) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 6b55106..ed1c116 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -73,6 +74,7 @@ import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
@@ -1152,6 +1154,81 @@ public class SinkApiV3ResourceTest {
         }
     }
 
+    @Test
+    public void testDeregisterSinkBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        String packagePath = "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+        // the package path is a plain path with no url scheme, so exactly one delete of that path is expected
+        PowerMockito.verifyStatic(WorkerUtils.class, times(1));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterBuiltinSinkBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so calls to deleteFromBookkeeper can be counted below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        String packagePath = String.format("%s://data-generator", Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+
+        // the package path starts with Utils.BUILTIN (a builtin sink), so no delete of that path is expected
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterHTTPSinkBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so calls to deleteFromBookkeeper can be counted below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        String packagePath = "http://foo.com/connector.jar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+
+        // if the sink package is downloaded from an http url, we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterFileSinkBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so calls to deleteFromBookkeeper can be counted below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        String packagePath = "file://foo/connector.jar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+
+        // if the sink package has a file url, we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
     //
     // Get Sink Info
     //
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cf6945f..3569e6a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
@@ -50,6 +51,7 @@ import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1182,6 +1184,81 @@ public class SourceApiV3ResourceTest {
         }
     }
 
+    @Test
+    public void testDeregisterSourceBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so the deleteFromBookkeeper call can be verified below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        String packagePath = "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+        // the package path is a plain path with no url scheme, so exactly one delete of that path is expected
+        PowerMockito.verifyStatic(WorkerUtils.class, times(1));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterBuiltinSourceBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so calls to deleteFromBookkeeper can be counted below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        String packagePath = String.format("%s://data-generator", Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+
+        // the package path starts with Utils.BUILTIN (a builtin source), so no delete of that path is expected
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterHTTPSourceBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so calls to deleteFromBookkeeper can be counted below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        String packagePath = "http://foo.com/connector.jar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+
+        // if the source package is downloaded from an http url, we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterFileSourceBKPackageCleanup() throws IOException {
+        // mock WorkerUtils' static methods so calls to deleteFromBookkeeper can be counted below
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        String packagePath = "file://foo/connector.jar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+
+        // if the source package has a file url, we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
     //
     // Get Source Info
     //