Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/13 04:55:42 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

BewareMyPower opened a new pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244


   ### Motivation
   
   #8169 introduced a command-line tool to delete a cluster's metadata from ZooKeeper (ZK). This PR extends it to also delete the cluster's ledgers from BookKeeper (BK).
   
   ### Modifications
   
   - Retrieve ledger IDs from the related ZK nodes
   - Add an optional argument to specify the BK metadata service URI, and delete these ledgers only if it is specified
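The ZK traversal described above can be sketched without a live ZooKeeper. In this minimal sketch, a hypothetical in-memory `Map` from znode path to child names stands in for `ZooKeeper#getChildren`, and a missing node yields an empty list (as the later revision of the PR's `getChildren` helper does). It also assumes that each managed ledger's name is its znode path relative to `/managed-ledgers`, i.e. `<tenant>/<namespace>/persistent/<topic>`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ManagedLedgerPaths {
    static final String ROOT = "/managed-ledgers";

    // Hypothetical stand-in for ZooKeeper#getChildren: a missing node yields an empty list.
    static List<String> children(Map<String, List<String>> tree, String path) {
        return tree.getOrDefault(path, Collections.emptyList());
    }

    // Walks /managed-ledgers/<tenant>/<namespace>/persistent/<topic> and returns
    // each managed ledger name (assumed to be the path relative to the root).
    static List<String> managedLedgerNames(Map<String, List<String>> tree) {
        List<String> names = new ArrayList<>();
        for (String tenant : children(tree, ROOT)) {
            String tenantRoot = ROOT + "/" + tenant;
            for (String namespace : children(tree, tenantRoot)) {
                String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
                for (String topic : children(tree, namespaceRoot)) {
                    names.add(String.join("/", tenant, namespace, "persistent", topic));
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, List<String>> tree = Map.of(
                "/managed-ledgers", List.of("public"),
                "/managed-ledgers/public", List.of("default"),
                "/managed-ledgers/public/default/persistent", List.of("t1", "t2"));
        System.out.println(managedLedgerNames(tree));
    }
}
```

Reading the ledger IDs out of each node's data is deliberately left out, since that depends on the protobuf metadata format stored in the node.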
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-714263112


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui merged pull request #8244: Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244


   





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506792469



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (BKException e) {
+            log.warn("Failed to delete ledger {}: {}", ledgerId, e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {

Review comment:
       Yes, that seems better than reading from ZK manually. I'll fix it soon.







[GitHub] [pulsar] eolivelli commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506952515



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (BKException e) {
+            log.warn("Failed to delete ledger {}: {}", ledgerId, e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String managedLedgersRoot = "/managed-ledgers";

Review comment:
       OK, looking forward to the news.







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-708166037


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-710974475


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-714285659


   /pulsarbot run-failure-checks





[GitHub] [pulsar] sijie commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506572994



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (BKException e) {
+            log.warn("Failed to delete ledger {}: {}", ledgerId, e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {

Review comment:
       I think the logic here can be replaced by using `ManagedLedgerFactory#delete` to delete the ledger, no?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (BKException e) {
+            log.warn("Failed to delete ledger {}: {}", ledgerId, e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
+                    final String topicRoot = namespaceRoot + "/" + topic;
+                    byte[] topicData = getData(zooKeeper, topicRoot);
+                    try {
+                        ManagedLedgerInfo.parseFrom(topicData).getLedgerInfoList().stream()
+                                .map(ManagedLedgerInfo.LedgerInfo::getLedgerId)
+                                .forEach(ledgerId -> deleteLedger(bookKeeper, ledgerId));
+
+                        getChildren(zooKeeper, topicRoot).stream().map(subscription -> {
+                            final String subscriptionRoot = topicRoot + "/" + subscription;
+                            try {
+                                return ManagedCursorInfo.parseFrom(getData(zooKeeper, subscriptionRoot)).getCursorsLedgerId();
+                            } catch (InvalidProtocolBufferException e) {
+                                log.warn("Invalid data format from {}: {}", subscriptionRoot, e);
+                                return -1L;

Review comment:
       I don't think you should return `-1L` here. This will trigger `deleteLedger(bookkeeper, -1L)` in line 183. This is not a good implementation. You should return `null` and skip deletion if it is `null`.
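The reviewer's suggestion can be illustrated with a small stream pipeline. In this sketch, the hypothetical `parseCursorLedgerId` stands in for parsing `ManagedCursorInfo` and returns `null` instead of the `-1L` sentinel on bad data; the pipeline then drops `null` before anything reaches a delete call, so no deletion is ever attempted for a bogus ID.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class CursorLedgerIds {
    // Hypothetical stand-in for ManagedCursorInfo.parseFrom(data).getCursorsLedgerId():
    // returns null (not -1L) when the data cannot be parsed.
    static Long parseCursorLedgerId(String data) {
        try {
            return Long.parseLong(data);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Keeps only valid IDs, so the caller never deletes a sentinel ledger ID.
    static List<Long> ledgerIdsToDelete(List<String> cursorData) {
        return cursorData.stream()
                .map(CursorLedgerIds::parseCursorLedgerId)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(ledgerIdsToDelete(List.of("12", "corrupt", "34")));
    }
}
```

Returning `Optional<Long>` and flattening it would be an equally valid way to express "no ledger ID" without a sentinel.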

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);

Review comment:
       We wrap `log.debug` calls in `if (log.isDebugEnabled())`.
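Pulsar uses SLF4J, but the same guard can be shown with `java.util.logging` so that this sketch runs without extra dependencies; here `isLoggable(Level.FINE)` plays the role of `log.isDebugEnabled()`. The `messagesBuilt` counter is a hypothetical addition that only makes the skipped work observable.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class GuardedDebugLog {
    static final Logger LOG = Logger.getLogger(GuardedDebugLog.class.getName());
    // Counts how often the debug message is actually built (illustration only).
    static int messagesBuilt = 0;

    static String describe(long ledgerId) {
        messagesBuilt++;
        return "Deleted ledger id: " + ledgerId;
    }

    static void onLedgerDeleted(long ledgerId) {
        // Analogue of SLF4J's `if (log.isDebugEnabled())`: skip building the
        // message entirely when FINE (debug) logging is disabled.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(describe(ledgerId));
        }
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO);
        onLedgerDeleted(42L);
        System.out.println(messagesBuilt); // 0
        LOG.setLevel(Level.FINE);
        onLedgerDeleted(42L);
        System.out.println(messagesBuilt); // 1
    }
}
```

With SLF4J's parameterized form (`log.debug("... {}", ledgerId)`) the string is not formatted when debug is off, so the guard largely saves argument boxing and call overhead on hot paths.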







[GitHub] [pulsar] eolivelli commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506924369



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -72,6 +84,13 @@ public static void main(String[] args) throws InterruptedException {
 
         ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
 
+        if (arguments.bkMetadataServiceUri != null) {
+            BookKeeper bookKeeper = new BookKeeper(new ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));

Review comment:
       Makes sense.
   We should create a ticket to move to the new API.
   I will take care of it.
   







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506789452



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);

Review comment:
       Ok







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-714069262


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506928124



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (BKException e) {
+            log.warn("Failed to delete ledger {}: {}", ledgerId, e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String managedLedgersRoot = "/managed-ledgers";

Review comment:
       Yes, as @sijie says, the `ManagedLedgerFactory#delete` method can delete these ledgers directly. I'm testing it now; the only problem is that after the factory is created to delete the ledgers, the program does not terminate normally. I'll push new commits once I've solved that problem.
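The thread does not say what kept the process alive. One common cause in Java is a pool of non-daemon threads that is never shut down, which prevents the JVM from exiting after `main` returns. Assuming that is the issue here, the usual remedy is to release such resources in a `finally` block (for the managed ledger factory, presumably through its `shutdown()` method). In this sketch a plain `ExecutorService` stands in for the factory's internal threads, and `deletionWork` is a hypothetical placeholder for the deletion logic.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TeardownShutdown {
    // Runs the (hypothetical) deletion work on a pool and always shuts the pool down,
    // so its non-daemon threads cannot keep the JVM alive after main() returns.
    static boolean runAndShutdown(Runnable deletionWork) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            executor.submit(deletionWork);
        } finally {
            executor.shutdown(); // stop accepting tasks; already-submitted ones still run
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupt anything still running
            }
        }
        return executor.isTerminated();
    }

    public static void main(String[] args) throws InterruptedException {
        boolean terminated = runAndShutdown(() -> System.out.println("deleting ledgers..."));
        System.out.println("terminated: " + terminated);
    }
}
```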







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-708139122


   /pulsarbot run-failure-checks





[GitHub] [pulsar] eolivelli commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506839953



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -72,6 +84,13 @@ public static void main(String[] args) throws InterruptedException {
 
         ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
 
+        if (arguments.bkMetadataServiceUri != null) {
+            BookKeeper bookKeeper = new BookKeeper(new ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));

Review comment:
       Can we use the `org.apache.bookkeeper.client.api.BookKeeper` interface?
   You are using the old API, which will soon be deprecated.
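With the newer `org.apache.bookkeeper.client.api.BookKeeper` interface, deletion is asynchronous: as far as I know, a single delete is issued with `newDeleteLedgerOp().withLedgerId(id).execute()`, which returns a `CompletableFuture<Void>`. The sketch below shows only the fan-out/join pattern for many ledgers. `deleteAsync` is a hypothetical function parameter standing in for that call, so the block runs without BookKeeper on the classpath.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

class AsyncLedgerDeletion {
    // Issues one asynchronous delete per ledger ID and returns a future that
    // completes when all of them finish, or completes exceptionally if any fails.
    static CompletableFuture<Void> deleteAll(List<Long> ledgerIds,
                                             LongFunction<CompletableFuture<Void>> deleteAsync) {
        CompletableFuture<?>[] futures = ledgerIds.stream()
                .map(id -> deleteAsync.apply(id))
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(futures);
    }

    public static void main(String[] args) {
        Set<Long> deleted = ConcurrentHashMap.newKeySet();
        // Hypothetical stand-in for bk.newDeleteLedgerOp().withLedgerId(id).execute()
        LongFunction<CompletableFuture<Void>> fakeDelete =
                id -> CompletableFuture.runAsync(() -> deleted.add(id));
        deleteAll(List.of(1L, 2L, 3L), fakeDelete).join();
        System.out.println(deleted.size()); // 3
    }
}
```

Joining once at the end keeps the tool's blocking behavior while letting the individual deletes proceed concurrently.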







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-711026779


   All checks have now passed. PTAL again, @sijie @eolivelli





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r507018134



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +135,77 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            if (e instanceof KeeperException.NoNodeException) {
+                return new ArrayList<>();
+            }
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Delete ledger id: {}", ledgerId);
+            }
+        } catch (InterruptedException | BKException e) {
+            log.error("Failed to delete ledger {}: {}", ledgerId, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, ManagedLedgerFactory managedLedgerFactory) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
+                    final TopicName topicName = TopicName.get(String.join("/", tenant, namespace, topic));
+                    try {
+                        managedLedgerFactory.delete(topicName.getPersistenceNamingEncoding());
+                    } catch (InterruptedException | ManagedLedgerException e) {
+                        log.error("Failed to delete ledgers of {}: {}", topicName, e);
+                        throw new RuntimeException(e);
+                    }
+                });
+            });
+        });
+    }
+
+    private static void deleteSchemaLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String schemaLedgersRoot = "/schemas";

Review comment:
       After looking into the code again, I don't think it's appropriate to use `BookkeeperSchemaStorage` here, because its `BookKeeper` client is not initialized until the `start()` method is invoked. If a constructor that accepts a `BookKeeper` instance were added, `BookkeeperSchemaStorage` would have to avoid invoking `start()` and some other methods. In addition, the current implementation of `BookkeeperSchemaStorage#delete` doesn't access ZooKeeper. So for now, we need to access ZooKeeper directly.







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-707521644


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-708864285


   The change is done. PTAL, @sijie.
   
   In addition, some schema ledgers are not recorded in ZK, so they cannot currently be located. I'm trying to solve this issue in [#8258](https://github.com/apache/pulsar/pull/8258). Whichever of these two PRs is merged first, I will complete the remaining work in the other one.





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506959623



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +135,77 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            if (e instanceof KeeperException.NoNodeException) {
+                return new ArrayList<>();
+            }
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Delete ledger id: {}", ledgerId);
+            }
+        } catch (InterruptedException | BKException e) {
+            log.error("Failed to delete ledger {}: {}", ledgerId, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, ManagedLedgerFactory managedLedgerFactory) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
+                    final TopicName topicName = TopicName.get(String.join("/", tenant, namespace, topic));
+                    try {
+                        managedLedgerFactory.delete(topicName.getPersistenceNamingEncoding());
+                    } catch (InterruptedException | ManagedLedgerException e) {
+                        log.error("Failed to delete ledgers of {}: {}", topicName, e);
+                        throw new RuntimeException(e);
+                    }
+                });
+            });
+        });
+    }
+
+    private static void deleteSchemaLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String schemaLedgersRoot = "/schemas";

Review comment:
       Schema ledgers are managed by `BookkeeperSchemaStorage`, but that class is currently coupled with `PulsarService` because it is part of `pulsar-broker` rather than an independent module like `managed-ledger`, so it uses `PulsarService`'s BK client. Supporting schema ledgers here would require some refactoring of the schema storage.







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-713687614


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-714214522


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-708092600


   @sijie OK





[GitHub] [pulsar] eolivelli commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506958608



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +135,77 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            if (e instanceof KeeperException.NoNodeException) {
+                return new ArrayList<>();
+            }
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Delete ledger id: {}", ledgerId);
+            }
+        } catch (InterruptedException | BKException e) {
+            log.error("Failed to delete ledger {}: {}", ledgerId, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, ManagedLedgerFactory managedLedgerFactory) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
+                    final TopicName topicName = TopicName.get(String.join("/", tenant, namespace, topic));
+                    try {
+                        managedLedgerFactory.delete(topicName.getPersistenceNamingEncoding());
+                    } catch (InterruptedException | ManagedLedgerException e) {
+                        log.error("Failed to delete ledgers of {}: {}", topicName, e);
+                        throw new RuntimeException(e);
+                    }
+                });
+            });
+        });
+    }
+
+    private static void deleteSchemaLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String schemaLedgersRoot = "/schemas";

Review comment:
       Probably all of this direct access to ZooKeeper should be moved inside `ManagedLedgerFactory`.
   
   Wdyt @sijie?
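   A minimal sketch of what such an API could look like, assuming a hypothetical `ManagedLedgerNameLister` (not an existing Pulsar class) that owns the `/managed-ledgers/<tenant>/<namespace>/persistent/<topic>` walk from the diff, so the teardown tool would only ask for managed ledger names. `getChildren` is abstracted as a `Function` that returns an empty list for a missing node, mirroring the `NoNodeException` handling in the diff above.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Function;

   // Hypothetical helper; not part of Pulsar's ManagedLedgerFactory API.
   public class ManagedLedgerNameLister {
       private static final String ROOT = "/managed-ledgers";

       // Stand-in for ZooKeeper#getChildren that returns an empty list for a missing node.
       private final Function<String, List<String>> getChildren;

       public ManagedLedgerNameLister(Function<String, List<String>> getChildren) {
           this.getChildren = getChildren;
       }

       // Walks /managed-ledgers/<tenant>/<namespace>/persistent/<topic> and returns
       // one "<tenant>/<namespace>/persistent/<topic>" name per persistent topic.
       public List<String> listManagedLedgerNames() {
           List<String> names = new ArrayList<>();
           for (String tenant : getChildren.apply(ROOT)) {
               String tenantRoot = ROOT + "/" + tenant;
               for (String namespace : getChildren.apply(tenantRoot)) {
                   String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
                   for (String topic : getChildren.apply(namespaceRoot)) {
                       names.add(String.join("/", tenant, namespace, "persistent", topic));
                   }
               }
           }
           return names;
       }

       public static void main(String[] args) {
           // In-memory tree used only for illustration.
           Map<String, List<String>> tree = Map.of(
                   "/managed-ledgers", List.of("public"),
                   "/managed-ledgers/public", List.of("default"),
                   "/managed-ledgers/public/default/persistent", List.of("my-topic"));
           ManagedLedgerNameLister lister =
                   new ManagedLedgerNameLister(path -> tree.getOrDefault(path, Collections.emptyList()));
           System.out.println(lister.listManagedLedgerNames()); // [public/default/persistent/my-topic]
       }
   }
   ```

   With the walk behind a single method, the teardown tool would only iterate over the returned names and pass each one to `managedLedgerFactory.delete(...)`, without knowing the ZK layout.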







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-713467402


   /pulsarbot run-failure-checks





[GitHub] [pulsar] eolivelli commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506925233



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (BKException e) {
+            log.warn("Failed to delete ledger {}: {}", ledgerId, e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String managedLedgersRoot = "/managed-ledgers";

Review comment:
       Isn't there any Pulsar internal API to get this list of ledgers?







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-714308624


   I've added an integration test. PTAL, @sijie





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506960018



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +135,77 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            if (e instanceof KeeperException.NoNodeException) {
+                return new ArrayList<>();
+            }
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Delete ledger id: {}", ledgerId);
+            }
+        } catch (InterruptedException | BKException e) {
+            log.error("Failed to delete ledger {}: {}", ledgerId, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, ManagedLedgerFactory managedLedgerFactory) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
+                    final TopicName topicName = TopicName.get(String.join("/", tenant, namespace, topic));
+                    try {
+                        managedLedgerFactory.delete(topicName.getPersistenceNamingEncoding());
+                    } catch (InterruptedException | ManagedLedgerException e) {
+                        log.error("Failed to delete ledgers of {}: {}", topicName, e);
+                        throw new RuntimeException(e);
+                    }
+                });
+            });
+        });
+    }
+
+    private static void deleteSchemaLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String schemaLedgersRoot = "/schemas";

Review comment:
       However, since `BookkeeperSchemaStorage` only uses `PulsarService` to create a `BookKeeper` client, the refactor may not need many changes. I'll try to do it later.
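   A rough sketch of that refactor, using hypothetical names (`LedgerClient`, `SchemaLedgerStorage`) rather than the real Pulsar classes: the storage depends only on a `Supplier` of a ledger client, so the broker could pass the client it already owns while a standalone tool such as the teardown command passes its own. The real BookKeeper client throws checked exceptions (`BKException`, `InterruptedException`); this sketch assumes an adapter wraps them, as the PR's `deleteLedger` helper does with `RuntimeException`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Supplier;

   public class SchemaStorageDecouplingSketch {

       // Hypothetical stand-in for the part of the BookKeeper client the schema storage needs.
       // Checked BK exceptions are assumed to be wrapped by an adapter.
       interface LedgerClient {
           void deleteLedger(long ledgerId);
       }

       // Hypothetical schema storage that depends on a client supplier instead of PulsarService.
       static class SchemaLedgerStorage {
           private final Supplier<LedgerClient> clientSupplier;

           SchemaLedgerStorage(Supplier<LedgerClient> clientSupplier) {
               this.clientSupplier = clientSupplier;
           }

           void deleteLedgers(List<Long> ledgerIds) {
               LedgerClient client = clientSupplier.get();
               for (long ledgerId : ledgerIds) {
                   client.deleteLedger(ledgerId);
               }
           }
       }

       public static void main(String[] args) {
           List<Long> deleted = new ArrayList<>();
           // Fake client that records deletions; a real caller would pass an adapter over its own BK client.
           LedgerClient fakeClient = ledgerId -> deleted.add(ledgerId);
           SchemaLedgerStorage storage = new SchemaLedgerStorage(() -> fakeClient);
           storage.deleteLedgers(List.of(10L, 11L));
           System.out.println(deleted); // [10, 11]
       }
   }
   ```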







[GitHub] [pulsar] BewareMyPower commented on pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#issuecomment-713612160


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506789940



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -107,5 +128,86 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
         }
     }
 
+    private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getChildren(path, null);
+        } catch (InterruptedException | KeeperException e) {
+            log.error("Failed to get children of {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte[] getData(ZooKeeper zooKeeper, String path) {
+        try {
+            return zooKeeper.getData(path, null, null);
+        } catch (KeeperException | InterruptedException e) {
+            log.error("Failed to get data from {}: {}", path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
+        try {
+            bookKeeper.deleteLedger(ledgerId);
+            log.debug("Delete ledger id: {}", ledgerId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (BKException e) {
+            log.warn("Failed to delete ledger {}: {}", ledgerId, e);
+        }
+    }
+
+    private static void deleteManagedLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
+        final String managedLedgersRoot = "/managed-ledgers";
+        getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
+            final String tenantRoot = managedLedgersRoot + "/" + tenant;
+            getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
+                final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
+                getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
+                    final String topicRoot = namespaceRoot + "/" + topic;
+                    byte[] topicData = getData(zooKeeper, topicRoot);
+                    try {
+                        ManagedLedgerInfo.parseFrom(topicData).getLedgerInfoList().stream()
+                                .map(ManagedLedgerInfo.LedgerInfo::getLedgerId)
+                                .forEach(ledgerId -> deleteLedger(bookKeeper, ledgerId));
+
+                        getChildren(zooKeeper, topicRoot).stream().map(subscription -> {
+                            final String subscriptionRoot = topicRoot + "/" + subscription;
+                            try {
+                                return ManagedCursorInfo.parseFrom(getData(zooKeeper, subscriptionRoot)).getCursorsLedgerId();
+                            } catch (InvalidProtocolBufferException e) {
+                                log.warn("Invalid data format from {}: {}", subscriptionRoot, e);
+                                return -1L;

Review comment:
       From the comment after this `try-catch` block, I noticed that a cursor ledger id read from valid data can also be -1, so I return -1L here to simplify the filter from `filter(ledgerId -> (ledgerId != null && ledgerId >= 0))` to `filter(ledgerId -> ledgerId >= 0)`. But I accidentally removed the `filter` call, probably during a refactor. That's my fault; I'll add the `filter` back and keep returning -1L in the `catch` block. Or should it return `null` instead?
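   A self-contained sketch of the sentinel-plus-filter approach, assuming a hypothetical `parseCursorLedgerId` that stands in for `ManagedCursorInfo.parseFrom(...).getCursorsLedgerId()` (plain strings replace the protobuf bytes): unparsable data and a stored `-1` both map to `-1L`, so a single `filter(ledgerId -> ledgerId >= 0)` drops both without any `null` checks.

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   public class CursorLedgerIdFilter {

       // Hypothetical stand-in for parsing ManagedCursorInfo from a subscription znode:
       // returns the stored cursor ledger id, or -1L when the data cannot be parsed.
       static long parseCursorLedgerId(String rawData) {
           try {
               return Long.parseLong(rawData);
           } catch (NumberFormatException e) {
               return -1L; // same sentinel as the catch block in the diff
           }
       }

       // Keeps only usable ledger ids: invalid data and a stored -1 are dropped by one check.
       static List<Long> cursorLedgerIds(List<String> subscriptionData) {
           return subscriptionData.stream()
                   .map(CursorLedgerIdFilter::parseCursorLedgerId)
                   .filter(ledgerId -> ledgerId >= 0)
                   .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           // "7": valid id, "-1": valid data without a cursor ledger, "garbage": unparsable.
           System.out.println(cursorLedgerIds(List.of("7", "-1", "garbage"))); // [7]
       }
   }
   ```

   Returning `null` from the `catch` block instead would bring the extra `ledgerId != null` check back into the filter.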







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #8244: [broker] Delete associated ledgers before deleting cluster metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #8244:
URL: https://github.com/apache/pulsar/pull/8244#discussion_r506914289



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
##########
@@ -72,6 +84,13 @@ public static void main(String[] args) throws InterruptedException {
 
         ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
 
+        if (arguments.bkMetadataServiceUri != null) {
+            BookKeeper bookKeeper = new BookKeeper(new ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));

Review comment:
       IMO, it's better to stay consistent with the rest of the project. Pulsar currently uses the old API, which can be verified by running
   
   ```bash
   find . -name "*.java" | xargs grep -n "import .*\.BookKeeper"
   ```
   in the project directory.



