Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2017/07/18 16:24:00 UTC

[jira] [Reopened] (KAFKA-5301) Improve exception handling on consumer path

     [ https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reopened KAFKA-5301:
----------------------------------

[~enothereska] I think we should not resolve this JIRA in a hurry. Have you made a thorough pass over the consumer code path and confirmed that {{the rest are OK}}?

For example, one obvious pitfall I can see is the {{rebalanceException}} variable used in {{StreamThread}}. We throw the exception in {{onPartitionsRevoked}} and {{onPartitionsAssigned}} and, at the same time, remember it in this variable. The exception thrown from the callback is swallowed by the {{ConsumerCoordinator}} and logged as an error, and later on we rethrow the remembered exception. I can see two issues here:

1) Throwing the exception twice is redundant, since the first throw only produces an error log4j entry. If we will rethrow the exception after the rebalance anyway, we could stop throwing it inside the callbacks.

2) When we throw the exception in the {{..Revoked}} callback, we effectively leave the assignor in an inconsistent state in which the suspended tasks, previous tasks, etc. are not set correctly. However, we will still call {{..Assigned}} afterwards, which may be problematic. Should we skip the later callback if an exception has already been thrown, or should we clean up the cached maps when throwing the exception?
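The two suggestions can be illustrated with a small plain-Java simulation. This is a minimal sketch, not Kafka code: {{RebalanceCallbacks}}, {{SwallowingCoordinator}}, {{RecordingListener}}, {{suspendedTasks}}, {{activeTasks}} and {{failOnRevoke}} are hypothetical stand-ins for {{ConsumerRebalanceListener}}, {{ConsumerCoordinator}} and the {{StreamThread}} state. It assumes, as described in the comment, that the coordinator always invokes both callbacks and only logs what they throw. The listener records a failure instead of throwing it from the callback, clears its partially updated cached state, skips the assignment work once revocation has failed, and rethrows exactly once after the rebalance.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RebalanceExceptionSketch {
    public static void main(String[] args) {
        SwallowingCoordinator coordinator = new SwallowingCoordinator();
        RecordingListener listener = new RecordingListener();
        listener.failOnRevoke = true; // inject a failure during revocation
        try {
            listener.pollOnce(coordinator, List.of("0_0"), List.of("0_1"));
        } catch (IllegalStateException e) {
            System.out.println("rethrown after rebalance: " + e.getMessage());
        }
        System.out.println("coordinator error log: " + coordinator.errorLog);
        System.out.println("active tasks after failed revoke: " + listener.activeTasks);
    }
}

// Hypothetical stand-in for the consumer's rebalance listener interface.
interface RebalanceCallbacks {
    void onPartitionsRevoked(List<String> partitions);
    void onPartitionsAssigned(List<String> partitions);
}

// Hypothetical stand-in for ConsumerCoordinator: it always invokes both
// callbacks and only logs what they throw, so the caller never sees it.
class SwallowingCoordinator {
    final List<String> errorLog = new ArrayList<>();

    void rebalance(RebalanceCallbacks cb, List<String> revoked, List<String> assigned) {
        try {
            cb.onPartitionsRevoked(revoked);
        } catch (RuntimeException e) {
            errorLog.add("onPartitionsRevoked failed: " + e.getMessage());
        }
        try {
            cb.onPartitionsAssigned(assigned);
        } catch (RuntimeException e) {
            errorLog.add("onPartitionsAssigned failed: " + e.getMessage());
        }
    }
}

// Sketch of both suggestions: remember the failure instead of throwing it
// from the callback, and do not build the assignment on inconsistent state.
class RecordingListener implements RebalanceCallbacks {
    RuntimeException rebalanceException = null;
    final Set<String> suspendedTasks = new HashSet<>(); // hypothetical cached state
    final Set<String> activeTasks = new HashSet<>();    // hypothetical cached state
    boolean failOnRevoke = false;                       // test hook for the demo

    @Override
    public void onPartitionsRevoked(List<String> partitions) {
        try {
            suspendedTasks.addAll(partitions); // cached state updated first...
            if (failOnRevoke) {
                throw new IllegalStateException("failed to suspend tasks");
            }
        } catch (RuntimeException e) {
            rebalanceException = e; // remember the failure; do not rethrow here
            suspendedTasks.clear(); // ...so drop the partially updated cache
        }
    }

    @Override
    public void onPartitionsAssigned(List<String> partitions) {
        if (rebalanceException != null) {
            return; // revocation failed: skip the assignment work
        }
        activeTasks.addAll(partitions);
    }

    // Models one iteration of the thread loop: rebalance inside poll(),
    // then surface any recorded failure exactly once to the caller.
    void pollOnce(SwallowingCoordinator coordinator, List<String> revoked, List<String> assigned) {
        rebalanceException = null;
        coordinator.rebalance(this, revoked, assigned);
        if (rebalanceException != null) {
            throw rebalanceException;
        }
    }
}
```

Because nothing escapes the callbacks, the coordinator's error log stays empty and the failure reaches the caller only through the rethrow after the rebalance. The sketch does both cleanup and skipping for illustration; which of the two is right for {{StreamThread}} is the open question in the comment.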

> Improve exception handling on consumer path
> -------------------------------------------
>
>                 Key: KAFKA-5301
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5301
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Eno Thereska
>            Assignee: Eno Thereska
>             Fix For: 0.11.1.0
>
>
> The consumer is used in StreamThread.java, mostly for .poll() but also to restore data.
> It is used in StreamTask.java, mostly for .pause() and .resume().
> All exceptions here are currently caught all the way up in the main running loop, in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that occur before Streams has even had a chance to look at the data: https://issues.apache.org/jira/browse/KAFKA-5157



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)