Posted to issues@flink.apache.org by "mzz (Jira)" <ji...@apache.org> on 2020/07/21 02:08:00 UTC

[jira] [Closed] (FLINK-18575) Failed to send data to Kafka

     [ https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz closed FLINK-18575.
-----------------------
    Release Note: The error was on the Kafka side; it is not caused by Flink.
      Resolution: Not A Problem

> Failed to send data to Kafka
> ----------------------------
>
>                 Key: FLINK-18575
>                 URL: https://issues.apache.org/jira/browse/FLINK-18575
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: mzz
>            Priority: Major
>
> Flink version: 1.10.0
> Kafka version: 2.2
> *code:*
> {code:java}
>  private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
>     val kafkaPro = new Properties()
>     kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
>     kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
>     kafkaPro.setProperty("request.timeout.ms", "10000")
>     kafkaPro.setProperty("compression.type", "snappy")
>     kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
> >     // With retries set, Flink does not restart when a Kafka partition leader switch occurs;
> >     // instead the producer makes 5 retry attempts:
>     kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
>     val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>     aggs_result.addSink(kafka).setParallelism(parallelism)
>   }
> {code}
> *When I use this code to produce to Kafka, it reports this error:*
> {code:java}
> 2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> 	... 8 more
> {code}
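> For context on the stack trace: the exception is thrown when the producer's close() runs while a record is still awaiting broker acknowledgement (here, presumably because the Kafka-side error prevented the ack from ever arriving). The following is a minimal, hypothetical sketch of that invariant only, not Flink's actual implementation; the class and method names are invented for illustration:
> {code:java}
> // Hypothetical sketch of the "pending record count" invariant that
> // FlinkKafkaProducer.close() enforces. Not Flink source code.
> public class PendingRecordSketch {
>     private int pendingRecords = 0;
>
>     // Each send puts one record "in flight".
>     public void send(String record) { pendingRecords++; }
>
>     // A broker acknowledgement takes it back out.
>     public void ack() { pendingRecords--; }
>
>     // Closing while records are still in flight fails loudly,
>     // mirroring the IllegalStateException in the stack trace above.
>     public void close() {
>         if (pendingRecords != 0) {
>             throw new IllegalStateException(
>                 "Pending record count must be zero at this point: " + pendingRecords);
>         }
>     }
>
>     public static void main(String[] args) {
>         PendingRecordSketch p = new PendingRecordSketch();
>         p.send("a");           // one record in flight, no ack yet
>         try {
>             p.close();         // closing early reproduces the error shape
>         } catch (IllegalStateException e) {
>             System.out.println(e.getMessage());
>         }
>         p.ack();               // ack arrives
>         p.close();             // now the count is zero and close succeeds
>         System.out.println("closed cleanly");
>     }
> }
> {code}
> So the exception in the log is a symptom of an unacknowledged send, consistent with the resolution that the root cause was on the Kafka broker side.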



--
This message was sent by Atlassian Jira
(v8.3.4#803005)