Posted to issues@flink.apache.org by "Jiangjie Qin (JIRA)" <ji...@apache.org> on 2019/07/12 02:38:00 UTC

[jira] [Comment Edited] (FLINK-11792) Make KafkaConsumer more resilient to Kafka Broker Failures

    [ https://issues.apache.org/jira/browse/FLINK-11792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883478#comment-16883478 ] 

Jiangjie Qin edited comment on FLINK-11792 at 7/12/19 2:37 AM:
---------------------------------------------------------------

[~knaufk] Thanks for the explanation. There might be some misunderstanding on this.

The protocol_partitioning parts are mostly for Kafka client developers. For Kafka there are a few [third-party client libraries|https://cwiki.apache.org/confluence/display/KAFKA/Clients] out there. These libraries talk to the Kafka brokers according to the RPC protocols defined by Kafka, so they need to know how to handle things like leader migration. In our case, we are using the official Kafka Java clients ({{KafkaConsumer}} and {{KafkaProducer}}), so everything described in the protocol guide is already handled by the Kafka Java clients.
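
As an illustration only, here is a minimal sketch of the producer side (broker address, topic name, record contents and retry count are assumptions): the official Java client, not the application, deals with protocol-level events such as a partition leader moving to another broker, by refreshing its metadata and retrying the send.

{code:java}
// Hedged sketch only: broker address, topic name, key/value and retry count are
// assumptions for illustration. The client handles protocol-level concerns such as a
// partition leader moving to another broker (metadata refresh + retry) internally.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProtocolHandledByClient {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // transient errors (e.g. leader change) are retried

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "key", "value")); // "events" is an assumption
            producer.flush();
        }
    }
}
{code}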

 

{quote}

`KafkaConsumer:assign`, which we use, also states that " As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change."

{quote}

 

The "topic metadata change" and _rebalance_ here actually means something orthogonal. In Kafka the consumers may belong to a consumer group, and the consumers in the same consumer group may be consuming from some topics together in a coordinated way. The most important part of such coordination is to decide which partitions should each consumer in the same consumer group to consume from, a.k.a _partition assignment_. The partition assignment may change when the members in the consumer group changes (e.g. new consumer joins, existing consumer dies, etc) or when "topic metadata changes" (e.g. increasing the number of partitions of a topic, creating a new topic that matches a pattern). In those case, the partition assignment should be changed accordingly. Such partition assignment change is called a _rebalance_.

Leader migrations won't trigger any partition reassignment. A leader migration only means that the consumer to which the partition is assigned needs to fetch data from another broker; the partition itself is still assigned to that consumer. The KafkaConsumer internally handles such a leader migration by refreshing its metadata and resending the FetchRequest to the new leader. This process is transparent to the users, as sketched below.
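
A hedged sketch of what that transparency looks like from the user's side (topic/partition and broker addresses are assumptions): the poll loop below keeps running across a leader migration because the client refreshes its metadata and redirects fetches to the new leader internally.

{code:java}
// Hedged sketch only: topic/partition, broker addresses and group id are assumptions.
// With manual assignment, the poll loop keeps running across a partition leader
// migration; the partition stays assigned to this consumer the whole time.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LeaderMigrationPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");                      // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("events", 0)));
            while (true) {
                // A leader change for partition 0 is handled inside poll(); no action
                // is needed from the application.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
{code}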

 



> Make KafkaConsumer more resilient to Kafka Broker Failures 
> -----------------------------------------------------------
>
>                 Key: FLINK-11792
>                 URL: https://issues.apache.org/jira/browse/FLINK-11792
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.2
>            Reporter: Konstantin Knauf
>            Priority: Major
>
> When consuming from a topic with replication factor > 1, the FlinkKafkaConsumer could continue reading from this topic when a single broker fails, by "simply" switching to the new leaders for all lost partitions after Kafka failover. Currently, the KafkaConsumer will most likely throw an exception, as topic metadata is only periodically fetched from the Kafka cluster.
>  
>  
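
For reference, a hedged sketch of the client setting behind the "periodically fetched" metadata mentioned in the description: {{metadata.max.age.ms}} bounds how stale cached topic metadata may get before a proactive refresh (the client also refreshes it on errors such as a leader change). Broker address and interval below are illustrative assumptions.

{code:java}
// Hedged sketch only: broker address and refresh interval are assumptions, not
// recommendations. metadata.max.age.ms caps how long the client keeps cached topic
// metadata before refreshing it, even when no errors occur.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class MetadataRefreshConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // assumption
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000");         // 30 seconds (assumption)
        return props;
    }
}
{code}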



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)