Posted to jira@kafka.apache.org by "Rens Groothuijsen (Jira)" <ji...@apache.org> on 2020/04/01 16:13:00 UTC
[jira] [Commented] (KAFKA-9547) Kafka transaction - skip one offset
when the application stops and be started again
[ https://issues.apache.org/jira/browse/KAFKA-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072917#comment-17072917 ]
Rens Groothuijsen commented on KAFKA-9547:
------------------------------------------
[~cheatmenot] As far as I know, this is the expected behavior: the transaction's commit (or abort) marker is itself written to the partition as a record, so it occupies an offset.
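
To illustrate the offset arithmetic, here is a toy in-memory model of a partition log. It is only a sketch and does not use the Kafka API: the names ToyPartition, Data, CommitMarker and runOnce are hypothetical, and it assumes each committed transaction appends exactly one marker to the partition after its data records.

```scala
// Toy model of a single partition log. NOT the kafka-clients API;
// all names here are hypothetical and exist only for illustration.
sealed trait Entry
final case class Data(key: String, value: String) extends Entry
case object CommitMarker extends Entry // assumed: one marker per committed transaction

final class ToyPartition {
  private var log = Vector.empty[Entry]

  // Appends one entry and returns the offset it was assigned.
  def append(entry: Entry): Long = {
    log = log :+ entry
    (log.size - 1).toLong
  }

  // Offsets an application would observe: markers occupy an offset
  // but are never handed to the application as records.
  def visibleOffsets: Vector[Long] =
    log.zipWithIndex.collect { case (_: Data, i) => i.toLong }
}

object ToyTransactions {
  // Simulates one run of the transactional producer: three sends, then a commit.
  def runOnce(p: ToyPartition): Unit = {
    Seq("key" -> "value", "key2" -> "value2", "key3" -> "value3")
      .foreach { case (k, v) => p.append(Data(k, v)) }
    p.append(CommitMarker)
  }
}
```

Calling ToyTransactions.runOnce twice on the same ToyPartition leaves visibleOffsets at 0, 1, 2, 4, 5, 6: offsets 3 and 7 are taken by the markers, which matches the gap described in the report.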
> Kafka transaction - skip one offset when the application stops and be started again
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-9547
> URL: https://issues.apache.org/jira/browse/KAFKA-9547
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.4.0
> Environment: I am using kafka-clients 2.4.0 and wurstmeister/kafka:2.12-2.3.0
> Reporter: Rumel
> Priority: Minor
>
> For comparison, I tested a plain Kafka producer without transactions, and it does not skip any offsets no matter how many times I rerun ProducerTest.
> {code:java}
> object ProducerTest extends LazyLogging {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "kafka.local:9092")
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("acks", "all")
>     props.put("retries", "3")
>     val producer = new KafkaProducer[String, String](props)
>     val record = new ProducerRecord[String, String]("zxc", "key", "value")
>     val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
>     val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
>     producer.send(record)
>     producer.send(record2)
>     producer.send(record3)
>     Thread.sleep(3000)
>   }
> }{code}
> But when I enable transactions on the producer, one offset is skipped each time the ProducerTestWithTransaction application is rerun. For example, the first run writes offsets 0, 1, 2; after a rerun, the new records get offsets 4, 5, 6, skipping 3; and so on.
> {code:java}
> object ProducerTestWithTransaction extends LazyLogging {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "kafka.local:9092")
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("enable.idempotence", "true")
>     props.put("transactional.id", "alona")
>     props.put("acks", "all")
>     props.put("retries", "3")
>     val producer = new KafkaProducer[String, String](props)
>     val record = new ProducerRecord[String, String]("wew", "key", "value")
>     val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
>     val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
>     producer.initTransactions()
>     try {
>       producer.beginTransaction()
>       producer.send(record)
>       producer.send(record2)
>       producer.send(record3)
>       producer.commitTransaction()
>     } catch {
>       case e: ProducerFencedException => producer.close()
>       case e: Exception => producer.abortTransaction()
>     }
>   }
> }{code}
> Could you explain why this is happening? Is this the standard behavior when using transactions? Is there any workaround to avoid skipping an offset? Thanks!