You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2021/06/24 16:59:35 UTC

[pulsar] branch master updated: Made the PulsarClusterMetadataTeardown deletes idempotent (#11042)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7efabc4  Made the PulsarClusterMetadataTeardown deletes idempotent (#11042)
7efabc4 is described below

commit 7efabc4909e3a4a70d56a8e790443f3b349a1a89
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 24 18:58:37 2021 +0200

    Made the PulsarClusterMetadataTeardown deletes idempotent (#11042)
    
    ### Motivation
    
    If some of the z-nodes are not there anymore, the cluster teardown operation should not fail.
    
    Also changed the delete recursive operation to delete in parallel for children of same node, to speed up the overall execution time.
---
 .../pulsar/PulsarClusterMetadataTeardown.java      | 27 +++++++++++++++-------
 .../cli/ClusterMetadataTearDownTest.java           |  3 +++
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 53fff01..c4c2c18 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -22,6 +22,8 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -31,6 +33,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -100,7 +103,7 @@ public class PulsarClusterMetadataTeardown {
         }
 
         for (String localZkNode : localZkNodes) {
-            deleteRecursively(metadataStore, "/" + localZkNode);
+            deleteRecursively(metadataStore, "/" + localZkNode).join();
         }
 
         if (arguments.configurationStore != null && arguments.cluster != null) {
@@ -108,18 +111,26 @@ public class PulsarClusterMetadataTeardown {
             @Cleanup
             MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore,
                     MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
-            deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster);
+            deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
         }
 
         log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
     }
 
-    private static void deleteRecursively(MetadataStore metadataStore, String path){
-        metadataStore.getChildren(path).join().forEach(child -> {
-            deleteRecursively(metadataStore, path + "/" + child);
-        });
-
-        metadataStore.delete(path, Optional.empty()).join();
+    private static CompletableFuture<Void> deleteRecursively(MetadataStore metadataStore, String path) {
+        return metadataStore.getChildren(path)
+                .thenCompose(children -> FutureUtil.waitForAll(
+                        children.stream()
+                                .map(child -> deleteRecursively(metadataStore, path + "/" + child))
+                                .collect(Collectors.toList())))
+                .thenCompose(__ -> metadataStore.exists(path))
+                .thenCompose(exists -> {
+                    if (exists) {
+                        return metadataStore.delete(path, Optional.empty());
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
     }
 
     private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
index a60ecc2..8aeb4bf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
@@ -199,6 +199,9 @@ public class ClusterMetadataTearDownTest extends TestRetrySupport {
         }
         List<String> clusterNodes = configStore.getChildren( "/admin/clusters").join();
         assertFalse(clusterNodes.contains(pulsarCluster.getClusterName()));
+
+        // Try delete again, should not fail
+        PulsarClusterMetadataTeardown.main(args);
     }
 
     private long getNumOfLedgers() {