Posted to jira@kafka.apache.org by "Rens Groothuijsen (Jira)" <ji...@apache.org> on 2020/04/01 16:13:00 UTC

[jira] [Commented] (KAFKA-9547) Kafka transaction - skip one offset when the application stops and be started again

    [ https://issues.apache.org/jira/browse/KAFKA-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072917#comment-17072917 ] 

Rens Groothuijsen commented on KAFKA-9547:
------------------------------------------

[~cheatmenot] As far as I know this is the expected behavior: the transaction's commit marker is itself written to the partition as a record, so it takes up one offset.
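
To make the offset arithmetic concrete, here is a toy model in plain Java. It is not Kafka code: the class and method names (TxnOffsetSketch, PartitionLog, transactionalRuns) are made up for illustration. The only assumption it encodes is that every entry appended to a partition, whether a data record or a transaction marker, takes the next offset.

```java
import java.util.ArrayList;
import java.util.List;

public class TxnOffsetSketch {

    /** Toy model of a single partition log (hypothetical, not a Kafka class). */
    static final class PartitionLog {
        private long nextOffset = 0;

        /** Appends n data records and returns the offsets assigned to them. */
        List<Long> appendRecords(int n) {
            List<Long> offsets = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                offsets.add(nextOffset++);
            }
            return offsets;
        }

        /** Appends a commit marker; it consumes an offset but carries no user data. */
        long appendMarker() {
            return nextOffset++;
        }
    }

    /** Data-record offsets for `runs` runs, each writing `perRun` records in one transaction. */
    static List<List<Long>> transactionalRuns(int runs, int perRun) {
        PartitionLog log = new PartitionLog();
        List<List<Long>> result = new ArrayList<>();
        for (int r = 0; r < runs; r++) {
            result.add(log.appendRecords(perRun));
            log.appendMarker(); // the marker written on commit
        }
        return result;
    }

    public static void main(String[] args) {
        // Two runs of three records each, mirroring the report
        System.out.println(transactionalRuns(2, 3)); // prints [[0, 1, 2], [4, 5, 6]]
    }
}
```

Under this model the "missing" offset 3 is the marker of the first transaction, which matches the 0,1,2 then 4,5,6 pattern in the report. Consumers should therefore not assume that the offsets of a transactional topic are contiguous.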

> Kafka transaction - skip one offset when the application stops and be started again
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9547
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9547
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.0
>         Environment: I am using kafka-clients 2.4.0 and wurstmeister/kafka:2.12-2.3.0
>            Reporter: Rumel
>            Priority: Minor
>
> For comparison, I tested a plain Kafka producer without transactions, and it does not skip any offsets no matter how many times I rerun ProducerTest.
> {code:java}
> import java.util.Properties
> // Assuming LazyLogging comes from the scala-logging library
> import com.typesafe.scalalogging.LazyLogging
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>
> object ProducerTest extends LazyLogging {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "kafka.local:9092")
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("acks", "all")
>     props.put("retries", "3")
>     val producer = new KafkaProducer[String, String](props)
>     val record = new ProducerRecord[String, String]("zxc", "key", "value")
>     val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
>     val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
>     producer.send(record)
>     producer.send(record2)
>     producer.send(record3)
>     // close() waits for pending sends to complete, so no sleep is needed
>     producer.close()
>   }
> }{code}
> But when I enable transactions on the producer, one offset is skipped each time the ProducerTestWithTransaction application is rerun. For example, the first run produces offsets 0, 1, 2; after a rerun the new records get offsets 4, 5, 6, skipping 3, and so on.
> {code:java}
> import java.util.Properties
> // Assuming LazyLogging comes from the scala-logging library
> import com.typesafe.scalalogging.LazyLogging
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
> import org.apache.kafka.common.errors.ProducerFencedException
>
> object ProducerTestWithTransaction extends LazyLogging {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "kafka.local:9092")
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("enable.idempotence", "true")
>     props.put("transactional.id", "alona")
>     props.put("acks", "all")
>     props.put("retries", "3")
>     val producer = new KafkaProducer[String, String](props)
>     val record = new ProducerRecord[String, String]("wew", "key", "value")
>     val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
>     val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
>     producer.initTransactions()
>     try {
>       producer.beginTransaction()
>       producer.send(record)
>       producer.send(record2)
>       producer.send(record3)
>       producer.commitTransaction()
>     } catch {
>       // A fenced producer cannot abort; it can only be closed (see finally)
>       case _: ProducerFencedException => ()
>       case _: Exception => producer.abortTransaction()
>     } finally {
>       producer.close()
>     }
>   }
> }{code}
> Could you explain why this is happening? Is this the standard behavior when using transactions? Is there any workaround to avoid skipping an offset? Thanks!


