Posted to user-zh@flink.apache.org by Lynn Chen <al...@163.com> on 2021/03/01 03:10:30 UTC

Flink job keeps failing and restarting: producer id is not assigned to its transactional id

Hi all,


Flink fails with an error when producing data to Kafka, which causes the job to restart continuously.


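Observations: after each job starts, it runs normally for about 20 days before this problem appears. From then on the job keeps restarting, and I have not been able to find the real cause.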




Error message:


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
	at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
	at java.lang.Thread.run(Thread.java:748)




Kafka producer configuration:


// Empty client.id, set because of InstanceAlreadyExistsException
prop.setProperty("client.id", "")
// Override the producer transaction timeout (transaction.timeout.ms): 5 minutes
prop.setProperty("transaction.timeout.ms", (1000 * 60 * 5).toString)
// ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
prop.setProperty("max.in.flight.requests.per.connection", "1")
// Idempotent producer: ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
prop.setProperty("enable.idempotence", "true")
// ProducerConfig.RETRIES_CONFIG
prop.setProperty("retries", "5")

val kafkaSink: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
  topic,
  new ResultStringKafkaSerializationSchema(topic),
  prop,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)
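
Kafka producer settings use dotted key names (for example `enable.idempotence`), so a key written in another form, such as `enable_idempotence_config`, does not set the intended option. Below is a minimal sketch of a sanity check for the property keys used in this post. The object `ProducerConfigCheck`, the helper `unknownKeys`, and the `expectedKeys` set are hypothetical names introduced only for illustration, and the sketch assumes a Scala version where `scala.collection.JavaConverters` is available.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

object ProducerConfigCheck {
  // Dotted key names for the producer settings discussed in this post.
  // This list is an illustrative assumption, not the full Kafka key set.
  val expectedKeys: Set[String] = Set(
    "client.id",
    "transaction.timeout.ms",
    "max.in.flight.requests.per.connection",
    "enable.idempotence",
    "retries"
  )

  // Hypothetical helper: returns the keys in `props` that are not in expectedKeys.
  def unknownKeys(props: Properties): Set[String] =
    props.stringPropertyNames().asScala.toSet -- expectedKeys

  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.setProperty("transaction.timeout.ms", (1000 * 60 * 5).toString)
    prop.setProperty("retries", "5")
    // Underscore form of the key: not in expectedKeys, so it is reported.
    prop.setProperty("enable_idempotence_config", "true")

    println(unknownKeys(prop))
  }
}
```

Running `main` reports `enable_idempotence_config` as the only unexpected key. A check like this could run before the properties are passed to `FlinkKafkaProducer`, so that a mistyped key is caught at startup.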


Flink checkpointing is likewise configured as EXACTLY_ONCE:
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)


Thanks, everyone!