Posted to dev@storm.apache.org by jianbzhou <gi...@git.apache.org> on 2016/05/09 04:57:15 UTC

[GitHub] storm pull request: STORM-822: Kafka Spout New Consumer API

Github user jianbzhou commented on the pull request:

    https://github.com/apache/storm/pull/1131#issuecomment-217776015
  
    Hi Hmcl,
    
    During our testing we found that the poll method was sometimes not called for a long time. I suspect it is caused by the condition below:
    
    private boolean poll() {
        return !waitingToEmit() && **numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()**;
    }
    
    I found that numUncommittedOffsets is incremented in either of the following situations:
    1.	!retryService.isScheduled(msgId): this is the most common situation. A message was polled and is not in toRetryMsg, so it is a normal emit rather than a retry;
    2.	retryService.isReady(msgId): this means the message was emitted previously and will now be re-emitted as per the retry logic.
    
    As per the logic below, take one message, 50: the first time it is polled and emitted, numUncommittedOffsets is incremented by 1. If this message then fails and is retried 10 times, numUncommittedOffsets is incremented 11 times in total.
    
    
    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
        ...
        else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
            final List<Object> tuple = tuplesBuilder.buildTuple(record);
            kafkaSpoutStreams.emit(collector, tuple, msgId);
            emitted.add(msgId);
            **numUncommittedOffsets++;**
        ...
    }
    
    But as per the logic below, after a successful commit, numUncommittedOffsets is reduced only by the number of messages that actually got committed. If the commit covers the single message 50, it subtracts only 1 instead of 11.
    
    public void commit(OffsetAndMetadata committedOffset) {
        ...
            **numUncommittedOffsets-= numCommittedOffsets;**
        ...
    }
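    
    To make the mismatch concrete, here is a minimal, self-contained sketch of the counter as I understand it. The class and method names are hypothetical and this is not the actual KafkaSpout code; it only models the increment on every emit and the decrement by the number of committed messages, and it leaves out the waitingToEmit() part of the condition.

```java
// Hypothetical, simplified model of the counter discussed above; NOT the real KafkaSpout.
class UncommittedCounterDrift {
    private final int maxUncommittedOffsets;
    private int numUncommittedOffsets = 0;

    UncommittedCounterDrift(int maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    // Models emitTupleIfNotEmitted: every emit counts, whether first emit or retry.
    void emit() {
        numUncommittedOffsets++;
    }

    // Models commit: only the number of messages actually committed is subtracted.
    void commit(int numCommittedOffsets) {
        numUncommittedOffsets -= numCommittedOffsets;
    }

    // Models the counter half of the poll() condition.
    boolean canPoll() {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }

    int getNumUncommittedOffsets() {
        return numUncommittedOffsets;
    }

    public static void main(String[] args) {
        UncommittedCounterDrift spout = new UncommittedCounterDrift(10);
        spout.emit();                      // first emit of message 50
        for (int i = 0; i < 10; i++) {
            spout.emit();                  // 10 retries of the same message
        }
        spout.commit(1);                   // one message committed
        System.out.println(spout.getNumUncommittedOffsets()); // 10 left over
        System.out.println(spout.canPoll());                  // false
    }
}
```

    In this model nothing is outstanding after the commit, yet the counter stays at 10, so with a limit of 10 the poll condition stays false.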
    
    Under some circumstances, say a rebalance happens and we seek back to a much earlier offset, it seems emitTupleIfNotEmitted would push numUncommittedOffsets to quite a large number. Eventually it becomes greater than kafkaSpoutConfig.getMaxUncommittedOffsets(), and the poll() method is no longer called.
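    
    If the situation is real, one possible direction (purely an assumption on my side, not something taken from the current code) would be to count each offset only once, however many times it is emitted. The sketch below uses hypothetical names and assumes a single partition, so an offset alone identifies a message.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: count distinct uncommitted offsets instead of emit calls.
// Assumes a single partition, so an offset alone identifies a message.
class DistinctUncommittedOffsets {
    private final Set<Long> uncommitted = new HashSet<>();

    // A retry (or a re-emit after seeking back) of an already counted offset is a no-op.
    void emit(long offset) {
        uncommitted.add(offset);
    }

    // Committing up to and including 'offset' releases every counted offset at or below it.
    void commitUpTo(long offset) {
        uncommitted.removeIf(o -> o <= offset);
    }

    int numUncommittedOffsets() {
        return uncommitted.size();
    }

    public static void main(String[] args) {
        DistinctUncommittedOffsets counter = new DistinctUncommittedOffsets();
        for (int i = 0; i < 11; i++) {
            counter.emit(50L);             // first emit plus 10 retries
        }
        System.out.println(counter.numUncommittedOffsets()); // 1
        counter.commitUpTo(50L);
        System.out.println(counter.numUncommittedOffsets()); // 0
    }
}
```

    With this kind of accounting the increments and decrements stay balanced, at the cost of keeping a set of offsets in memory.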
    
    
    I am not sure whether I understand your code correctly or am missing something. Could you please confirm whether the above situation is possible?
    
    Please feel free to let me know if you need any further info and thanks for your help.


