Posted to dev@bookkeeper.apache.org by "Sijie Guo (JIRA)" <ji...@apache.org> on 2012/10/22 09:22:12 UTC

[jira] [Updated] (BOOKKEEPER-439) No more messages delivered after deleted consumed ledgers.

     [ https://issues.apache.org/jira/browse/BOOKKEEPER-439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sijie Guo updated BOOKKEEPER-439:
---------------------------------

    Attachment: BOOKKEEPER-439.diff

Attaching a patch to address the issue. The main idea is to open the first ledger and read its number of entries, so that the correct start seq id can be computed.
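A minimal sketch of that computation, assuming one message per entry and contiguous seq ids within a ledger. The class and method names are hypothetical, not Hedwig's. The start seq id of the first surviving ledger is its end seq id minus its entry count plus one. The approach cannot help for a ledger with no entries, because there is nothing to count from.

```java
final class FirstLedgerStartSketch {
    // Start seq id of a ledger, given its last seq id and its entry count.
    // Assumes one message per entry and contiguous seq ids within the ledger.
    static long startSeqIdOf(long endSeqIdIncluded, long numEntries) {
        if (numEntries <= 0) {
            // An empty ledger has nothing to count from, so this approach fails here.
            throw new IllegalArgumentException("ledger has no entries");
        }
        return endSeqIdIncluded - numEntries + 1;
    }

    public static void main(String[] args) {
        // L1 (seq ids 1-100) was deleted; L2 holds 100 entries ending at seq id 200.
        System.out.println(startSeqIdOf(200, 100)); // prints 101
    }
}
```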

However, this alone does not resolve one case, which is covered by a test case I added in the patch (testScanMessagesOnEmptyLedgerAfterDeleteLedger):

1) Publish 2 messages to a topic.
2) Release/acquire the topic to force a new ledger.
3) Consume the published messages, so the previous ledger is deleted, while the current ledger holds no messages.
4) Release/acquire the topic again. The ledger created in 2) is pruned because it contains no entries, and a new ledger is created to serve new messages.
5) Publish 2 more messages.

The messages published in 5) are expected to get ids 3 and 4. But since we have no knowledge of the start seq id, they would be assigned seq ids 1 and 2.

To fix this case, I extended the LedgerRanges protocol with a new field, 'startSeqId'.
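A simplified model of this scenario, contrasting numbering with and without a persisted start seq id. It is a sketch under assumptions, not the patch: Range is a hypothetical stand-in for a persisted ledger range (the real LedgerRanges message is protobuf, and its exact layout is not shown here), startSeqId plays the role of the new field, and the open ledger is assumed to still be empty, as after step 4.

```java
import java.util.Collections;
import java.util.List;

final class NextSeqIdSketch {
    // Hypothetical, simplified view of one persisted ledger range.
    // endSeqIdIncluded is null while the ledger is still open; startSeqId
    // models the proposed field (the real protobuf layout may differ).
    static final class Range {
        final long startSeqId;
        final Long endSeqIdIncluded;

        Range(long startSeqId, Long endSeqIdIncluded) {
            this.startSeqId = startSeqId;
            this.endSeqIdIncluded = endSeqIdIncluded;
        }
    }

    // Old behaviour: only closed ledgers carry an end seq id, so once every
    // closed ledger is deleted or pruned, numbering restarts at 1.
    static long nextSeqIdWithoutStart(List<Range> ranges) {
        long lastEnd = 0;
        for (Range r : ranges) {
            if (r.endSeqIdIncluded != null) {
                lastEnd = r.endSeqIdIncluded;
            }
        }
        return lastEnd + 1;
    }

    // With a persisted start seq id, an open ledger that is still empty
    // tells us where numbering should continue.
    static long nextSeqIdWithStart(List<Range> ranges) {
        if (ranges.isEmpty()) {
            return 1;
        }
        Range last = ranges.get(ranges.size() - 1);
        return last.endSeqIdIncluded != null ? last.endSeqIdIncluded + 1 : last.startSeqId;
    }

    public static void main(String[] args) {
        // After step 4 only the new, still-empty ledger remains. Assume it was
        // recorded with startSeqId 3 because seq ids 1 and 2 were already used.
        List<Range> afterStep4 = Collections.singletonList(new Range(3, null));
        System.out.println(nextSeqIdWithoutStart(afterStep4)); // prints 1
        System.out.println(nextSeqIdWithStart(afterStep4));    // prints 3
    }
}
```

Without the field, the two messages from step 5 would be numbered 1 and 2; with it, numbering resumes at 3 and 4.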

Besides these two fixes, I added 3 test cases to verify them.
                
> No more messages delivered after deleted consumed ledgers.
> ----------------------------------------------------------
>
>                 Key: BOOKKEEPER-439
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-439
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>            Priority: Critical
>             Fix For: 4.2.0
>
>         Attachments: BOOKKEEPER-439.diff
>
>
> We encountered the following exception:
> {quote}
> 2012-10-18 09:27:27,248 - DEBUG [CacheThread:BookkeeperPersistenceManager$RangeScanOp@247] - Issuing a bk read for ledger: L2 from entry-id: 100 to entry-id: 103
> 2012-10-18 09:27:27,248 - ERROR [CacheThread:BookkeeperPersistenceManager$RangeScanOp$2@261] - Error while reading from ledger: L2 for topic: TOPIC
> org.apache.bookkeeper.client.BKException$BKReadException
>         at org.apache.bookkeeper.client.BKException.create(BKException.java:48)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp$2.safeReadComplete(BookkeeperPersistenceManager.java:260)
>         at org.apache.hedwig.zookeeper.SafeAsynBKCallback$ReadCallback.readComplete(SafeAsynBKCallback.java:61)
>         at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:380)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.read(BookkeeperPersistenceManager.java:252)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.startReadingFrom(BookkeeperPersistenceManager.java:327)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.runInternal(BookkeeperPersistenceManager.java:217)
>         at org.apache.hedwig.server.common.TopicOpQueuer$SynchronousOp.run(TopicOpQueuer.java:77)
>         at org.apache.hedwig.server.common.TopicOpQueuer.pushAndMaybeRun(TopicOpQueuer.java:105)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.scanMessages(BookkeeperPersistenceManager.java:336)
>         at org.apache.hedwig.server.persistence.ReadAheadCache$ScanRequestWrapper.performRequest(ReadAheadCache.java:704)
>         at org.apache.hedwig.server.persistence.ReadAheadCache.run(ReadAheadCache.java:291)
>         at java.lang.Thread.run(Thread.java:662)
> {quote}
> Topic TOPIC has 2 ledgers, L1 and L2, each with 100 entries.
> 1) All 100 entries in L1 have been delivered and consumed.
> 2) 100 entries have been written to L2 but not yet delivered.
> 3) L1 is deleted since all its entries have been consumed.
> 4) The hub server shuts down.
> 5) TOPIC recovers L2 and starts delivering from 101.
> TOPIC was expected to issue a read of entries [0-3] from L2, but the exception log shows that a read of [100-103] was issued instead, and L2 has no entries in [100-103].
> The root cause is that we always started from 0 and 1 (prevLedgerEnd = 0 and startOfLedger = 1 in the code below) when computing seq id ranges, even after some consumed ledgers had been deleted.
> {code}
>         void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
>             Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
>             TopicInfo topicInfo = new TopicInfo();
>             long startOfLedger = 1;
>             while (lrIterator.hasNext()) {
>                 LedgerRange range = lrIterator.next();
>                 if (range.hasEndSeqIdIncluded()) {
>                     // this means it was a valid and completely closed ledger
>                     long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
>                     topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
>                     startOfLedger = endOfLedger + 1;
>                     continue;
>                 }
>                 // If it doesn't have a valid end, it must be the last ledger
>                 if (lrIterator.hasNext()) {
>                     String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
>                                  + " is not the last one but still does not have an end seq-id";
>                     logger.error(msg);
>                     cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
>                     return;
>                 }
>                 // The last ledger does not have a valid seq-id, lets try to
>                 // find it out
>                 recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
>                 return;
>             }
> {code}
> {code}
>                             long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges.lastKey();
>                             LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
>                                              .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
>                             topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
>                                     new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
> {code}
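A simplified sketch of how the default start value produces the bad read. It uses a plain TreeMap from end seq id to start seq id in place of topicInfo.ledgerRanges and InMemoryLedgerRange, and it assumes (as the log suggests) that a seq id maps to entry id seqId minus the start seq id of its ledger. All names are hypothetical. L2 is modeled as a closed range ending at 200; in the report it went through the recovery path instead, where prevLedgerEnd + 1 yields the same start of 1.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeMapSketch {
    // Rebuilds end seq id -> start seq id, following the loop in the excerpt:
    // only the first start is an input, later ones chain from the previous end.
    static TreeMap<Long, Long> buildRanges(List<Long> endSeqIds, long firstStartSeqId) {
        TreeMap<Long, Long> ranges = new TreeMap<>();
        long startOfLedger = firstStartSeqId;
        for (long endOfLedger : endSeqIds) {
            ranges.put(endOfLedger, startOfLedger);
            startOfLedger = endOfLedger + 1;
        }
        return ranges;
    }

    // Entry-id range for reading [firstSeqId, lastSeqId], assuming both fall
    // in one ledger and entryId = seqId - startSeqIdOfThatLedger.
    static String entryRange(TreeMap<Long, Long> ranges, long firstSeqId, long lastSeqId) {
        // First ledger whose end seq id is >= firstSeqId.
        Map.Entry<Long, Long> ledger = ranges.ceilingEntry(firstSeqId);
        long start = ledger.getValue();
        return "[" + (firstSeqId - start) + "-" + (lastSeqId - start) + "]";
    }

    public static void main(String[] args) {
        // L1 (seq ids 1-100) was deleted, so only L2 (ending at 200) remains.
        List<Long> ends = Arrays.asList(200L);
        TreeMap<Long, Long> oldRanges = buildRanges(ends, 1);     // startOfLedger = 1 default
        TreeMap<Long, Long> fixedRanges = buildRanges(ends, 101); // correct start of L2
        System.out.println(oldRanges + " " + entryRange(oldRanges, 101, 104));     // prints {200=1} [100-103]
        System.out.println(fixedRanges + " " + entryRange(fixedRanges, 101, 104)); // prints {200=101} [0-3]
    }
}
```

Only the first remaining ledger depends on the initial value; every later ledger's start is chained from its predecessor's end, so fixing that one start is enough.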

--
This message is automatically generated by JIRA.