Posted to user@flink.apache.org by Ragini Manjaiah <ra...@gmail.com> on 2021/09/17 11:40:10 UTC

Exception by flink kafka

Hi,
In what scenarios do we hit java.lang.OutOfMemoryError: Java heap space
while publishing to Kafka? I hit this exception, and as a resolution I
added the property setProperty("security.protocol", "SSL"); in the Flink
application.

Later I started encountering org.apache.kafka.common.errors.TimeoutException:
Failed to update metadata after 60000 ms.

The Flink application consumes data from one topic and writes the processed
results into 3 Kafka topics. Can someone share some insights on this?


I face this exception intermittently and the job terminates.

I am using FlinkKafkaProducer010 with these properties set:

producerConfig.setProperty(COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
producerConfig.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");
producerConfig.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2000000");
producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "52428800");
producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "900");
producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "524288000");
producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "190000");
producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0");
producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647");
producerConfig.setProperty("security.protocol", "SSL");

Re: Exception by flink kafka

Posted by Nicolaus Weidner <ni...@ververica.com>.
Hi Ragini,

On Fri, Sep 17, 2021 at 1:40 PM Ragini Manjaiah <ra...@gmail.com>
wrote:

> Later I started encountering org.apache.kafka.common.errors.TimeoutException:
> Failed to update metadata after 60000 ms.
>

This message can have several causes. There may be network issues, your
Kafka configuration might be broken, or the broker could be overloaded, for
example. Did you see any messages reach the destination topic(s) or are
there none at all? See e.g. [1] or [2], where people discussed various
possible causes.
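To help narrow down the network angle, here is a minimal sketch (not from the thread; the class name and chosen host/port are my assumptions) of a plain TCP probe against a broker endpoint. It only rules out basic reachability problems; it does not validate SSL handshakes or Kafka protocol health.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    // Hypothetical helper: metadata timeouts often stem from the broker
    // being unreachable, so a raw TCP connect rules out DNS/firewall issues.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers UnknownHostException, ConnectException, SocketTimeoutException
            return false;
        }
    }

    public static void main(String[] args) {
        // Replace host/port with your actual bootstrap server address
        System.out.println(canConnect("broker.example.com", 9093, 2000));
    }
}
```

If the probe succeeds but metadata updates still time out, the problem is more likely broker load or a security.protocol mismatch than plain connectivity.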


> I face this exception intermittently and the job terminates.
>
> I am using FlinkKafkaProducer010 with these properties set:
>
> producerConfig.setProperty(COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
> producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
> producerConfig.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");
> producerConfig.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2000000");
> producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "52428800");
> producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "900");
> producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "524288000");
> producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "190000");
> producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0");
> producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647");
> producerConfig.setProperty("security.protocol", "SSL");
>

I am no expert in Kafka configuration, but I notice that you have a very
high RETRY_BACKOFF_MS_CONFIG (significantly higher than MAX_BLOCK_MS) and a
very large BATCH_SIZE_CONFIG. In particular, you could try decreasing
BATCH_SIZE_CONFIG so that it is smaller than MAX_REQUEST_SIZE_CONFIG.
I don't see why this would cause an OOM, though. If your Kafka producer does
not have a lot of heap space, you should try increasing it.
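The adjustments above can be sketched as follows. This is a minimal illustration, not settings recommended verbatim in the thread: the class name and the concrete values (batch size, buffer memory, backoff) are my assumptions, chosen only so that batch.size stays below max.request.size, retry.backoff.ms stays well below the 60000 ms metadata timeout, and buffer.memory no longer claims ~500 MB of heap.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Sketch only: values are illustrative assumptions, tune for your workload.
    public static Properties adjustedConfig(String bootstrapAddress) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapAddress);
        p.setProperty("compression.type", "lz4");
        p.setProperty("request.timeout.ms", "300000");
        p.setProperty("max.request.size", "2000000");  // ~2 MB cap per request
        p.setProperty("batch.size", "1000000");        // kept below max.request.size
        p.setProperty("linger.ms", "900");
        p.setProperty("buffer.memory", "67108864");    // 64 MB instead of ~500 MB
        p.setProperty("retry.backoff.ms", "1000");     // far below the 60000 ms metadata timeout
        p.setProperty("security.protocol", "SSL");
        return p;
    }

    public static void main(String[] args) {
        Properties p = adjustedConfig("broker.example.com:9093");
        long batch = Long.parseLong(p.getProperty("batch.size"));
        long maxRequest = Long.parseLong(p.getProperty("max.request.size"));
        System.out.println(batch < maxRequest);
    }
}
```

The key relationship is batch.size < max.request.size: a batch larger than the maximum request size can never be sent in one request, so oversized batch settings mostly waste buffer memory.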

Hope some of this helps!
Nico

[1] https://github.com/dpkp/kafka-python/issues/607
[2]
https://stackoverflow.com/questions/54780605/guidelines-to-handle-timeout-exception-for-kafka-producer