Posted to user@flink.apache.org by Mason Chen <ma...@gmail.com> on 2021/09/22 17:57:56 UTC

Kafka Partition Discovery

Hi all,

We sometimes run into a connection issue with Kafka when a broker restarts:

```
java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:846)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
```

Can a retry be added to the partition discovery mechanism?
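For illustration, a retry around the metadata fetch could look roughly like this. It is not Flink code: `RetryingDiscovery` and `callWithRetry` are hypothetical names, the fetch is simulated, and `java.util.concurrent.TimeoutException` stands in for Kafka's `org.apache.kafka.common.errors.TimeoutException` so the sketch runs without the Kafka client on the classpath. Only the configured exception type is retried, with a fixed pause between attempts. Any other exception fails immediately.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical helper (not part of Flink's API): retries an action that fails
 * with a retriable exception, pausing a fixed interval between attempts.
 */
public class RetryingDiscovery {

    public static <T> T callWithRetry(Callable<T> action,
                                      Class<? extends Exception> retryOn,
                                      int maxAttempts,
                                      long backoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Rethrow non-retriable errors at once, and the last error once attempts run out
                if (!retryOn.isInstance(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoffMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated partition discovery: times out twice (as during a broker restart), then succeeds.
        // java.util.concurrent.TimeoutException stands in for Kafka's TimeoutException here.
        List<String> partitions = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new TimeoutException("Timeout expired while fetching topic metadata");
            }
            return Arrays.asList("my-topic-0", "my-topic-1");
        }, TimeoutException.class, 5, 100L);
        System.out.println("attempts=" + calls[0] + " partitions=" + partitions);
    }
}
```

Running `main` prints `attempts=3 partitions=[my-topic-0, my-topic-1]`. Retrying only the timeout type keeps other failures from being masked. Once the attempts are used up, the original exception still propagates, so the task fails instead of silently skipping discovery.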

Best,
Mason

Re: Kafka Partition Discovery

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

It seems like a useful feature, but it would probably be better to implement it
in the Kafka consumer itself. There is a related KIP in progress:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients
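KIP-580 proposes making the Kafka clients' retry backoff grow exponentially up to a maximum, with random jitter, instead of waiting a fixed interval. A minimal sketch of that idea, assuming an initial wait, a cap, and a symmetric jitter fraction: `ExponentialBackoffSketch` is a hypothetical class, and its parameters are not the KIP's configuration names or defaults (the KIP itself defines those).

```java
import java.util.Random;

/**
 * Hypothetical helper (not a Kafka or Flink class): capped exponential backoff
 * with symmetric random jitter, in the spirit of KIP-580.
 */
public class ExponentialBackoffSketch {

    private final long initialMs;  // wait after the first failure
    private final long maxMs;      // upper bound on any single wait
    private final double jitter;   // 0.2 means the wait varies by up to +/-20%
    private final Random random;

    public ExponentialBackoffSketch(long initialMs, long maxMs, double jitter, Random random) {
        if (initialMs <= 0 || maxMs < initialMs || jitter < 0 || jitter >= 1) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.jitter = jitter;
        this.random = random;
    }

    /** Wait before the next attempt after `failures` consecutive failures (failures >= 1). */
    public long backoffMs(int failures) {
        if (failures < 1) {
            throw new IllegalArgumentException("failures must be >= 1");
        }
        // initialMs * 2^(failures - 1), in double so large exponents saturate instead of overflowing
        double exponential = initialMs * Math.pow(2, failures - 1);
        // Random factor in [1 - jitter, 1 + jitter)
        double factor = 1.0 - jitter + 2.0 * jitter * random.nextDouble();
        // Apply jitter, then enforce the cap
        return (long) Math.min(exponential * factor, (double) maxMs);
    }

    public static void main(String[] args) {
        // Jitter disabled so the output is deterministic
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(100, 1000, 0.0, new Random(42));
        StringBuilder sb = new StringBuilder();
        for (int failures = 1; failures <= 5; failures++) {
            sb.append(backoff.backoffMs(failures)).append(failures < 5 ? " " : "");
        }
        System.out.println(sb);
    }
}
```

With jitter set to 0, `main` prints `100 200 400 800 1000`: each failure doubles the wait until the 1000 ms cap is reached. With nonzero jitter, clients that all failed at the same moment (for example, many source subtasks during a broker restart) spread out their retries instead of hitting the cluster in lockstep.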

I'd like to pull Arvid into the discussion, as he may be more
familiar with the subject.

Regards,
Roman