Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/30 10:23:38 UTC

[GitHub] [pulsar] wuzhanpeng opened a new pull request #13575: [WIP][PIP-129] Introduce intermediate state for ledger deletion

wuzhanpeng opened a new pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575


   
   Master Issue: #13526 
   
   ### Motivation
   
   This pull request is the concrete implementation of PIP-129.
   
   ### Modifications
   
   - Mark deletable ledgers before actually deleting them
   - Add the ability to check and track deletable ledgers
   
   Most changes are concentrated on `org.apache.bookkeeper.mledger.ManagedLedger` and `org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl`.
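
As a rough illustration of the two modifications listed above, the following sketch records a marker for each ledger first and removes the marker only after the corresponding delete succeeds, so that failed deletions stay visible and can be retried. The class `TwoPhaseDeletionSketch`, the `d_` marker prefix, and the `LedgerDeleter` interface are hypothetical names chosen for this example; they are not the PR's actual code.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "mark first, delete later"; not the PR's real implementation.
final class TwoPhaseDeletionSketch {
    // Hypothetical marker prefix; the PR defines its own constants.
    static final String MARKER_PREFIX = "d_";

    // Hypothetical stand-in for the storage call that physically deletes a ledger.
    interface LedgerDeleter {
        boolean delete(long ledgerId);
    }

    // Stand-in for the persisted property map of a managed ledger.
    private final Map<String, String> properties = new ConcurrentHashMap<>();

    // Phase 1: record the intent to delete each ledger.
    void markDeletable(Collection<Long> ledgerIds) {
        for (long id : ledgerIds) {
            properties.put(MARKER_PREFIX + id, "");
        }
    }

    // Tracking: recover the pending ledger IDs from the markers.
    Set<Long> pendingDeletions() {
        Set<Long> pending = new TreeSet<>();
        for (String key : properties.keySet()) {
            if (key.startsWith(MARKER_PREFIX)) {
                pending.add(Long.parseLong(key.substring(MARKER_PREFIX.length())));
            }
        }
        return pending;
    }

    // Phase 2: delete, dropping a marker only when its delete succeeded.
    void deleteMarked(LedgerDeleter deleter) {
        for (long id : pendingDeletions()) {
            if (deleter.delete(id)) {
                properties.remove(MARKER_PREFIX + id);
            }
        }
    }

    public static void main(String[] args) {
        TwoPhaseDeletionSketch sketch = new TwoPhaseDeletionSketch();
        sketch.markDeletable(List.of(3L, 4L, 5L));
        // Simulate a failed delete for ledger 4: its marker is kept.
        sketch.deleteMarked(id -> id != 4L);
        System.out.println(sketch.pendingDeletions()); // prints [4]
    }
}
```

In this sketch a crash between the two phases leaves the markers in place, which is the property the intermediate state is meant to provide.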
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   - TBD
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
     
   - [x] `no-need-doc` 
     
   This change only improves and refactors internal logic.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] mattisonchao removed a comment on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
mattisonchao removed a comment on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1003011467


   @wuzhanpeng 
   
   Hi, wuzhanpeng
   
   Can you help me understand the meaning of WIP in Pulsar?





[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1018081240


   /pulsarbot run-failure-checks





[GitHub] [pulsar] zymap commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
zymap commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r807465780



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2523,7 +2541,25 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
+
             // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers = Stream
+                    .concat(

Review comment:
       Why do we need to concatenate `ledgersToDelete` and `offloadedLedgersToDelete`? It looks like their entries may be duplicated in the `deletableLedgers` set.
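
On the duplication concern: collecting a concatenated stream into a `Set` drops repeated values, so an ID present in both inputs appears only once in the result. A small standalone check, using made-up ledger IDs and a hypothetical `ConcatDedupExample` class:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone illustration; the ledger IDs below are invented for the example.
final class ConcatDedupExample {
    // Concatenates both lists and collects into a Set, which removes duplicates.
    static Set<Long> union(List<Long> first, List<Long> second) {
        return Stream.concat(first.stream(), second.stream())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Ledger 2 appears in both lists but only once in the result.
        Set<Long> ids = union(List.of(1L, 2L), List.of(2L, 3L));
        System.out.println(ids.size()); // prints 3
    }
}
```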

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2547,21 +2582,15 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
                 @Override
                 public void operationComplete(Void result, Stat stat) {
+                    // perform actual deletion
+                    removeAllDeletableLedgers();

Review comment:
       We should acquire the metadata lock in the deletion steps. Ledger deletion may take a long time, which may block other metadata operations.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2718,42 +2747,52 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
 
-    private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
-        asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
-    }
-
-    private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
-        if (!info.getOffloadContext().getBookkeeperDeleted()) {
-            // only delete if it hasn't been previously deleted for offload
-            asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
-        }
-
+    private void asyncDeleteOffloadedLedger(long ledgerId, LedgerInfo info, int retry, DeleteLedgerCallback callback) {
         if (info.getOffloadContext().hasUidMsb()) {
             UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
             cleanupOffloaded(ledgerId, uuid,
                     OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()),
                     OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
-                    "Trimming");
+                    "Trimming", retry, callback);
         }
     }
 
     private void asyncDeleteLedger(long ledgerId, long retry) {
+        asyncDeleteLedger(ledgerId, retry, new DeleteLedgerCallback() {
+            @Override
+            public void deleteLedgerComplete(Object ctx) {
+

Review comment:
       Don't we need to handle the callback?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -146,6 +151,13 @@
 
     protected static final int AsyncOperationTimeoutSeconds = 30;
 
+    protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "pulsar.ml.deletable.ledgers";

Review comment:
       Maybe keeping just `d_ledgers` is enough? The `pulsar.ml` prefix looks unnecessary.










[GitHub] [pulsar] hangc0276 commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1021919683


   @wuzhanpeng We'd better expose the ledger deletion state in metrics.





[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1035912922


   According to the review comments, I have made the following modifications: 
   - removed the related ledger deletion methods from the managed-ledger interface
   - added related unit tests for two-stage ledger deletion 
   - added monitoring metrics during ledger deletion
   
   @codelipenghui @hangc0276 PTAL at your convenience~





[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [WIP][PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1009625659


   /pulsarbot run-failure-checks





[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792386696



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2513,7 +2531,25 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
+
             // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers = Stream
+                    .concat(
+                            ledgersToDelete.stream()
+                                    .filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream())
+                    .map(LedgerInfo::getLedgerId)
+                    .collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())

Review comment:
       This is mainly implemented according to the original logic. See https://github.com/apache/pulsar/blob/c18d64526708c6dd051217feba3c31a73507620c/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2716-L2729







[GitHub] [pulsar] hangc0276 commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r807486500



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -146,6 +151,13 @@
 
     protected static final int AsyncOperationTimeoutSeconds = 30;
 
+    protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "pulsar.ml.deletable.ledgers";

Review comment:
       For the current implementation, we shouldn't add more load to the managedLedger znode storage. We'd better shorten the keyword prefix or redesign the data structure used for property storage.
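
To give a feel for the storage argument, the sketch below adds up the UTF-8 size of per-ledger marker keys for a long and a short prefix. It assumes one key per ledger built as prefix plus ledger ID; that layout, the class `MarkerKeySizeSketch`, and the ledger ID range are assumptions made for the example, and only the two prefix strings come from this thread.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical back-of-the-envelope calculation; the key layout is an assumption.
final class MarkerKeySizeSketch {
    // Sums the UTF-8 byte length of keys of the form prefix + ledgerId.
    static long totalKeyBytes(String prefix, long firstLedgerId, int count) {
        long total = 0;
        for (int i = 0; i < count; i++) {
            String key = prefix + (firstLedgerId + i);
            total += key.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    public static void main(String[] args) {
        int ledgers = 1000;
        long longPrefix = totalKeyBytes("pulsar.ml.deletable.ledgers", 100_000L, ledgers);
        long shortPrefix = totalKeyBytes("d_ledgers", 100_000L, ledgers);
        System.out.println("long prefix bytes:  " + longPrefix);
        System.out.println("short prefix bytes: " + shortPrefix);
    }
}
```

Values stored under each key add to these totals, so this only bounds the overhead contributed by the key names.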

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2547,21 +2582,15 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
                 @Override
                 public void operationComplete(Void result, Stat stat) {
+                    // perform actual deletion
+                    removeAllDeletableLedgers();

Review comment:
       `operationComplete` will be called by the `bookkeeper-ml-scheduler` thread pool. If it is executed synchronously, it will block other operations. We can use a separate thread pool to execute it.
   For the property map problem, could we use a thread-safe data structure to store it?
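
One way to read this suggestion: hand the slow deletion work to a dedicated executor so the callback thread returns immediately, and keep the shared state in a thread-safe collection. The sketch below uses only `java.util.concurrent`; the class `OffloadedDeletionSketch`, its executor, and its method names are hypothetical and are not Pulsar APIs.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: run deletions on a dedicated executor, not on the callback thread.
final class OffloadedDeletionSketch {
    // Dedicated pool so slow deletions do not occupy the metadata callback threads.
    private final ExecutorService deletionExecutor = Executors.newSingleThreadExecutor();
    // Thread-safe set, so marking and deleting may happen on different threads.
    private final Set<Long> deletableLedgers = ConcurrentHashMap.newKeySet();

    void mark(long ledgerId) {
        deletableLedgers.add(ledgerId);
    }

    // Intended to be called from the metadata callback: schedules the work and returns at once.
    CompletableFuture<Void> onMetadataUpdated() {
        return CompletableFuture.runAsync(this::deleteAllMarked, deletionExecutor);
    }

    private void deleteAllMarked() {
        for (Long id : deletableLedgers) {
            // Placeholder for the slow storage call; here the marker is simply dropped.
            deletableLedgers.remove(id);
        }
    }

    int pendingCount() {
        return deletableLedgers.size();
    }

    void shutdown() {
        deletionExecutor.shutdown();
    }

    public static void main(String[] args) {
        OffloadedDeletionSketch sketch = new OffloadedDeletionSketch();
        sketch.mark(7L);
        sketch.mark(8L);
        // join() is used only so the demo can observe the result.
        sketch.onMetadataUpdated().join();
        System.out.println(sketch.pendingCount()); // prints 0
        sketch.shutdown();
    }
}
```

Iterating a `ConcurrentHashMap` key set while removing from it is safe because its iterators are weakly consistent.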

##########
File path: managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -2103,6 +2103,39 @@ public void testRetentionSize() throws Exception {
         });
     }
 
+    @Test
+    public void testTwoPhraseDeletion() throws Exception {
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(0);
+        config.setMaxEntriesPerLedger(1);
+        config.setRetentionTime(1, TimeUnit.SECONDS);
+        config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("two_phrase_deletion", config);
+        ManagedCursor c1 = ml.openCursor("testCursor1");
+        ml.addEntry("m1".getBytes());
+        ml.addEntry("m2".getBytes());
+        c1.skipEntries(2, IndividualDeletedEntries.Exclude);
+        // let current ledger close
+        ml.rollCurrentLedgerIfFull();
+        // let retention expire
+        Thread.sleep(1500);

Review comment:
       Avoid using sleep; it will make the test flaky.
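
A common alternative to a fixed sleep is to poll for the expected condition with a deadline, so the test continues as soon as the condition holds and fails only after a generous timeout. A minimal sketch using only the standard library follows; the `awaitCondition` helper and the `AwaitSketch` class are hypothetical, and a test suite may prefer an existing library such as Awaitility for the same purpose.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition until it holds or a deadline passes.
final class AwaitSketch {
    // Returns true as soon as the condition holds, false if the timeout elapses first.
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicBoolean done = new AtomicBoolean(false);
        // Background work that finishes at some point the test cannot predict exactly.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.set(true);
        });
        worker.start();
        // Waits only as long as needed, up to 5 seconds.
        System.out.println(awaitCondition(done::get, 5_000, 10));
    }
}
```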







[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792360487



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -4039,6 +4122,190 @@ public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
         return statFuture;
     }
 
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     * @param deletableLedgerIds
+     */
+    @Override
+    public void markDeletableLedgers(Collection<Long> deletableLedgerIds,
+                                     Collection<Long> deletableOffloadedLedgerIds) {
+        for (Long ledgerId : deletableLedgerIds) {
+            final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+            propertiesMap.put(deletableLedgerMarker, DELETABLE_LEDGER_PLACEHOLDER);
+        }
+        for (Long ledgerId : deletableOffloadedLedgerIds) {
+            final String deletableOffloadedLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+            // Offload context info is required in ledger cleanup, therefore the serialized info object
+            // is kept in the propertiesMap until the ledger deletion is done
+            final String offloadedLedgerInfo = BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray());
+            propertiesMap.put(deletableOffloadedLedgerMarker, offloadedLedgerInfo);
+        }
+    }
+
+    private Set<Long> getAllDeletableLedgers(String prefix) {
+        Set<Long> deletableLedgers = propertiesMap.keySet().stream()
+                .filter(k -> k.startsWith(prefix))
+                .map(k -> {
+                    Long ledgerId = Long.parseLong(k.substring(prefix.length()));
+                    if (deletableLedgerRetryCounter.containsKey(ledgerId)
+                            && deletableLedgerRetryCounter.get(ledgerId).get() >= DEFAULT_LEDGER_DELETE_RETRIES) {
+                        log.error("[{}] Cannot delete ledger:{} after {} reties and now stop retrying on this broker",
+                                name, ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+                        return null;
+                    }
+                    return ledgerId;
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        if (!deletableLedgers.isEmpty()) {
+            return deletableLedgers;
+        }
+        return Sets.newHashSet();
+    }
+
+    @Override
+    public Set<Long> getAllDeletableLedgers() {
+        return getAllDeletableLedgers(DELETABLE_LEDGER_MARKER_PREFIX);
+    }
+
+    @Override
+    public Set<Long> getAllDeletableOffloadedLedgers() {
+        return getAllDeletableLedgers(DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX);
+    }
+
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     */
+    @Override
+    public void removeAllDeletableLedgers() {
+        Set<Long> deletableLedgers = getAllDeletableLedgers();
+        Set<Long> deletableOffloadedLedgers = getAllDeletableOffloadedLedgers();
+        final CountDownLatch counter = new CountDownLatch(deletableLedgers.size() + deletableOffloadedLedgers.size());
+
+        Set<Long> finishedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> finishedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> timeoutDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+
+        for (Long deletableLedger : deletableLedgers) {
+            asyncDeleteLedger(deletableLedger, DEFAULT_LEDGER_DELETE_RETRIES,
+                    new DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            succeedDeletedLedgers.add(deletableLedger);
+                        }
+
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            log.warn("[{}] Failed to delete bookkeeper ledger:{} due to",
+                                    name, deletableLedger, exception);
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            failDeletedLedgers.add(deletableLedger);
+                        }
+                    }, 0, null);
+        }
+
+        for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+            final String deletableOffloadedLedgerMarker =
+                    DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + deletableOffloadedLedger;
+
+            try {
+                final LedgerInfo deletableOffloadedLedgerInfo = LedgerInfo.parseFrom(
+                        BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker)));
+                asyncDeleteOffloadedLedger(deletableOffloadedLedger, deletableOffloadedLedgerInfo,
+                        DEFAULT_LEDGER_DELETE_RETRIES,
+                        new DeleteLedgerCallback() {
+                            @Override
+                            public void deleteLedgerComplete(Object ctx) {
+                                counter.countDown();
+                                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+
+                            @Override
+                            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                log.warn("[{}] Failed to delete offloaded ledger:{} due to",
+                                        name, deletableOffloadedLedger, exception);
+                                counter.countDown();
+                                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+                        });
+            } catch (Exception e) {
+                log.warn("[{}] Failed to retrieve offloaded ledger info of {} due to",
+                        name, deletableOffloadedLedger, e);
+                counter.countDown();
+                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+            }
+        }
+
+        try {
+            if (!counter.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
+                for (Long deletableLedger : deletableLedgers) {
+                    if (!finishedDeletedLedgers.contains(deletableLedger)) {
+                        log.warn("[{}] Failed to delete ledger:{} due to operation timeout", name, deletableLedger);
+                        timeoutDeletedLedgers.add(deletableLedger);
+                    }
+                }
+                for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+                    if (!finishedDeletedOffloadedLedgers.contains(deletableOffloadedLedger)) {
+                        log.warn("[{}] Failed to delete offloaded ledger:{} due to operation timeout",
+                                name, deletableOffloadedLedger);
+                        timeoutDeletedLedgers.add(deletableOffloadedLedger);
+                    }
+                }
+            }
+
+            // remove markers after deleting ledgers
+            for (Long ledgerId : succeedDeletedLedgers) {
+                final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+            for (Long ledgerId : succeedDeletedOffloadedLedgers) {
+                final String deletableLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+
+            // update retry count to track whether the max limit is reached
+            Set<Long> allFailedLedgers = new HashSet<>();
+            allFailedLedgers.addAll(failDeletedLedgers);
+            allFailedLedgers.addAll(failDeletedOffloadedLedgers);
+            allFailedLedgers.addAll(timeoutDeletedLedgers);
+
+            if (allFailedLedgers.isEmpty()) {
+                log.info("[{}] ledgers: {} and offloaded ledgers: {} are deleted successfully.",
+                        name, deletableLedgers, deletableOffloadedLedgers);

Review comment:
       Collection objects that extend `java.util.AbstractCollection` can return a string representation of themselves (`java.util.AbstractCollection#toString`).
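
For illustration, a `Set` passed as a format argument is rendered through `AbstractCollection#toString`, which produces a bracketed, comma-separated list. The snippet uses `String.format` in place of a logger, and the class name and ledger IDs are made up for the example:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Standalone illustration of how a collection renders inside a message.
final class CollectionToStringExample {
    // String.format calls toString() on the set, as a logger placeholder would.
    static String describe(Set<Long> ledgers) {
        return String.format("ledgers: %s are deleted successfully.", ledgers);
    }

    public static void main(String[] args) {
        // TreeSet keeps the IDs sorted, so the output order is deterministic.
        Set<Long> ledgers = new TreeSet<>(List.of(3L, 1L, 2L));
        System.out.println(describe(ledgers));
        // prints: ledgers: [1, 2, 3] are deleted successfully.
    }
}
```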







[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [WIP][PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1008829412


   /pulsarbot run-failure-checks





[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1020855861


   /pulsarbot run-failure-checks





[GitHub] [pulsar] hangc0276 commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792338181



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2513,7 +2531,25 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
+
             // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers = Stream
+                    .concat(
+                            ledgersToDelete.stream()
+                                    .filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream())
+                    .map(LedgerInfo::getLedgerId)
+                    .collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())

Review comment:
       You'd better check `ls.getOffloadContext().isComplete` instead of `ls.getOffloadContext().hasUidMsb()`

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -4039,6 +4122,190 @@ public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
         return statFuture;
     }
 
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     * @param deletableLedgerIds
+     */
+    @Override
+    public void markDeletableLedgers(Collection<Long> deletableLedgerIds,
+                                     Collection<Long> deletableOffloadedLedgerIds) {
+        for (Long ledgerId : deletableLedgerIds) {
+            final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+            propertiesMap.put(deletableLedgerMarker, DELETABLE_LEDGER_PLACEHOLDER);
+        }
+        for (Long ledgerId : deletableOffloadedLedgerIds) {
+            final String deletableOffloadedLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+            // Offload context info is required in ledger cleanup, therefore the serialized info object
+            // is kept in the propertiesMap until the ledger deletion is done
+            final String offloadedLedgerInfo = BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray());
+            propertiesMap.put(deletableOffloadedLedgerMarker, offloadedLedgerInfo);
+        }
+    }
+
+    private Set<Long> getAllDeletableLedgers(String prefix) {
+        Set<Long> deletableLedgers = propertiesMap.keySet().stream()
+                .filter(k -> k.startsWith(prefix))
+                .map(k -> {
+                    Long ledgerId = Long.parseLong(k.substring(prefix.length()));
+                    if (deletableLedgerRetryCounter.containsKey(ledgerId)
+                            && deletableLedgerRetryCounter.get(ledgerId).get() >= DEFAULT_LEDGER_DELETE_RETRIES) {
+                        log.error("[{}] Cannot delete ledger:{} after {} reties and now stop retrying on this broker",
+                                name, ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+                        return null;
+                    }
+                    return ledgerId;
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        if (!deletableLedgers.isEmpty()) {
+            return deletableLedgers;
+        }
+        return Sets.newHashSet();
+    }
+
+    @Override
+    public Set<Long> getAllDeletableLedgers() {
+        return getAllDeletableLedgers(DELETABLE_LEDGER_MARKER_PREFIX);
+    }
+
+    @Override
+    public Set<Long> getAllDeletableOffloadedLedgers() {
+        return getAllDeletableLedgers(DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX);
+    }
+
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     */
+    @Override
+    public void removeAllDeletableLedgers() {
+        Set<Long> deletableLedgers = getAllDeletableLedgers();
+        Set<Long> deletableOffloadedLedgers = getAllDeletableOffloadedLedgers();
+        final CountDownLatch counter = new CountDownLatch(deletableLedgers.size() + deletableOffloadedLedgers.size());
+
+        Set<Long> finishedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> finishedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> timeoutDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+
+        for (Long deletableLedger : deletableLedgers) {
+            asyncDeleteLedger(deletableLedger, DEFAULT_LEDGER_DELETE_RETRIES,
+                    new DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            succeedDeletedLedgers.add(deletableLedger);
+                        }
+
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            log.warn("[{}] Failed to delete bookkeeper ledger:{} due to",
+                                    name, deletableLedger, exception);
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            failDeletedLedgers.add(deletableLedger);
+                        }
+                    }, 0, null);
+        }
+
+        for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+            final String deletableOffloadedLedgerMarker =
+                    DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + deletableOffloadedLedger;
+
+            try {
+                final LedgerInfo deletableOffloadedLedgerInfo = LedgerInfo.parseFrom(
+                        BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker)));
+                asyncDeleteOffloadedLedger(deletableOffloadedLedger, deletableOffloadedLedgerInfo,
+                        DEFAULT_LEDGER_DELETE_RETRIES,
+                        new DeleteLedgerCallback() {
+                            @Override
+                            public void deleteLedgerComplete(Object ctx) {
+                                counter.countDown();
+                                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+
+                            @Override
+                            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                log.warn("[{}] Failed to delete offloaded ledger:{} due to",
+                                        name, deletableOffloadedLedger, exception);
+                                counter.countDown();
+                                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+                        });
+            } catch (Exception e) {
+                log.warn("[{}] Failed to retrieve offloaded ledger info of {} due to",
+                        name, deletableOffloadedLedger, e);
+                counter.countDown();
+                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+            }
+        }
+
+        try {
+            if (!counter.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
+                for (Long deletableLedger : deletableLedgers) {
+                    if (!finishedDeletedLedgers.contains(deletableLedger)) {
+                        log.warn("[{}] Failed to delete ledger:{} due to operation timeout", name, deletableLedger);
+                        timeoutDeletedLedgers.add(deletableLedger);
+                    }
+                }
+                for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+                    if (!finishedDeletedOffloadedLedgers.contains(deletableOffloadedLedger)) {
+                        log.warn("[{}] Failed to delete offloaded ledger:{} due to operation timeout",
+                                name, deletableOffloadedLedger);
+                        timeoutDeletedLedgers.add(deletableOffloadedLedger);
+                    }
+                }
+            }
+
+            // remove markers after deleting ledgers
+            for (Long ledgerId : succeedDeletedLedgers) {
+                final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+            for (Long ledgerId : succeedDeletedOffloadedLedgers) {
+                final String deletableLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+
+            // update retry count to track whether the max limit is reached
+            Set<Long> allFailedLedgers = new HashSet<>();
+            allFailedLedgers.addAll(failDeletedLedgers);
+            allFailedLedgers.addAll(failDeletedOffloadedLedgers);
+            allFailedLedgers.addAll(timeoutDeletedLedgers);
+
+            if (allFailedLedgers.isEmpty()) {
+                log.info("[{}] ledgers: {} and offloaded ledgers: {} are deleted successfully.",
+                        name, deletableLedgers, deletableOffloadedLedgers);

Review comment:
       `deletableLedgers` and `deletableOffloadedLedgers` are `Set`s and do not override `toString`, so this will print the object address instead of the items.
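
This concern can be checked in isolation. The sketch below assumes the sets are plain JDK collections (the diff builds them with `Collectors.toSet()` and `ConcurrentHashMap.newKeySet()`) and uses `String.valueOf` as a stand-in for how a `{}` placeholder is rendered, which is an assumption about the logger. The class and method names are made up for illustration. Under those assumptions the elements are printed rather than an object address, because these JDK sets provide an element-listing `toString()`.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

public class SetLoggingCheck {
    // Stand-in for placeholder formatting: renders the argument via its toString().
    static String render(Set<Long> ledgerIds) {
        return String.valueOf(ledgerIds);
    }

    public static void main(String[] args) {
        // Same factory the diff uses for its result sets.
        Set<Long> concurrentSet = ConcurrentHashMap.newKeySet();
        concurrentSet.add(42L);
        System.out.println(render(concurrentSet)); // prints [42]

        // A sorted set keeps the printed order deterministic.
        Set<Long> sorted = new TreeSet<>(Arrays.asList(3L, 1L, 2L));
        System.out.println(render(sorted)); // prints [1, 2, 3]
    }
}
```

If a custom `Set` implementation without `toString()` were ever passed in, the reviewer's point would apply and the IDs would need to be joined explicitly before logging.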

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -649,4 +651,31 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
      * @return the future of managed ledger internal stats
      */
     CompletableFuture<ManagedLedgerInternalStats> getManagedLedgerInternalStats(boolean includeLedgerMetadata);
+
+    /**
+     * Mark deletable ledgers for bookkeeper and offload storage.
+     *
+     * @param deletableLedgerIds
+     * @param deletableOffloadedLedgerIds
+     */
+    void markDeletableLedgers(Collection<Long> deletableLedgerIds, Collection<Long> deletableOffloadedLedgerIds);
+
+    /**
+     * Get all deletable ledgers.
+     *
+     * @return all the deletable ledgers of the managed-ledger
+     */
+    Set<Long> getAllDeletableLedgers();
+
+    /**
+     * Get all deletable offloaded ledgers.
+     *
+     * @return all the deletable offloaded ledgers of the managed-ledger
+     */
+    Set<Long> getAllDeletableOffloadedLedgers();
+
+    /**
+     * Check and remove all the deletable ledgers.
+     */
+    void removeAllDeletableLedgers();

Review comment:
       +1

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -146,6 +151,13 @@
 
     protected static final int AsyncOperationTimeoutSeconds = 30;
 
+    protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "pulsar.ml.deletable.ledgers";

Review comment:
       We append the prefix to each ledger ID and store the property map in a znode. However, if there are thousands of ledgers to delete, the property map will take up too much storage space, especially in the znode, which is limited to 5MB. The key point is that the prefix is duplicated for every ledger. We'd better redesign the data structure used to store these properties.
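
One possible shape for such a redesign, sketched under assumptions: keep a single property whose value is a comma-separated list of ledger IDs, so the key is stored once instead of once per ledger. The class `DeletableLedgerCodec` and the way the key is used here are hypothetical and not part of the PR. This only fits the plain BookKeeper markers; the offloaded markers carry a serialized `LedgerInfo` per ledger and would need a different encoding.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class DeletableLedgerCodec {
    // Hypothetical single property key holding every deletable ledger ID.
    static final String DELETABLE_LEDGERS_KEY = "pulsar.ml.deletable.ledgers";

    // Encode all ledger IDs as one comma-separated value, sorted for stable output.
    static String encode(Set<Long> ledgerIds) {
        return ledgerIds.stream()
                .sorted()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
    }

    // Decode the value back into a set; a missing or empty value yields an empty set.
    static Set<Long> decode(String value) {
        if (value == null || value.isEmpty()) {
            return new TreeSet<>();
        }
        return Arrays.stream(value.split(","))
                .map(Long::parseLong)
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

With this layout, marking ledgers 1, 2 and 3 stores the value `1,2,3` under one key rather than three prefixed keys.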

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2547,21 +2582,15 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
                 @Override
                 public void operationComplete(Void result, Stat stat) {
+                    // perform actual deletion
+                    removeAllDeletableLedgers();

Review comment:
       This blocks for up to 30s waiting for all the ledger delete operations to complete. However, this method is called on the meta store callback thread, so it blocks that thread and can lead to deadlock.
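
A non-blocking alternative could compose the per-ledger deletions with `CompletableFuture.allOf` instead of awaiting a `CountDownLatch`. This is a minimal sketch, not the PR's code: `deleteOne` is a hypothetical stand-in for an asynchronous per-ledger delete, and timeout handling and retry counting are left out.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class NonBlockingDelete {
    // Returns a future of the IDs that were deleted successfully; never blocks the caller.
    static CompletableFuture<Set<Long>> deleteAll(
            Set<Long> ledgerIds, Function<Long, CompletableFuture<Void>> deleteOne) {
        Set<Long> succeeded = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Void>> futures = ledgerIds.stream()
                .map(id -> deleteOne.apply(id)
                        .thenRun(() -> succeeded.add(id))
                        // Swallow the failure so allOf completes normally; failed IDs are simply absent.
                        .exceptionally(ex -> null))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> succeeded);
    }
}
```

The caller would then remove markers for the returned IDs in a follow-up stage rather than on the callback thread that started the deletions.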

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -4039,6 +4122,190 @@ public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
         return statFuture;
     }
 
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     * @param deletableLedgerIds
+     */
+    @Override
+    public void markDeletableLedgers(Collection<Long> deletableLedgerIds,
+                                     Collection<Long> deletableOffloadedLedgerIds) {
+        for (Long ledgerId : deletableLedgerIds) {
+            final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+            propertiesMap.put(deletableLedgerMarker, DELETABLE_LEDGER_PLACEHOLDER);
+        }
+        for (Long ledgerId : deletableOffloadedLedgerIds) {
+            final String deletableOffloadedLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+            // Offload context info is required in ledger cleanup, therefore the serialized info object
+            // is kept in the propertiesMap until the ledger deletion is done
+            final String offloadedLedgerInfo = BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray());
+            propertiesMap.put(deletableOffloadedLedgerMarker, offloadedLedgerInfo);
+        }
+    }
+
+    private Set<Long> getAllDeletableLedgers(String prefix) {
+        Set<Long> deletableLedgers = propertiesMap.keySet().stream()
+                .filter(k -> k.startsWith(prefix))
+                .map(k -> {
+                    Long ledgerId = Long.parseLong(k.substring(prefix.length()));
+                    if (deletableLedgerRetryCounter.containsKey(ledgerId)
+                            && deletableLedgerRetryCounter.get(ledgerId).get() >= DEFAULT_LEDGER_DELETE_RETRIES) {
+                        log.error("[{}] Cannot delete ledger:{} after {} retries and now stop retrying on this broker",
+                                name, ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+                        return null;
+                    }
+                    return ledgerId;
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        if (!deletableLedgers.isEmpty()) {
+            return deletableLedgers;
+        }
+        return Sets.newHashSet();
+    }
+
+    @Override
+    public Set<Long> getAllDeletableLedgers() {
+        return getAllDeletableLedgers(DELETABLE_LEDGER_MARKER_PREFIX);
+    }
+
+    @Override
+    public Set<Long> getAllDeletableOffloadedLedgers() {
+        return getAllDeletableLedgers(DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX);
+    }
+
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     */
+    @Override
+    public void removeAllDeletableLedgers() {
+        Set<Long> deletableLedgers = getAllDeletableLedgers();
+        Set<Long> deletableOffloadedLedgers = getAllDeletableOffloadedLedgers();
+        final CountDownLatch counter = new CountDownLatch(deletableLedgers.size() + deletableOffloadedLedgers.size());
+
+        Set<Long> finishedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> finishedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> timeoutDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+
+        for (Long deletableLedger : deletableLedgers) {
+            asyncDeleteLedger(deletableLedger, DEFAULT_LEDGER_DELETE_RETRIES,
+                    new DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            succeedDeletedLedgers.add(deletableLedger);
+                        }
+
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            log.warn("[{}] Failed to delete bookkeeper ledger:{} due to",
+                                    name, deletableLedger, exception);
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            failDeletedLedgers.add(deletableLedger);
+                        }
+                    }, 0, null);
+        }
+
+        for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+            final String deletableOffloadedLedgerMarker =
+                    DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + deletableOffloadedLedger;
+
+            try {
+                final LedgerInfo deletableOffloadedLedgerInfo = LedgerInfo.parseFrom(
+                        BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker)));
+                asyncDeleteOffloadedLedger(deletableOffloadedLedger, deletableOffloadedLedgerInfo,
+                        DEFAULT_LEDGER_DELETE_RETRIES,
+                        new DeleteLedgerCallback() {
+                            @Override
+                            public void deleteLedgerComplete(Object ctx) {
+                                counter.countDown();
+                                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+
+                            @Override
+                            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                log.warn("[{}] Failed to delete offloaded ledger:{} due to",
+                                        name, deletableOffloadedLedger, exception);
+                                counter.countDown();
+                                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+                        });
+            } catch (Exception e) {
+                log.warn("[{}] Failed to retrieve offloaded ledger info of {} due to",
+                        name, deletableOffloadedLedger, e);
+                counter.countDown();
+                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+            }
+        }
+
+        try {
+            if (!counter.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
+                for (Long deletableLedger : deletableLedgers) {
+                    if (!finishedDeletedLedgers.contains(deletableLedger)) {
+                        log.warn("[{}] Failed to delete ledger:{} due to operation timeout", name, deletableLedger);
+                        timeoutDeletedLedgers.add(deletableLedger);
+                    }
+                }
+                for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+                    if (!finishedDeletedOffloadedLedgers.contains(deletableOffloadedLedger)) {
+                        log.warn("[{}] Failed to delete offloaded ledger:{} due to operation timeout",
+                                name, deletableOffloadedLedger);
+                        timeoutDeletedLedgers.add(deletableOffloadedLedger);
+                    }
+                }
+            }
+
+            // remove markers after deleting ledgers
+            for (Long ledgerId : succeedDeletedLedgers) {
+                final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+            for (Long ledgerId : succeedDeletedOffloadedLedgers) {
+                final String deletableLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+
+            // update retry count to track whether the max limit is reached
+            Set<Long> allFailedLedgers = new HashSet<>();
+            allFailedLedgers.addAll(failDeletedLedgers);
+            allFailedLedgers.addAll(failDeletedOffloadedLedgers);
+            allFailedLedgers.addAll(timeoutDeletedLedgers);
+
+            if (allFailedLedgers.isEmpty()) {
+                log.info("[{}] ledgers: {} and offloaded ledgers: {} are deleted successfully.",
+                        name, deletableLedgers, deletableOffloadedLedgers);
+            } else {
+                for (Long failDeletedLedger : allFailedLedgers) {
+                    deletableLedgerRetryCounter
+                            .computeIfAbsent(failDeletedLedger, k -> new AtomicInteger()).incrementAndGet();
+                }
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Successfully delete bookkeeper ledgers: {} and offloaded ledgers: {}. "
+                                + "Failed to delete bookkeeper ledgers: {} and offloaded ledgers: {}", name,
+                        succeedDeletedLedgers, succeedDeletedOffloadedLedgers,

Review comment:
       Same as above.







[GitHub] [pulsar] github-actions[bot] commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1078580446


   This PR has had no activity for 30 days; marking it with the Stale label.





[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1047403893


   I have made some updates based on the comments, PTAL~ @hangc0276 @zymap 





[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r811038733



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2718,42 +2747,52 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
 
-    private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
-        asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
-    }
-
-    private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
-        if (!info.getOffloadContext().getBookkeeperDeleted()) {
-            // only delete if it hasn't been previously deleted for offload
-            asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
-        }
-
+    private void asyncDeleteOffloadedLedger(long ledgerId, LedgerInfo info, int retry, DeleteLedgerCallback callback) {
         if (info.getOffloadContext().hasUidMsb()) {
             UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
             cleanupOffloaded(ledgerId, uuid,
                     OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()),
                     OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
-                    "Trimming");
+                    "Trimming", retry, callback);
         }
     }
 
     private void asyncDeleteLedger(long ledgerId, long retry) {
+        asyncDeleteLedger(ledgerId, retry, new DeleteLedgerCallback() {
+            @Override
+            public void deleteLedgerComplete(Object ctx) {
+

Review comment:
       This is the default implementation, kept for compatibility with the original logic.
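
The pattern described here, an old signature that delegates to the new callback-taking overload with a do-nothing callback, can be sketched as follows. `DeleteCallback` is a simplified, hypothetical stand-in for the PR's `DeleteLedgerCallback`, and the deletion itself is a placeholder that always reports success.

```java
public class DefaultCallbackSketch {
    // Simplified, hypothetical stand-in for the PR's DeleteLedgerCallback.
    interface DeleteCallback {
        void deleteComplete(long ledgerId);

        void deleteFailed(long ledgerId, Exception cause);
    }

    // Shared no-op callback used when the caller does not care about the outcome.
    static final DeleteCallback NO_OP = new DeleteCallback() {
        @Override
        public void deleteComplete(long ledgerId) {
        }

        @Override
        public void deleteFailed(long ledgerId, Exception cause) {
        }
    };

    // Original signature: behaviour is unchanged because the callback ignores the result.
    static void asyncDelete(long ledgerId) {
        asyncDelete(ledgerId, NO_OP);
    }

    // New signature: callers that track results pass their own callback.
    static void asyncDelete(long ledgerId, DeleteCallback callback) {
        // Placeholder for the real deletion; reports success immediately.
        callback.deleteComplete(ledgerId);
    }
}
```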







[GitHub] [pulsar] codelipenghui commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792315746



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -649,4 +651,31 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
      * @return the future of managed ledger internal stats
      */
     CompletableFuture<ManagedLedgerInternalStats> getManagedLedgerInternalStats(boolean includeLedgerMetadata);
+
+    /**
+     * Mark deletable ledgers for bookkeeper and offload storage.
+     *
+     * @param deletableLedgerIds
+     * @param deletableOffloadedLedgerIds
+     */
+    void markDeletableLedgers(Collection<Long> deletableLedgerIds, Collection<Long> deletableOffloadedLedgerIds);
+
+    /**
+     * Get all deletable ledgers.
+     *
+     * @return all the deletable ledgers of the managed-ledger
+     */
+    Set<Long> getAllDeletableLedgers();
+
+    /**
+     * Get all deletable offloaded ledgers.
+     *
+     * @return all the deletable offloaded ledgers of the managed-ledger
+     */
+    Set<Long> getAllDeletableOffloadedLedgers();
+
+    /**
+     * Check and remove all the deletable ledgers.
+     */
+    void removeAllDeletableLedgers();

Review comment:
       Do we need to expose these methods on the `ManagedLedger` interface? It looks like they are only used internally by `ManagedLedgerImpl`.







[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1018290268


   /pulsarbot run-failure-checks





[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r811046224



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2523,7 +2541,25 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
+
             // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers = Stream
+                    .concat(

Review comment:
       There may be ledgers in both `ledgersToDelete` and `offloadedLedgersToDelete` that need to be deleted. The two collections can contain the same ledger more than once, which is why `toSet` is used.
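
A small illustration of the deduplication, simplified to plain ledger IDs (the class and method names are hypothetical, and the real collections in the PR may hold richer objects than `Long`):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergeLedgerIds {
    // Concatenate both lists and collapse duplicates by collecting into a Set.
    static Set<Long> merge(List<Long> ledgersToDelete, List<Long> offloadedLedgersToDelete) {
        return Stream.concat(ledgersToDelete.stream(), offloadedLedgersToDelete.stream())
                .collect(Collectors.toSet());
    }
}
```

Merging `[1, 2]` with `[2, 3]` yields a set of three IDs, so ledger 2 is marked only once.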







[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [WIP][PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1009523659


   /pulsarbot run-failure-checks





[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1020784901


   /pulsarbot run-failure-checks





[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792396590



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2547,21 +2582,15 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
                 @Override
                 public void operationComplete(Void result, Stat stat) {
+                    // perform actual deletion
+                    removeAllDeletableLedgers();

Review comment:
       The reason for performing `removeAllDeletableLedgers` synchronously here is that the two locks, `trimmer` and `metadata`, need to be released only after the actual deletion. If the execution were asynchronous, the update of the property map could not be protected.
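
For comparison, one way to keep a guard held across asynchronous work is to release it in the completion stage instead of blocking until the work finishes. The sketch below is an assumption-laden illustration and not how `ManagedLedgerImpl` handles its mutexes: it uses a one-permit `Semaphore` as a mutex that any thread may release, and `asyncWork` is a hypothetical stand-in for the deletion plus property-map update.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class ReleaseOnCompletion {
    // Runs asyncWork while holding the mutex and releases it when the work completes.
    static CompletableFuture<Void> runGuarded(Semaphore mutex, Supplier<CompletableFuture<Void>> asyncWork) {
        if (!mutex.tryAcquire()) {
            CompletableFuture<Void> busy = new CompletableFuture<>();
            busy.completeExceptionally(new IllegalStateException("mutex is held"));
            return busy;
        }
        CompletableFuture<Void> work;
        try {
            work = asyncWork.get();
        } catch (RuntimeException e) {
            // Starting the work failed synchronously, so release right away.
            mutex.release();
            throw e;
        }
        // Release on success or failure, on whichever thread completes the work.
        return work.whenComplete((result, error) -> mutex.release());
    }
}
```

Whether this is applicable depends on the lock type in use; a thread-owned lock such as `ReentrantLock` cannot be released from a different thread.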







[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792373311



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -146,6 +151,13 @@
 
     protected static final int AsyncOperationTimeoutSeconds = 30;
 
+    protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "pulsar.ml.deletable.ledgers";

Review comment:
       Thanks for your question. IMO, the problem of an oversized znode should really be a discussion about whether `ManagedLedgerInfo` should be stored in ZooKeeper at all. Put another way, the ledger information stored in the property map actually comes from the `LedgerInfoList` structure, and that data is also stored in a znode. Therefore, if the number of ledgers is large, the corresponding ml-znode is bound to be huge anyway. To avoid complicating matters, I think we can skip this part in this proposal and start a new discussion thread on the problem.







[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1009755025


   @codelipenghui @hangc0276 Could you help review this PR?





[GitHub] [pulsar] wuzhanpeng commented on pull request #13575: [PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1047456480


   /pulsarbot run-failure-checks





[GitHub] [pulsar] mattisonchao commented on pull request #13575: [WIP][PIP-129] Introduce intermediate state for ledger deletion

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#issuecomment-1003011467


   @wuzhanpeng 
   
   Hi, wuzhanpeng
   
   Can you help me understand the meaning of WIP in Pulsar?

