Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2018/07/14 21:00:00 UTC

[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

    [ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544335#comment-16544335 ] 

Guozhang Wang commented on KAFKA-6520:
--------------------------------------

Here is my current analysis on this issue:

When {{KafkaConsumer.poll}} is called, it calls {{updateAssignmentMetadataIfNeeded}}, which in turn calls {{coordinator.poll()}}. If the broker(s) are not available, that last call always returns false after the timeout, and hence {{KafkaConsumer.poll}} returns an empty set without any indication that the brokers are disconnected. In fact, if the brokers are unavailable from the very beginning, when the Streams application starts, we hit the same issue: the instance will also be in the {{RUNNING}} state, since {{onPartitionsRevoked}} would never be called.

So I think that if we want to improve this situation, the change should be in the consumer client, not the Streams client. That is, if today you use a KafkaConsumer to fetch data directly and the brokers become unavailable, you will observe the same behavior. On the other hand, it is by design that the consumer does not make users worry about broker connectivity or handle any disconnect issues. Here are my proposed options:

1. Expose metrics from Consumer#NetworkClient on the {{ClusterConnectionStates}}, and let users monitor these metrics to watch for connectivity issues.
2. Add a new state, say {{IDLE}}, to StreamThread: if {{consumer.poll}} returns nothing and, in the same iteration, the restore consumer also returns nothing, the thread transitions to {{IDLE}}. Only {{RUNNING}} can transition to {{IDLE}}, and {{IDLE}} can only transition to {{RUNNING}} (if the next poll call returns some data) or {{PARTITIONS_REVOKED}} (if the next poll call triggers a rebalance and the callback is invoked). For the global thread it is simpler: if one poll call returns nothing, it transitions to {{IDLE}}. KafkaStreams can add a new {{IDLE}} state as well, entered from {{RUNNING}} when all of its threads (including the global thread) have transitioned to {{IDLE}}.
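To make option 2 concrete, here is a rough sketch of the proposed transitions in plain Java. It is only an illustration of the idea, not actual Streams code: the class {{IdleStateSketch}}, its methods, and the record-count parameters are all hypothetical, and the enum covers only the states mentioned above. It also assumes that only data from the main consumer moves a thread out of {{IDLE}}.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of the proposed IDLE state; none of these names exist in Kafka Streams.
final class IdleStateSketch {

    // Only the states discussed in option 2; IDLE is the proposed addition.
    enum State { RUNNING, IDLE, PARTITIONS_REVOKED }

    // Next state of a stream thread after one loop iteration.
    // mainRecords / restoreRecords: number of records returned by the main and restore consumers.
    static State nextThreadState(State current, boolean rebalanceTriggered,
                                 int mainRecords, int restoreRecords) {
        if (current != State.RUNNING && current != State.IDLE) {
            return current; // other states are out of scope for this sketch
        }
        if (rebalanceTriggered) {
            return State.PARTITIONS_REVOKED; // rebalance callback was invoked during poll
        }
        if (current == State.RUNNING) {
            // RUNNING -> IDLE only if both consumers returned nothing in the same iteration
            return (mainRecords == 0 && restoreRecords == 0) ? State.IDLE : State.RUNNING;
        }
        // IDLE -> RUNNING once the next poll returns some data (assumption: main consumer only)
        return mainRecords > 0 ? State.RUNNING : State.IDLE;
    }

    // Global thread: a single empty poll is enough to become IDLE.
    static State nextGlobalThreadState(int records) {
        return records == 0 ? State.IDLE : State.RUNNING;
    }

    // The KafkaStreams instance would be IDLE only if every thread, global thread included, is IDLE.
    static boolean instanceIsIdle(Collection<State> threadStates) {
        return !threadStates.isEmpty()
                && threadStates.stream().allMatch(s -> s == State.IDLE);
    }

    public static void main(String[] args) {
        State s = nextThreadState(State.RUNNING, false, 0, 0);
        System.out.println("after empty iteration: " + s);
        System.out.println("instance idle: "
                + instanceIsIdle(List.of(s, nextGlobalThreadState(0))));
    }
}
```

Note that in this sketch an idle topic with healthy brokers looks exactly the same as unreachable brokers, so {{IDLE}} would mean "no data is arriving" rather than "disconnected".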

Thoughts?

> When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-6520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6520
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Michael Kohout
>            Assignee: Milind Jain
>            Priority: Major
>              Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer connected(Stream State is still RUNNING, and the uncaught exception handler isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] for a discussion from the google user forum.  [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a related issue.


