You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/03 15:28:16 UTC
[pulsar] branch master updated: Added MetadataStore deleteRecursive
operation (#11867)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 79e5424 Added MetadataStore deleteRecursive operation (#11867)
79e5424 is described below
commit 79e5424af8786adeb7c4d40887e040b261e50292
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Sep 3 08:27:32 2021 -0700
Added MetadataStore deleteRecursive operation (#11867)
* Added MetadataStore deleteRecursive operation
* Added note about non-atomicity
---
.../apache/pulsar/metadata/api/MetadataStore.java | 12 ++++++++++
.../metadata/impl/AbstractMetadataStore.java | 20 ++++++++++++++++-
.../metadata/impl/FaultInjectionMetadataStore.java | 10 +++++++++
.../apache/pulsar/metadata/MetadataStoreTest.java | 26 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index b75df50..fbebc7b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -113,6 +113,18 @@ public interface MetadataStore extends AutoCloseable {
CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);
/**
+ * Delete a key-value pair and all of its children nodes.
+ *
+ * Note: the operation might not be carried out in an atomic fashion. If the operation fails, the deletion of
+ * the tree might be only partial.
+ *
+ * @param path
+ * the path of the key to delete from the store
+ * @return a future to track the async request
+ */
+ CompletableFuture<Void> deleteRecursive(String path);
+
+ /**
* Register a listener that will be called on changes in the underlying store.
*
* @param listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f06b8b5..179820f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
@@ -198,8 +199,25 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
});
}
+ @Override
+ public CompletableFuture<Void> deleteRecursive(String path) { // children are deleted first, then the node at path itself
+ return getChildren(path)
+ .thenCompose(children -> FutureUtil.waitForAll( // presumably completes once every child subtree deletion has completed
+ children.stream()
+ .map(child -> deleteRecursive(path + "/" + child)) // NOTE(review): a path of "/" would yield "//child" here - confirm root is never passed
+ .collect(Collectors.toList())))
+ .thenCompose(__ -> exists(path)) // thenCompose: not reached if the previous stage completed exceptionally
+ .thenCompose(exists -> {
+ if (exists) { // NOTE(review): exists() and delete() are separate calls - a concurrent removal in between presumably fails this future
+ return delete(path, Optional.empty()); // Optional.empty(): no expected version is supplied
+ } else {
+ return CompletableFuture.completedFuture(null); // nothing is stored at path itself, so there is nothing left to delete
+ }
+ });
+ }
+
protected abstract CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options);
+ EnumSet<CreateOption> options);
@Override
public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 749ce46..6236aed 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -116,6 +116,16 @@ public class FaultInjectionMetadataStore implements MetadataStore {
}
@Override
+ public CompletableFuture<Void> deleteRecursive(String path) {
+ Optional<MetadataStoreException> ex = programmedFailure(OperationType.DELETE, path); // failures are looked up under the DELETE operation type
+ if (ex.isPresent()) {
+ return FutureUtil.failedFuture(ex.get()); // injected failure: the wrapped store is not called
+ }
+
+ return store.deleteRecursive(path); // no failure programmed for this path: delegate to the wrapped store
+ }
+
+ @Override
public void registerListener(Consumer<Notification> listener) {
store.registerListener(listener);
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 34a1373..be18bde 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -298,4 +298,30 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
assertEquals(n.getType(), NotificationType.ChildrenChanged);
assertEquals(n.getPath(), key1);
}
+
+ @Test(dataProvider = "impl")
+ public void testDeleteRecursive(String provider, String url) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
+
+ String prefix = newKey();
+
+ String key1 = newKey();
+ store.put(prefix + key1, "value-1".getBytes(), Optional.of(-1L)).join(); // root of the subtree that will be deleted
+
+ store.put(prefix + key1 + "/c1", "value".getBytes(), Optional.of(-1L)).join(); // c1: child with two grandchildren (x1, x2)
+ store.put(prefix + key1 + "/c2", "value".getBytes(), Optional.of(-1L)).join(); // c2: child with one grandchild (y2)
+ store.put(prefix + key1 + "/c1/x1", "value".getBytes(), Optional.of(-1L)).join();
+ store.put(prefix + key1 + "/c1/x2", "value".getBytes(), Optional.of(-1L)).join();
+ store.put(prefix + key1 + "/c2/y2", "value".getBytes(), Optional.of(-1L)).join();
+ store.put(prefix + key1 + "/c3", "value".getBytes(), Optional.of(-1L)).join(); // c3: leaf child
+
+ String key2 = newKey();
+ store.put(prefix + key2, "value-2".getBytes(), Optional.of(-1L)).join(); // sibling of key1 under the same prefix; expected to remain
+
+ store.deleteRecursive(prefix + key1).join();
+
+ assertEquals(store.getChildren(prefix).join(), Collections.singletonList(key2.substring(1))); // only key2 is left; substring(1) drops its first character (presumably the leading "/")
+ }
+
}