Posted to user@flink.apache.org by Jérémy Albrecht <ja...@skapane.ai> on 2021/12/07 15:31:23 UTC

Customize Kafka client (module.yaml)

Hi All,

I have run into a blocking problem when exchanging messages between Stateful Functions.
The context: I am sending a very large payload from a Stateful Function to a Kafka topic. I believe the Kafka client is rejecting it, because the statefun-manager container logs the following:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6660172 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

The documentation (https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/) refers to the Confluent docs for customizing the configuration of the Kafka client, but it is unclear how to express this in the module.yaml file. I tried several ways:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        max:
          request:
            size: 104857600
        max.request.size: 110000000
        message:
          max:
            bytes: 104857600
        address: kafka:9092
        consumerGroupId: my-consumer-group
        startupPosition:
          type: earliest
        topics:
          - topic: entry # used for retro-compatibility, to be removed in next release
            valueType: project.A/Message
            targets:
              - project.redacted/Entry

None of the above attempts seems to work.
Can anyone clarify what I am doing wrong?

Thanks in advance,
Jérémy

Re: Customize Kafka client (module.yaml)

Posted by Jérémy Albrecht <ja...@skapane.ai>.
Hi Robert,

Thanks for your answer.

Indeed, you were right. The properties attribute has to be specified, and the non-nested variant is the one that works. This is documented for the egress but not for the ingress, although the same behaviour applies.
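
For anyone finding this thread later, here is a minimal sketch of what such a configuration could look like on the egress side, which is where the producer setting max.request.size from the error message applies. The egress id is made up for illustration, and the list form of the properties entries is an assumption taken from the egress section of the StateFun docs; it may need adjusting for your version of the module format.

```yaml
egresses:
  - egress:
      meta:
        type: io.statefun.kafka/egress
        id: project.A/output   # hypothetical id, not taken from the thread
      spec:
        address: kafka:9092
        properties:
          # Non-nested (dotted) Kafka client key; value is in bytes.
          # List-entry form is an assumption based on the egress docs.
          - max.request.size: 110000000
```

The broker (message.max.bytes) or the topic (max.message.bytes) also has to accept records of that size, otherwise the write may still be rejected on the broker side.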

Have a great day,
Jérémy
________________________________
From: Robert Metzger <me...@gmail.com>
Sent: Wednesday, December 8, 2021 8:04 AM
To: Jérémy Albrecht <ja...@skapane.ai>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Customize Kafka client (module.yaml)

Hi Jérémy,

In my understanding of the StateFun docs, you need to pass custom properties using "ingress.spec.properties".
For example:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        properties:
          max.request.size: 110000000

(or the nested variant?)




Re: Customize Kafka client (module.yaml)

Posted by Robert Metzger <me...@gmail.com>.
Hi Jérémy,

In my understanding of the StateFun docs, you need to pass custom
properties using "ingress.spec.properties".
For example:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        properties:
          max.request.size: 110000000

(or the nested variant?)


