Posted to users@kafka.apache.org by jingguo yao <ya...@gmail.com> on 2018/08/22 15:34:43 UTC

Is it possible to send a message more than once with transactional.id set?

I am sending some Kafka messages over the Internet. The message sizes
are about 400K. The essential logic of my code is as follows:

Properties config = new Properties();
config.put("bootstrap.servers", "...");
config.put("client.id", "...");
config.put("key.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
config.put("value.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
config.put("transactional.id", "...");

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config);
producer.initTransactions();

producer.beginTransaction();
for (byte[] bytesMessage : batch) {
  producer.send(
      new ProducerRecord<>("...", null, bytesMessage));
}
producer.commitTransaction();

I found that there were two records on the topic for some bytesMessage
values. Is there something wrong with my code? Or are duplicate message
deliveries still possible even with transactional.id set?

-- 
Jingguo

Re: Is it possible to send a message more than once with transactional.id set?

Posted by jingguo yao <ya...@gmail.com>.
Matthias:

Thanks for your reply. With your answer, I have found the cause of my
problem.

There is nothing wrong with the KafkaProducer code. The problem is
with my use of KafkaConsumer. I store committed offsets outside of
Kafka, and I was counting the received consumer records to compute the
committed offset. Since that count does not take the "commit markers"
into account, my computed offsets fell behind the real log offsets and
some records were consumed twice as duplicates.
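For anyone hitting the same issue, a minimal sketch of the difference between the two bookkeeping approaches (plain Java with hard-coded offsets instead of a live consumer; the method names are mine for illustration):

```java
public class NextOffsetSketch {
    // Buggy approach: assumes delivered records occupy consecutive offsets,
    // so "start + count" is treated as the next offset to fetch.
    static long byCounting(long startOffset, long[] deliveredOffsets) {
        return startOffset + deliveredOffsets.length;
    }

    // Correct approach: the next offset to fetch is the last delivered
    // record's offset + 1, which naturally skips over transaction markers.
    static long byLastOffset(long[] deliveredOffsets) {
        return deliveredOffsets[deliveredOffsets.length - 1] + 1;
    }

    public static void main(String[] args) {
        // Two committed transactions of 4 records each, starting at offset 0.
        // Offsets 4 and 9 hold the commit markers and are never delivered.
        long[] delivered = {0, 1, 2, 3, 5, 6, 7, 8};

        // Counting yields 8, but offset 8 was already consumed -> duplicate.
        System.out.println(byCounting(0, delivered));  // 8
        // Last offset + 1 yields 9; polling from 9 skips the marker there.
        System.out.println(byLastOffset(delivered));   // 9
    }
}
```

With a real consumer, the equivalent of byLastOffset is storing `record.offset() + 1` for the last ConsumerRecord processed.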
Matthias J. Sax <ma...@confluent.io> wrote on Thu, Aug 23, 2018 at 12:41 AM:


-- 
Jingguo

Re: Is it possible to send a message more than once with transactional.id set?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I would assume that you refer to "commit markers". Each time you call
commitTransaction(), a special message called a commit marker is written
to the log to indicate a successful transaction (there are also "abort
markers" if a transaction is aborted).

Those markers each "eat up" one offset, but they won't be delivered to the
application; they are filtered out on read. Thus, with transactions, you
cannot infer from `endOffset - startOffset` of a partition how many
messages are actually in the topic.

You can verify this by consuming the topic and inspecting the offsets of
the returned messages -- commit/abort markers are skipped, so you won't
receive messages with consecutive offsets.
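To put numbers on this bookkeeping, a tiny sketch (plain Java, illustrative figures; it assumes every transaction commits and contributes exactly one marker):

```java
public class MarkerMath {
    // Each committed transaction appends recordsPerTxn data records plus one
    // commit marker, so the log end offset grows by recordsPerTxn + 1.
    static long logEndOffset(int transactions, int recordsPerTxn) {
        return (long) transactions * (recordsPerTxn + 1);
    }

    // Only the data records are ever handed to the application;
    // the markers are filtered out on read.
    static long deliveredRecords(int transactions, int recordsPerTxn) {
        return (long) transactions * recordsPerTxn;
    }

    public static void main(String[] args) {
        // Two transactions of three records each: the end offset reaches 8,
        // but only 6 records are ever returned to a consumer.
        System.out.println(logEndOffset(2, 3));      // 8
        System.out.println(deliveredRecords(2, 3));  // 6
    }
}
```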

-Matthias
