You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/03 15:28:16 UTC

[pulsar] branch master updated: Added MetadataStore deleteRecursive operation (#11867)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79e5424  Added MetadataStore deleteRecursive operation (#11867)
79e5424 is described below

commit 79e5424af8786adeb7c4d40887e040b261e50292
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Sep 3 08:27:32 2021 -0700

    Added MetadataStore deleteRecursive operation (#11867)
    
    * Added MetadataStore deleteRecursive operation
    
    * Added note about non-atomicity
---
 .../apache/pulsar/metadata/api/MetadataStore.java  | 12 ++++++++++
 .../metadata/impl/AbstractMetadataStore.java       | 20 ++++++++++++++++-
 .../metadata/impl/FaultInjectionMetadataStore.java | 10 +++++++++
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 26 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index b75df50..fbebc7b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -113,6 +113,18 @@ public interface MetadataStore extends AutoCloseable {
     CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);
 
     /**
+     * Delete a key-value pair and all the children nodes.
+     *
+     * Note: the operation might not be carried out in an atomic fashion. If the operation fails, the deletion
+     *       of the tree might be only partial.
+     *
+     * @param path
+     *            the path of the key to delete from the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<Void> deleteRecursive(String path);
+
+    /**
      * Register a listener that will be called on changes in the underlying store.
      *
      * @param listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f06b8b5..179820f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.common.util.FutureUtil;
@@ -198,8 +199,25 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                 });
     }
 
+    @Override
+    public CompletableFuture<Void> deleteRecursive(String path) {
+        return getChildren(path)
+                .thenCompose(children -> FutureUtil.waitForAll(
+                        children.stream()
+                                .map(child -> deleteRecursive(path + "/" + child))
+                                .collect(Collectors.toList())))
+                .thenCompose(__ -> exists(path))
+                .thenCompose(exists -> {
+                    if (exists) {
+                        return delete(path, Optional.empty());
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     protected abstract CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
-            EnumSet<CreateOption> options);
+                                                        EnumSet<CreateOption> options);
 
     @Override
     public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 749ce46..6236aed 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -116,6 +116,16 @@ public class FaultInjectionMetadataStore implements MetadataStore {
     }
 
     @Override
+    public CompletableFuture<Void> deleteRecursive(String path) {
+        Optional<MetadataStoreException> ex = programmedFailure(OperationType.DELETE, path);
+        if (ex.isPresent()) {
+            return FutureUtil.failedFuture(ex.get());
+        }
+
+        return store.deleteRecursive(path);
+    }
+
+    @Override
     public void registerListener(Consumer<Notification> listener) {
         store.registerListener(listener);
     }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 34a1373..be18bde 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -298,4 +298,30 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
         assertEquals(n.getType(), NotificationType.ChildrenChanged);
         assertEquals(n.getPath(), key1);
     }
+
+    @Test(dataProvider = "impl")
+    public void testDeleteRecursive(String provider, String url) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
+
+        String prefix = newKey();
+
+        String key1 = newKey();
+        store.put(prefix + key1, "value-1".getBytes(), Optional.of(-1L)).join();
+
+        store.put(prefix + key1 + "/c1", "value".getBytes(), Optional.of(-1L)).join();
+        store.put(prefix + key1 + "/c2", "value".getBytes(), Optional.of(-1L)).join();
+        store.put(prefix + key1 + "/c1/x1", "value".getBytes(), Optional.of(-1L)).join();
+        store.put(prefix + key1 + "/c1/x2", "value".getBytes(), Optional.of(-1L)).join();
+        store.put(prefix + key1 + "/c2/y2", "value".getBytes(), Optional.of(-1L)).join();
+        store.put(prefix + key1 + "/c3", "value".getBytes(), Optional.of(-1L)).join();
+
+        String key2 = newKey();
+        store.put(prefix + key2, "value-2".getBytes(), Optional.of(-1L)).join();
+
+        store.deleteRecursive(prefix + key1).join();
+
+        assertEquals(store.getChildren(prefix).join(), Collections.singletonList(key2.substring(1)));
+    }
+
 }