Posted to dev@bookkeeper.apache.org by "Sijie Guo (JIRA)" <ji...@apache.org> on 2012/10/22 09:22:12 UTC
[jira] [Updated] (BOOKKEEPER-439) No more messages delivered after
deleted consumed ledgers.
[ https://issues.apache.org/jira/browse/BOOKKEEPER-439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sijie Guo updated BOOKKEEPER-439:
---------------------------------
Attachment: BOOKKEEPER-439.diff
Attached a patch to address the issue. The main idea is to open the first ledger and read its number of entries, so that the correct start seq id can be computed.
However, that alone does not resolve the following case (covered by testScanMessagesOnEmptyLedgerAfterDeleteLedger, a test case I added in the patch):
1) publish 2 messages to a topic.
2) release/acquire the topic to force a new ledger.
3) consume the published messages, so the previous ledger is deleted, but the current ledger contains no messages.
4) release/acquire the topic again; the ledger created in 2) is pruned because it is empty, and a new ledger is created to serve new messages.
5) publish 2 more messages.
The messages published in 5) are expected to get seq ids 3 and 4, but since we have no knowledge of the start seq id, they would be assigned seq ids 1 and 2.
To fix this case, I extended the LedgerRanges protocol with a new field, 'startSeqId'.
Besides these two fixes, I added 3 test cases to verify them.
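The two fixes described above can be sketched roughly as follows. This is only an illustration and not the code in the attached patch: the class and method names are hypothetical, and the precedence between the persisted 'startSeqId' and the value computed from the first ledger is an assumption.

```java
import java.util.OptionalLong;

public class StartSeqIdSketch {

    /**
     * Seq id stored in entry 0 of a non-empty ledger, assuming the ledger holds
     * numEntries consecutive seq ids ending at endSeqIdIncluded.
     */
    public static long startFromLedger(long endSeqIdIncluded, long numEntries) {
        return endSeqIdIncluded - numEntries + 1;
    }

    /**
     * Hypothetical resolution order (an assumption, not taken from the patch):
     * 1. an explicitly persisted startSeqId, if present;
     * 2. otherwise the value derived from the first remaining non-empty ledger;
     * 3. otherwise 1, i.e. a topic that never had messages.
     */
    public static long resolveStartSeqId(OptionalLong persistedStartSeqId,
                                         OptionalLong firstLedgerEndSeqId,
                                         long firstLedgerNumEntries) {
        if (persistedStartSeqId.isPresent()) {
            return persistedStartSeqId.getAsLong();
        }
        if (firstLedgerEndSeqId.isPresent() && firstLedgerNumEntries > 0) {
            return startFromLedger(firstLedgerEndSeqId.getAsLong(), firstLedgerNumEntries);
        }
        return 1L;
    }

    public static void main(String[] args) {
        // First remaining ledger ends at seq id 200 and holds 100 entries.
        System.out.println(resolveStartSeqId(OptionalLong.empty(), OptionalLong.of(200), 100)); // 101

        // Empty-ledger case: 2 messages were consumed and their ledger deleted,
        // so only a persisted startSeqId of 3 can say where to continue.
        System.out.println(resolveStartSeqId(OptionalLong.of(3), OptionalLong.empty(), 0)); // 3

        // Without the persisted field the same case falls back to 1,
        // which is the wrong numbering described in the steps above.
        System.out.println(resolveStartSeqId(OptionalLong.empty(), OptionalLong.empty(), 0)); // 1
    }
}
```

The second call shows why counting entries in the first ledger is not enough on its own: an empty ledger carries no message from which the start seq id could be derived.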
> No more messages delivered after deleted consumed ledgers.
> ----------------------------------------------------------
>
> Key: BOOKKEEPER-439
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-439
> Project: Bookkeeper
> Issue Type: Bug
> Components: hedwig-server
> Affects Versions: 4.1.0, 4.2.0
> Reporter: Sijie Guo
> Assignee: Sijie Guo
> Priority: Critical
> Fix For: 4.2.0
>
> Attachments: BOOKKEEPER-439.diff
>
>
> We encountered the exception below:
> {quote}
> 2012-10-18 09:27:27,248 - DEBUG [CacheThread:BookkeeperPersistenceManager$RangeScanOp@247] - Issuing a bk read for ledger: L2 from entry-id: 100 to entry-id: 103
> 2012-10-18 09:27:27,248 - ERROR [CacheThread:BookkeeperPersistenceManager$RangeScanOp$2@261] - Error while reading from ledger: L2 for topic: TOPIC
> org.apache.bookkeeper.client.BKException$BKReadException
> at org.apache.bookkeeper.client.BKException.create(BKException.java:48)
> at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp$2.safeReadComplete(BookkeeperPersistenceManager.java:260)
> at org.apache.hedwig.zookeeper.SafeAsynBKCallback$ReadCallback.readComplete(SafeAsynBKCallback.java:61)
> at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:380)
> at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.read(BookkeeperPersistenceManager.java:252)
> at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.startReadingFrom(BookkeeperPersistenceManager.java:327)
> at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.runInternal(BookkeeperPersistenceManager.java:217)
> at org.apache.hedwig.server.common.TopicOpQueuer$SynchronousOp.run(TopicOpQueuer.java:77)
> at org.apache.hedwig.server.common.TopicOpQueuer.pushAndMaybeRun(TopicOpQueuer.java:105)
> at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.scanMessages(BookkeeperPersistenceManager.java:336)
> at org.apache.hedwig.server.persistence.ReadAheadCache$ScanRequestWrapper.performRequest(ReadAheadCache.java:704)
> at org.apache.hedwig.server.persistence.ReadAheadCache.run(ReadAheadCache.java:291)
> at java.lang.Thread.run(Thread.java:662)
> {quote}
> Topic TOPIC has 2 ledgers, L1 and L2; each ledger has 100 entries.
> 1) all 100 entries in L1 have been delivered and consumed.
> 2) 100 entries have been written to L2 but not delivered.
> 3) L1 is deleted since all its entries have been consumed.
> 4) the hub server shuts down.
> 5) TOPIC recovers L2 and starts delivering from 101.
> TOPIC was expected to issue a read [0-3] from L2, but, as the exception log shows, a read [100-103] was issued. L2 has no entries at [100-103], so nothing can be read there.
> The cause of this issue is that we used 0 and 1 as the start of the message id and ledger id even when some consumed ledgers had been deleted.
> {code}
> void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
>     Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
>     TopicInfo topicInfo = new TopicInfo();
>     long startOfLedger = 1;
>     while (lrIterator.hasNext()) {
>         LedgerRange range = lrIterator.next();
>         if (range.hasEndSeqIdIncluded()) {
>             // this means it was a valid and completely closed ledger
>             long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
>             topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
>             startOfLedger = endOfLedger + 1;
>             continue;
>         }
>         // If it doesn't have a valid end, it must be the last ledger
>         if (lrIterator.hasNext()) {
>             String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
>                     + " is not the last one but still does not have an end seq-id";
>             logger.error(msg);
>             cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
>             return;
>         }
>         // The last ledger does not have a valid seq-id, lets try to
>         // find it out
>         recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
>         return;
>     }
> {code}
> {code}
> long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges.lastKey();
> LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
>         .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
> topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
>         new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
> {code}
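The wrong read range in the quoted description comes down to simple arithmetic. The sketch below assumes that an entry id is the seq id minus the ledger's start seq id and that entries are dense; the class and method names are hypothetical and are not part of BookkeeperPersistenceManager.

```java
public class EntryIdMappingSketch {

    /**
     * Entry id inside a ledger for a given seq id, assuming entry 0 of the
     * ledger stores ledgerStartSeqId and later entries follow consecutively.
     */
    public static long entryIdFor(long seqId, long ledgerStartSeqId) {
        if (seqId < ledgerStartSeqId) {
            throw new IllegalArgumentException("seq id precedes the start of this ledger");
        }
        return seqId - ledgerStartSeqId;
    }

    public static void main(String[] args) {
        long firstSeqId = 101; // first undelivered message after L1 was deleted
        long lastSeqId = 104;

        // L1 is gone, so L2 looks like the first ledger and is assumed to start at 1.
        System.out.println("assumed start 1:  read [" + entryIdFor(firstSeqId, 1)
                + "-" + entryIdFor(lastSeqId, 1) + "]");   // read [100-103]

        // With the real start seq id of L2 the read lands on existing entries.
        System.out.println("actual start 101: read [" + entryIdFor(firstSeqId, 101)
                + "-" + entryIdFor(lastSeqId, 101) + "]"); // read [0-3]
    }
}
```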