You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jérémy Albrecht <ja...@skapane.ai> on 2021/12/07 15:31:23 UTC
Customize Kafka client (module.yaml)
Hi All,
I encounter a blocking problem linked to exchanging messages between Stateful functions.
The context is: I am sending a very large payload from a Stateful Function to a Kafka topic. I am blocked by the Kafka client (I think) because here is the output of the statefun-manager container:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6660172 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Now if I take a look at the documentation (https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/) they refer to the Confluent doc to customize the configuration of the Kafka client. It is unclear on how to implement this into the module.yaml file. I tried several ways:
ingresses:
- ingress:
meta:
type: io.statefun.kafka/ingress
id: project.A/input
spec:
max:
request:
size: 104857600
max.request.size: 110000000
message:
max:
bytes: 104857600
address: kafka:9092
consumerGroupId: my-consumer-group
startupPosition:
type: earliest
topics:
- topic: entry # used for retrop-compatibility, to be removed in next release
valueType: project.A/Message
targets:
- project.redacted/Entry
None of the above solutions seems to be working.
Does anyone have the ability to clarify what I am not doing correctly ?
Thanks in advance,
Jérémy
Re: Customize Kafka client (module.yaml)
Posted by Jérémy Albrecht <ja...@skapane.ai>.
Hi Robert,
Thanks for your answer.
Indeed, you were right. The properties attribute have to be specified and then it is the non-nested variant. In fact, it is documented for the Egress but not the Ingress but the same behaviour applies.
Have a great day,
Jérémy
________________________________
From: Robert Metzger <me...@gmail.com>
Sent: Wednesday, December 8, 2021 8:04 AM
To: Jérémy Albrecht <ja...@skapane.ai>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Customize Kafka client (module.yaml)
Hi Jérémy,
In my understanding of the StateFun docs, you need to pass custom properties using "ingress.spec.properties".
For example:
ingresses:
- ingress:
meta:
type: io.statefun.kafka/ingress
id: project.A/input
spec:
properties:
max.request.size: 110000000
(or the nested variant?)
On Tue, Dec 7, 2021 at 4:31 PM Jérémy Albrecht <ja...@skapane.ai>> wrote:
Hi All,
I encounter a blocking problem linked to exchanging messages between Stateful functions.
The context is: I am sending a very large payload from a Stateful Function to a Kafka topic. I am blocked by the Kafka client (I think) because here is the output of the statefun-manager container:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6660172 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Now if I take a look at the documentation (https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/) they refer to the Confluent doc to customize the configuration of the Kafka client. It is unclear on how to implement this into the module.yaml file. I tried several ways:
ingresses:
- ingress:
meta:
type: io.statefun.kafka/ingress
id: project.A/input
spec:
max:
request:
size: 104857600
max.request.size: 110000000
message:
max:
bytes: 104857600
address: kafka:9092
consumerGroupId: my-consumer-group
startupPosition:
type: earliest
topics:
- topic: entry # used for retrop-compatibility, to be removed in next release
valueType: project.A/Message
targets:
- project.redacted/Entry
None of the above solutions seems to be working.
Does anyone have the ability to clarify what I am not doing correctly ?
Thanks in advance,
Jérémy
Re: Customize Kafka client (module.yaml)
Posted by Robert Metzger <me...@gmail.com>.
Hi Jérémy,
In my understanding of the StateFun docs, you need to pass custom
properties using "ingress.spec.properties".
For example:
ingresses:
- ingress:
meta:
type: io.statefun.kafka/ingress
id: project.A/input
spec:
properties:
max.request.size: 110000000
(or the nested variant?)
On Tue, Dec 7, 2021 at 4:31 PM Jérémy Albrecht <ja...@skapane.ai> wrote:
> Hi All,
>
> I encounter a blocking problem linked to exchanging messages between
> Stateful functions.
> The context is: I am sending a very large payload from a Stateful Function
> to a Kafka topic. I am blocked by the Kafka client (I think) because here
> is the output of the statefun-manager container:
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The
> message is 6660172 bytes when serialized which is larger than the maximum
> request size you have configured with the max.request.size configuration.
>
> Now if I take a look at the documentation (
> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/) they
> refer to the Confluent doc to customize the configuration of the Kafka
> client. It is unclear on how to implement this into the module.yaml file. I
> tried several ways:
>
> ingresses:
> - ingress:
> meta:
> type: io.statefun.kafka/ingress
> id: project.A/input
> spec:
> max:
> request:
> size: 104857600
> max.request.size: 110000000
> message:
> max:
> bytes: 104857600
> address: kafka:9092
> consumerGroupId: my-consumer-group
> startupPosition:
> type: earliest
> topics:
> - topic: entry # used for retrop-compatibility, to be removed in next release
> valueType: project.A/Message
> targets:
> - project.redacted/Entry
>
>
> None of the above solutions seems to be working.
> Does anyone have the ability to clarify what I am not doing correctly ?
>
> Thanks in advance,
> Jérémy
>