Posted to issues@flink.apache.org by "LCER (Jira)" <ji...@apache.org> on 2022/05/24 07:18:00 UTC

[jira] [Comment Edited] (FLINK-27738) instance KafkaSink support config topic properties

    [ https://issues.apache.org/jira/browse/FLINK-27738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541231#comment-17541231 ] 

LCER edited comment on FLINK-27738 at 5/24/22 7:17 AM:
-------------------------------------------------------

[~martijnvisser], I use Flink MySQL CDC to collect data from MySQL into Kafka, but some table rows are larger than Kafka's default size limit. In that case, the following exception is thrown:
org.apache.kafka.common.errors.RecordTooLargeException: The message is XXX bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
So I want to use KafkaSinkBuilder to set the topic's {{max.message.bytes}} property to resolve the problem.
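
For context, the exception quoted above names {{max.request.size}}, which is a producer (client-side) setting rather than a topic property, so it can be carried in the Properties object that the sink already accepts through setKafkaProducerConfig. The following is a minimal sketch under that assumption; the class name, the helper method, and the 5 MiB value are hypothetical and chosen only for illustration. It uses only java.util.Properties so that it runs without Flink or Kafka on the classpath.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Hypothetical helper: builds producer properties with a raised request-size limit.
    static Properties producerProperties(int maxRequestBytes) {
        Properties properties = new Properties();
        // "max.request.size" is the producer setting named in the exception message.
        properties.setProperty("max.request.size", Integer.toString(maxRequestBytes));
        return properties;
    }

    public static void main(String[] args) {
        // 5 MiB is an arbitrary example value, not a recommendation.
        Properties properties = producerProperties(5 * 1024 * 1024);
        System.out.println(properties.getProperty("max.request.size"));
        // The object would then be handed to the builder shown in the issue description:
        //   KafkaSink.<String>builder() ... .setKafkaProducerConfig(properties) ...
    }
}
```

Raising the producer limit alone may not be enough: the topic's {{max.message.bytes}} (and the broker-level limit) must also allow records of that size, and those are set on the Kafka side rather than through producer properties.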
 


was (Author: JIRAUSER289859):
[~martijnvisser], I use Flink MySQL CDC to collect data from MySQL into Kafka, but some table rows are larger than Kafka's default size limit. In that case, the following exception is thrown:
org.apache.kafka.common.errors.RecordTooLargeException: The message is XXX bytes when serialized which is larger than XXX, which is the value of the max.request.size configuration.
So I want to use KafkaSinkBuilder to configure topic properties to resolve the problem.
 

> instance KafkaSink support config topic properties
> --------------------------------------------------
>
>                 Key: FLINK-27738
>                 URL: https://issues.apache.org/jira/browse/FLINK-27738
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0
>            Reporter: LCER
>            Priority: Major
>
> I configure KafkaSink as follows:
>
>     KafkaSink.<String>builder()
>         .setBootstrapServers(brokers)
>         .setRecordSerializer(
>             KafkaRecordSerializationSchema.builder()
>                 .setTopicSelector(topicSelector)
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .build())
>         .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>         .setKafkaProducerConfig(properties)
>         .build();
>
> I can't find any method for configuring topic properties.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)