Posted to dev@kafka.apache.org by "Jason Gustafson (JIRA)" <ji...@apache.org> on 2016/04/01 20:20:25 UTC

[jira] [Commented] (KAFKA-3491) Issue with consumer close() in finally block with 'enable.auto.commit=true'

    [ https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15222102#comment-15222102 ] 

Jason Gustafson commented on KAFKA-3491:
----------------------------------------

Good catch. The best workaround at the moment would probably be to call unsubscribe() prior to closing in the exception handler:

{code}
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
      consumer.unsubscribe();
    } finally {
      consumer.close();
    }
  }
{code}

Calling unsubscribe() causes the consumer to abandon its current positions, which prevents the automatic commit in close(). I think we can either update the documentation to use that pattern or try to fix the underlying behavior. One option would be to turn off the commit-on-close() behavior entirely. This would be unfortunate since we'd have duplicates on virtually every close(), but it doesn't require any change to the API and we only try to provide "at least once" delivery anyway. In that case, we may as well start officially discouraging automatic commit in the documentation. We could also add a new close() method with a flag to indicate whether the shutdown is clean or not, but I think this would be even uglier than the unsubscribe pattern above.
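
To make the difference between the two shutdown paths concrete, here is a minimal sketch that models the behavior without a broker. ToyConsumer, its fields, and committedAfterFailure() are hypothetical names invented for this illustration; none of this is the real KafkaConsumer, and the model assumes only what is described above: poll() advances the position past the whole batch, close() commits the current position, and unsubscribe() abandons positions so that close() has nothing to commit.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseCommitModel {

    // Toy stand-in for a consumer running with enable.auto.commit=true.
    // Hypothetical class for illustration only; NOT the real KafkaConsumer.
    static class ToyConsumer {
        private final int batchSize;
        private long position = 0;           // next offset poll() would return
        private long committed = 0;          // last committed offset
        private boolean hasPositions = true; // cleared by unsubscribe()

        ToyConsumer(int batchSize) {
            this.batchSize = batchSize;
        }

        // Hands out one batch and advances the position past all of it,
        // whether or not the caller finishes processing the batch.
        List<Long> poll() {
            List<Long> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                batch.add(position++);
            }
            return batch;
        }

        // Assumption taken from the comment above: unsubscribing abandons
        // the current positions.
        void unsubscribe() {
            hasPositions = false;
        }

        // Models commit-on-close: commits the position if one is still held.
        void close() {
            if (hasPositions) {
                committed = position;
            }
        }

        long committed() {
            return committed;
        }
    }

    // Runs one batch of five records, fails while processing offset 2, and
    // returns the offset that ends up committed after close().
    static long committedAfterFailure(boolean unsubscribeOnError) {
        ToyConsumer consumer = new ToyConsumer(5);
        try {
            for (long offset : consumer.poll()) {
                if (offset == 2) {
                    throw new IllegalStateException("process() failed at offset " + offset);
                }
            }
        } catch (IllegalStateException e) {
            if (unsubscribeOnError) {
                consumer.unsubscribe();
            }
        } finally {
            consumer.close();
        }
        return consumer.committed();
    }

    public static void main(String[] args) {
        System.out.println("close() only: committed=" + committedAfterFailure(false));
        System.out.println("unsubscribe() then close(): committed=" + committedAfterFailure(true));
    }
}
```

In this model the first call commits offset 5 even though processing failed at offset 2, so offsets 2 through 4 would be skipped after a restart. The second call leaves the committed offset at 0, so the whole batch would be redelivered, which is consistent with "at least once" delivery.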

[~guozhang] [~ewencp] Thoughts?

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> Imagine you have a run loop that looks like the following:
> {code:java}
>   public void run() {
>     try {
>       consumer.subscribe(topics);
>       while (true) {
>         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>         records.forEach(record -> process(record));
>       }
>     } catch (WakeupException e) {
>       // ignore, we're closing
>     } catch (Exception e) {
>       log.error("Unexpected error", e);
>     } finally {
>       consumer.close();
>     }
>   }
> {code}
> If you run this with 'enable.auto.commit=true' and 'process()' throws an exception, close() will still try to commit the offsets for the entire most recent batch, including records that were read but never processed.


