Posted to issues@flink.apache.org by "LCER (Jira)" <ji...@apache.org> on 2022/05/24 07:18:00 UTC
[jira] [Comment Edited] (FLINK-27738) instance KafkaSink support config topic properties
[ https://issues.apache.org/jira/browse/FLINK-27738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541231#comment-17541231 ]
LCER edited comment on FLINK-27738 at 5/24/22 7:17 AM:
-------------------------------------------------------
[~martijnvisser], I use Flink MySQL CDC to collect data from MySQL into Kafka, but some table rows are larger than Kafka's default size limit. In that case, the following exception is thrown:
org.apache.kafka.common.errors.RecordTooLargeException: The message is XXX bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
So I want to use KafkaSinkBuilder to configure the topic's {{max.message.bytes}} property to resolve the problem.
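The exception above names max.request.size, which is a producer (client-side) setting rather than a topic property, so it can travel in the Properties object handed to setKafkaProducerConfig. A minimal sketch, assuming a hypothetical helper class ProducerSizeConfig, a placeholder broker address, and an illustrative 5 MiB limit (none of these come from the issue). The topic-level max.message.bytes limit is enforced on the broker side and may also need raising there; this sketch does not cover that.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka.
class ProducerSizeConfig {

    // Returns a copy of the given producer properties with the client-side
    // request size limit raised. "max.request.size" is the producer setting
    // named in the RecordTooLargeException message.
    static Properties withMaxRequestSize(Properties base, int maxRequestBytes) {
        if (maxRequestBytes <= 0) {
            throw new IllegalArgumentException("maxRequestBytes must be positive");
        }
        Properties props = new Properties();
        props.putAll(base); // leave the caller's object untouched
        props.setProperty("max.request.size", Integer.toString(maxRequestBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        // 5 MiB is an illustrative value; choose one that fits the largest row.
        Properties properties = withMaxRequestSize(base, 5 * 1024 * 1024);
        System.out.println(properties.getProperty("max.request.size"));

        // The result would then be passed to the sink builder from the issue:
        //   KafkaSink.<String>builder()
        //       ...
        //       .setKafkaProducerConfig(properties)
        //       .build();
    }
}
```

If the broker or topic limit is smaller than the record, the broker can still reject it even after the producer limit is raised, so both sides would need to agree.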
was (Author: JIRAUSER289859):
[~martijnvisser], I use Flink MySQL CDC to collect data from MySQL into Kafka, but some table rows are larger than Kafka's default size limit. In that case, the following exception is thrown:
org.apache.kafka.common.errors.RecordTooLargeException: The message is XXX bytes when serialized which is larger than XXX, which is the value of the max.request.size configuration.
So I want to use KafkaSinkBuilder to configure topic properties to resolve the problem.
> instance KafkaSink support config topic properties
> --------------------------------------------------
>
> Key: FLINK-27738
> URL: https://issues.apache.org/jira/browse/FLINK-27738
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.15.0
> Reporter: LCER
> Priority: Major
>
> I use KafkaSink to configure the Kafka connection as follows:
> KafkaSink.<String>builder()
>     .setBootstrapServers(brokers)
>     .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>         .setTopicSelector(topicSelector)
>         .setValueSerializationSchema(new SimpleStringSchema())
>         .build()
>     )
>     .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>     .setKafkaProducerConfig(properties)
>     .build();
> ----------------
> I can't find any method for configuring topic properties.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)