You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/05/09 04:58:12 UTC

[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

    [ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15275946#comment-15275946 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user jianbzhou commented on the pull request:

    https://github.com/apache/storm/pull/1131#issuecomment-217776015
  
    Hi Hmcl,
    
    During our testing we found sometime the poll method was not called for long time, I suspect it is caused by below condition:
    
    private boolean poll() {
        return !waitingToEmit() && **numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()**;
    }
    
    I found numUncommittedOffsets will be incremented in either of the below situation:
    1.	(!retryService.isScheduled(msgId) – this is the most common situation – one message was polled and it is not in the toRetryMsg – it is a normal emit instead of a retry;
    2.	retryService.isReady(msgId) – this means the message was emitted previously – now will be re-emitted as per the retry logic.
    
    As per below logic – for one message 50, in the first time it was polled and emiited, numUncommittedOffsets will be incremented by 1, then this message failed and retried for 10 times, so totally numUncommittedOffsets will be increamented by 11.
    
    
    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
        ………………………………
        else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
            final List<Object> tuple = tuplesBuilder.buildTuple(record);
            kafkaSpoutStreams.emit(collector, tuple, msgId);
            emitted.add(msgId);
            **numUncommittedOffsets++;**
        ………………………………
    }
    
    But as per below logic – after successful commit, numUncommittedOffsets will subtract the actual number of message that got commited. If it commit one message 50, then will only substract 1 instead of 11.
    
    public void commit(OffsetAndMetadata committedOffset) {
        ………………………………
            **numUncommittedOffsets-= numCommittedOffsets;**
        ………………………………
    }
    
    Under some circumstances – say a rebalance happened and we seek back to a very small/early offset, seems this would cause emitTupleIfNotEmitted have a quite big number – finally this will be greater than kafkaSpoutConfig.getMaxUncommittedOffsets, and got poll() method not be called.
    
    
    I am not sure if I corrrectly understand your code or miss anything – could you please kindly help confirm if above situtaion is possible or not?
    
    Please feel free to let me know if you need any further info and thanks for your help.



> Kafka Spout New Consumer API
> ----------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>             Fix For: 1.0.0, 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)