You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bookkeeper.apache.org by "Jiannan Wang (JIRA)" <ji...@apache.org> on 2012/12/03 13:25:58 UTC

[jira] [Updated] (BOOKKEEPER-442) Failed to deliver messages due to inconsistency between SubscriptionState and LedgerRanges.

     [ https://issues.apache.org/jira/browse/BOOKKEEPER-442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiannan Wang updated BOOKKEEPER-442:
------------------------------------

    Attachment: BOOKKEEPER-442.diff

Update patch by Ivan's suggestion, also update it in review board.
                
> Failed to deliver messages due to inconsistency between SubscriptionState and LedgerRanges.
> -------------------------------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-442
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-442
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Sijie Guo
>            Assignee: Jiannan Wang
>            Priority: Critical
>             Fix For: 4.2.0
>
>         Attachments: BOOKKEEPER-442.diff, BOOKKEEPER-442.diff, BOOKKEEPER-442.diff
>
>
> The problems encountered when failed to updateSubscriptionState but deleted consumed ledgers. 
> The issue is described as below:
> 1) A subscriber setLastConsumeSeqId to move consume ptr. If the consume ptr is moved over consume interval, an update subscription state operation is issued to update to ZooKeeper.
> {code}
> AbstractSubscriptionManager:
>             
>             if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {                updateSubscriptionState(topic, subscriberId, subState, cb, ctx);
>             }
> {code}
> 2) when move consume ptr, it also changed in-memory subscription state before the subscription state is persisted to ZooKeeper.
> {code}
>     public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
>         long interval = lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().          getLocalComponent();
>         if (interval <= 0) {
>             return false;
>         }
>         // set consume seq id when it is larger
>         this.lastConsumeSeqId = lastConsumeSeqId;
>         if (interval < consumeInterval) {
>             return false;
>         }
>         // subscription state will be updated, marked it as clean
>         subscriptionState = SubscriptionState.newBuilder(subscriptionState).                          setMsgId(lastConsumeSeqId).build();
>         return true;
>     }
> {code}
> 3) MessageConsumedTask runs periodically to delete consumed ledgers. it would use in-memory subscription state to perform such deletion. so if ledger is deleted first and failed to update subscription state. it would cause inconsistent state, when hub restarts and subscriber reconnects, it would use old seq id to start delivering but the ledger has messages with old seq id has been deleted.
> {code}
> for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
>                     if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() <       minConsumedMessage)
>                         minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().       getLocalComponent();
>                     hasBound = hasBound && curSubscription.getSubscriptionPreferences().              hasMessageBound();
>                 }
> {code} 
> The fix would be let message consume task only use persistence state to performance deletions only. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira