You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:05:55 UTC

[pulsar] 13/31: [fix][package-management] Fix the new path `/data` introduced regression (#15367)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 46d6a7f41b780000201f3d2a548f0e9d94f35ad4
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri May 6 10:04:03 2022 +0800

    [fix][package-management] Fix the new path `/data` introduced regression (#15367)
    
    ---
    
    Fixes #15362
    
    *Motivation*
    
    The PR https://github.com/apache/pulsar/pull/13218 supports saving
    package data into filesystem. But it introduces a regression for the
    old versions.
    For example, we have a package function://public/default/package@v0.1,
    it will save the meta to the path function/public/default/package/v0.1/meta,
    and save the data to the path function/public/default/package/v0.1.
    By default, we are using distributed log as the package storage, and
    it supports saving data in a directory.
    But some storage like filesystem doesn't have the similar ability, it
    needs another path for saving data.
    This API provides the ability to support saving the data in another place.
    If you specify the data path as `/data`, the package will be saved into
    function/public/default/package/v0.1/data.
    
    *Modifications*
    
    - make the data path configurable in the storage implementation.
    
    (cherry picked from commit aa9aa1886079f6a244d07ee71ee129c44bccdb2a)
---
 .../packages/management/core/PackagesStorage.java  | 21 +++++++
 .../core/impl/PackagesManagementImpl.java          |  9 ++-
 .../core/impl/PackagesManagementImplTest.java      | 69 ++++++++++++++++++++--
 .../filesystem/FileSystemPackagesStorage.java      |  5 ++
 4 files changed, 95 insertions(+), 9 deletions(-)

diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
index 9672cf86603..6cebe6c6370 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
@@ -83,4 +83,25 @@ public interface PackagesStorage {
      * @return
      */
     CompletableFuture<Void> closeAsync();
+
+    /**
+     * The extra path for saving package data.
+     *
+     * For example, we have a package function://public/default/package@v0.1,
+     * it will save the meta to the path function/public/default/package/v0.1/meta,
+     * and save the data to the path function/public/default/package/v0.1.
+     * By default, we are using distributed log as the package storage, and it supports
+     * saving data at a directory.
+     * But some storage like filesystem don't have the similar ability, it needs another path
+     * for saving the data.
+     * This api provides the ability to support saving the data in another place.
+     * If you specify the data path as `/data`, the package will saved into
+     * function/public/default/package/v0.1/data.
+     *
+     * @return
+     *      the data path
+     */
+    default String dataPath() {
+        return "";
+    }
 }
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
index ca1adaa458d..792a1ecf106 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
@@ -143,8 +143,7 @@ public class PackagesManagementImpl implements PackagesManagement {
     public CompletableFuture<Void> delete(PackageName packageName) {
         return CompletableFuture.allOf(
             storage.deleteAsync(metadataPath(packageName)),
-            storage.deleteAsync(packagePath(packageName)),
-            storage.deleteAsync(packageName.toRestPath()));
+            storage.deleteAsync(packagePath(packageName)));
     }
 
     @Override
@@ -244,12 +243,12 @@ public class PackagesManagementImpl implements PackagesManagement {
         return future;
     }
 
-    private String metadataPath(PackageName packageName) {
+    protected String metadataPath(PackageName packageName) {
         return packageName.toRestPath() + "/meta";
     }
 
-    private String packagePath(PackageName packageName) {
-        return packageName.toRestPath() + "/data";
+    protected String packagePath(PackageName packageName) {
+        return packageName.toRestPath() + storage.dataPath();
     }
 
     private String packageWithoutVersionPath(PackageName packageName) {
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
index 6dc28352c00..aba86501372 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.packages.management.core.impl;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
 import org.apache.pulsar.packages.management.core.PackagesManagement;
@@ -32,15 +35,15 @@ import org.apache.pulsar.packages.management.core.common.PackageMetadataUtil;
 import org.apache.pulsar.packages.management.core.common.PackageName;
 import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class PackagesManagementImplTest {
     private static PackagesStorage storage;
     private static PackagesManagement packagesManagement;
 
-    @BeforeClass
+    @BeforeMethod
     public static void setup() throws IOException {
         PackagesStorageProvider storageProvider = PackagesStorageProvider.newProvider(MockedPackagesStorageProvider.class.getName());
         DefaultPackagesStorageConfiguration packagesStorageConfiguration = new DefaultPackagesStorageConfiguration();
@@ -50,7 +53,7 @@ public class PackagesManagementImplTest {
         packagesManagement.initialize(storage);
     }
 
-    @AfterClass(alwaysRun = true)
+    @AfterMethod(alwaysRun = true)
     public static void teardown() throws ExecutionException, InterruptedException {
         storage.closeAsync().get();
     }
@@ -192,4 +195,62 @@ public class PackagesManagementImplTest {
             Assert.fail("should not throw any exception");
         }
     }
+
+    @Test
+    public void testPackagePath() {
+        PackagesManagementImpl impl = (PackagesManagementImpl) packagesManagement;
+        PackageName pn = PackageName.get("function://public/default/test@v1");
+        String metaPath = impl.metadataPath(pn);
+        Assert.assertEquals(metaPath, "function/public/default/test/v1/meta");
+        String dataPath = impl.packagePath(pn);
+        Assert.assertEquals(dataPath, "function/public/default/test/v1");
+
+        impl.initialize(new PackagesStorage() {
+            @Override
+            public void initialize() {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> writeAsync(String path, InputStream inputStream) {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<Void> readAsync(String path, OutputStream outputStream) {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<Void> deleteAsync(String path) {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<List<String>> listAsync(String path) {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<Boolean> existAsync(String path) {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<Void> closeAsync() {
+                return null;
+            }
+
+            @Override
+            public String dataPath() {
+                return "/tmp";
+            }
+        });
+
+
+        metaPath = impl.metadataPath(pn);
+        Assert.assertEquals(metaPath, "function/public/default/test/v1/meta");
+        dataPath = impl.packagePath(pn);
+        Assert.assertEquals(dataPath, "function/public/default/test/v1/tmp");
+    }
 }
diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java
index ff34c482f15..74d77fe3572 100644
--- a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java
+++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java
@@ -147,4 +147,9 @@ public class FileSystemPackagesStorage implements PackagesStorage {
     public CompletableFuture<Void> closeAsync() {
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public String dataPath() {
+        return "/data";
+    }
 }