You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/08/01 12:08:41 UTC

[GitHub] [pulsar] sijie commented on a change in pull request #4557: [transaction][acknowledge] Persist pending ack messages to cursor meta store.

sijie commented on a change in pull request #4557: [transaction][acknowledge] Persist pending ack messages to cursor meta store.
URL: https://github.com/apache/pulsar/pull/4557#discussion_r309661731
 
 

 ##########
 File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 ##########
 @@ -426,6 +454,85 @@ public void operationFailed(ManagedLedgerException exception) {
         });
     }
 
+    public void recoverPendingAckPosition(CompletableFuture<SubscriptionPendingAckMessages> future) {
+        // Read the meta-data ledgerId from the store
+        log.info("[{}] Recovering pending ack messages from bookkeeper ledger cursor: {}", ledger.getName(), name);
+        ledger.getStore().asyncGetSubscriptionPendingAckMessages(ledger.getName(), name, new MetaStoreCallback<SubscriptionPendingAckMessages>() {
+            @Override
+            public void operationComplete(SubscriptionPendingAckMessages subscriptionPendingAckMessages, Stat stat) {
+
+                pendingAckMessagesStoreStat = stat;
+
+                if (subscriptionPendingAckMessages.getPendingAckMsgLedgerId() == -1L) {
+                    // Pending ack messages info is stored in ledger meta store.
+                    future.complete(subscriptionPendingAckMessages);
+                    startCreateNewPendingAckMsgLedger();
+                    return;
+                } else {
+                    // Pending ack message info is stored in ledger.
+                    ledger.mbean.startCursorLedgerOpenOp();
+                    long ledgerId = subscriptionPendingAckMessages.getPendingAckMsgLedgerId();
+                    bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), (rc, lh, ctx) -> {
+                        if (isBkErrorNotRecoverable(rc)) {
+                            log.error("[{}] Cursor {} fail to open pending ack message ledger {} for recover",
+                                    ledger.getName(), name, ledgerId);
+                            startCreateNewPendingAckMsgLedger();
 
 Review comment:
   If we fail to open the ledger, why do we call `startCreateNewPendingAckMsgLedger` here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services