Posted to issues@flink.apache.org by "Julius Michaelis (Jira)" <ji...@apache.org> on 2020/07/06 01:24:00 UTC

[jira] [Comment Edited] (FLINK-18150) A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata

    [ https://issues.apache.org/jira/browse/FLINK-18150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17144768#comment-17144768 ] 

Julius Michaelis edited comment on FLINK-18150 at 7/6/20, 1:23 AM:
-------------------------------------------------------------------

My current guess is that this relates to the way the Kafka client [picks a broker|https://github.com/apache/kafka/blob/2.2.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643] to fetch metadata from:
{{request.timeout.ms}} defaults to 30000 ms, but {{default.api.timeout.ms}} and {{max.block.ms}} default to 60000 ms, so only two attempts fit into the overall timeout. However, if fetching metadata from one broker fails, that broker may be retried, so at least a few subtasks end up asking a dead broker for metadata twice, exhausting the timeout and failing the task.
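
To make the arithmetic behind "only two attempts" explicit (just an illustration of the timeout budget, not actual client code):
{code:java}
// Illustration only, using the defaults quoted above:
// the overall metadata timeout (default.api.timeout.ms / max.block.ms)
// leaves room for at most two attempts of request.timeout.ms each.
long requestTimeoutMs = 30_000L; // request.timeout.ms
long overallTimeoutMs = 60_000L; // default.api.timeout.ms / max.block.ms
long maxAttempts = overallTimeoutMs / requestTimeoutMs; // = 2
// If broker selection hands back the same dead broker on both attempts,
// the metadata fetch times out and the task fails.
{code}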

A log of the situation, for reference:
{code:none}
[Producer clientId=producer-202] Removing node kafka1:9091 (id: -1 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 0
[Producer clientId=producer-202] Found least loaded node kafka2:9091 (id: -2 rack: null)
[Producer clientId=producer-480] Removing node kafka1:9091 (id: -1 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 0
[Producer clientId=producer-480] Found least loaded node kafka2:9091 (id: -2 rack: null)
[Producer clientId=producer-158] Removing node kafka1:9091 (id: -1 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 0
[Producer clientId=producer-158] Found least loaded node kafka2:9091 (id: -2 rack: null)
[Producer clientId=producer-285] Removing node kafka2:9091 (id: -2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 0
[Producer clientId=producer-285] Found least loaded node kafka1:9091 (id: -1 rack: null)
[Producer clientId=producer-321] Removing node kafka2:9091 (id: -2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 0
[Producer clientId=producer-321] Found least loaded node kafka1:9091 (id: -1 rack: null)
[Producer clientId=producer-477] Removing node kafka1:9091 (id: -1 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 0
[Producer clientId=producer-477] Found least loaded node kafka2:9091 (id: -2 rack: null)
{code}

I've played around a bit, and setting
{code:yaml}
session.timeout.ms: 5000
request.timeout.ms: 10000
default.api.timeout.ms: 180000
{code}
on the consumer and
{code:yaml}
max.block.ms: 181000
{code}
on the producer seems to make the problem go away, but I'm puzzled as to why.
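
For completeness, a rough sketch of where these keys would go when building the job (topic name, group id, and bootstrap servers are placeholders; {{default.api.timeout.ms}} is a consumer-only option, which is why only {{max.block.ms}} is set on the producer side):
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Consumer: shorter per-request timeouts, larger overall API timeout.
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "kafka1:9091,kafka2:9091"); // placeholder
consumerProps.setProperty("group.id", "my-group");                         // placeholder
consumerProps.setProperty("session.timeout.ms", "5000");
consumerProps.setProperty("request.timeout.ms", "10000");
consumerProps.setProperty("default.api.timeout.ms", "180000");

FlinkKafkaConsumer<String> source =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), consumerProps);

// Producer: max.block.ms slightly above the consumer's default.api.timeout.ms.
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka1:9091,kafka2:9091"); // placeholder
producerProps.setProperty("max.block.ms", "181000");

FlinkKafkaProducer<String> sink =
    new FlinkKafkaProducer<>("my-topic", new SimpleStringSchema(), producerProps);
{code}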





> A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18150
>                 URL: https://issues.apache.org/jira/browse/FLINK-18150
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.1
>         Environment: It is a bit unclear to me under what circumstances this can be reproduced. I created a "minimum" non-working example at https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the minimum number of Kafka brokers, but the issue reproduces just as well with, e.g., a replication factor of 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill; and docker-compose rm -vf; and docker-compose up --abort-on-container-exit --build
> {code}
> The exception should appear in the web UI after 5-6 minutes.
> To make sure that this isn't dependent on my machine, I've also verified that it reproduces on an m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running "normally", e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know that
> * {{flink.partition-discovery.interval-millis}} must be set (see the sketch after this list).
> * The broker that failed must be listed in {{bootstrap.servers}}.
> * There needs to be a certain number of topics or producers, but I'm unsure which of the two is the crucial factor.
> * Changing the values of {{metadata.request.timeout.ms}} or {{flink.partition-discovery.interval-millis}} does not seem to have any effect.
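> For reference, a minimal sketch of where that key goes (bootstrap servers and the interval value are placeholders; the key is read by the Flink Kafka consumer itself, not by the brokers):
> {code:java}
> import java.util.Properties;
>
> // Passed to the FlinkKafkaConsumer constructor together with topic and deserialization schema.
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "kafka1:9091,kafka2:9091");
> props.setProperty("flink.partition-discovery.interval-millis", "10000");
> {code}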
>            Reporter: Julius Michaelis
>            Priority: Major
>
> When a Kafka broker that is listed among the bootstrap servers fails while partition discovery is active, the Flink job reading from that Kafka cluster may enter a failing loop.
> At first, the job seems to react normally, with only a short latency spike when the Kafka leadership switches over.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>         at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
>         at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer records than expected.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
> {code}
> and keeps doing so without processing any records. (The exception comes without a backtrace or any other useful information.)
> I have also observed this behavior with partition discovery turned off, but only when the Flink job failed (after a broker failure) and had to run checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce the issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)