You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by PATEL Himaliben <hi...@thalesgroup.com> on 2020/08/06 10:17:32 UTC

Kafka Time-based offset fetching not working when in Kafka Produce Transactional semantics are enabled

Hi,



We are implementing exactly once in our application with help of KafakaConsumer's offsetsForTimes API.



Out kafka producer is using transactional semantics  as below : pseudo steps



It is enabled using :          producerProps.put("transactional.id", "h3");

                                                KafkaProducer<String,String> producer = new KafkaProducer<String,String>(producerProps);



And transactions are committed using :

                1. producer.initTransactions();

                2. producer.beginTransaction();

                3. // push some data

                4. producer.commitTransaction();

                5. producer.beginTransaction();

                // Repeat 3rd, 4th and 5th statements in loop

                //when done

                6. producer.commitTransaction();

                7. producer.close();



Our observations are :

                I have tried combinations for transaction semantics enabled/disabled + CreateTime is current / 1 day old.

                1. Transaction disabled , CreateTime not provided (Will take system time) : Offset fetching is proper

                2. Transaction enabled , CreateTime not provided (Will take system time) : Offset fetching is proper

                3. Transaction disabled , CreateTime is 1 day old : Offset fetching is proper

                4. Transaction enabled , CreateTime is 1 day old : Offset return is seems like very first batch commit offset only.

                (Regarding above case #4 , we have tried to use properties message.timestamp.difference.max.ms and retention.ms also. But result is same. And our data is ranging from few hours to days but again result is same.)





Please help if we are missing something when we are using transaction semantics for populating kafka topic.