Posted to users@kafka.apache.org by Ross Black <ro...@gmail.com> on 2013/08/23 06:32:21 UTC

Producer message ordering problem

Hi,

I am using Kafka 0.7.1, and using the low-level SyncProducer to send
messages to a *single* partition from a *single* thread.
The client sends messages that contain sequential numbers so it is obvious
at the consumer when message order is shuffled.
I have noticed that messages can be saved out of order by Kafka when there
are connection problems, and am looking for possible solutions (I think I
already know the cause).

The client sends messages in a retry loop so that it will wait for a short
period and then retry to send on any IO errors.  In SyncProducer, any
IOException triggers a disconnect.  The next time send is called, a new
connection is established.  I believe that it is this disconnect/reconnect
cycle that can cause messages to be saved to the Kafka log in a different
order to that of the client.
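
A minimal sketch of the retry loop described above, written in Python for brevity. `FlakyProducer` is a hypothetical stand-in and not the Kafka 0.7 SyncProducer API; it only models the behaviour described in this message: any IO error drops the connection, and the next send opens a new one.

```python
import time


class FlakyProducer:
    """Hypothetical stand-in for a synchronous producer (not the Kafka API)."""

    def __init__(self, failures):
        self.failures = failures   # number of sends that will fail
        self.connected = False
        self.connects = 0          # how many connections were opened
        self.sent = []             # messages that were sent successfully

    def send(self, message):
        if not self.connected:
            # A send on a closed producer opens a new connection.
            self.connected = True
            self.connects += 1
        if self.failures > 0:
            self.failures -= 1
            self.connected = False  # any IO error triggers a disconnect
            raise OSError("simulated IO error")
        self.sent.append(message)


def send_with_retry(producer, message, max_attempts=5, backoff_seconds=0.01):
    """Retry on IO errors, waiting briefly between attempts.

    Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            producer.send(message)
            return attempt
        except OSError:
            if attempt == max_attempts:
                raise
            time.sleep(backoff_seconds)


producer = FlakyProducer(failures=2)
attempts = send_with_retry(producer, "m1")
print(attempts, producer.connects, producer.sent)
```

Each retry in this sketch goes out on a fresh connection, which is the precondition for the reordering discussed in this thread.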

I previously had the same sort of issue with reconnect.interval/time,
which was fixed by disabling those reconnect settings.
http://mail-archives.apache.org/mod_mbox/kafka-users/201305.mbox/%3CCAM%2BbZhjssxmUhn_L%3Do0bGsD7PAXFGQHRpOKABcLz29vF3cNOzA%40mail.gmail.com%3E

Is there anything in 0.7 that would allow me to solve this problem?  The
only option I can see at the moment is to not perform retries.

Does 0.8 handle this issue any differently?

Thanks,
Ross

Re: Producer message ordering problem

Posted by Ross Black <ro...@gmail.com>.
Hi Jay, Jun,

Thanks for your comments - you have confirmed what I thought was most
likely the case.
I will attempt to work around the issue for the moment in the client to
minimise the chance of the out-of-order problem occurring (probably by
stopping retries and triggering a fail-fast of the JVM so that by the time
it restarts there is little chance of pending requests on the prior
connection).

I look forward to seeing a design proposal.

Thanks,
Ross



On 24 August 2013 01:34, Jay Kreps <ja...@gmail.com> wrote:

> Yeah I agree, this is a problem.
>
> The issue is that a produce request which is either in the network buffer
> or in the request processing queue on the broker may still be processed
> after a disconnect. So there is a race condition between that processing
> and the reconnect/retry logic. You could work around this in a hacky way
> using the reconnect backoff time, but the fundamental race condition
> exists. We could easily make this more transparent by having some mode
> where disconnection throws an error back to the client, but in fact there
> is no way for the client to solve this either.
>
> Neither Storm nor Samza nor any other framework would actually fix this
> issue for you, since they are in turn dependent on Kafka's ordering (though
> they might solve a lot of other problems).
>
> As Jun mentions we have been thinking of having a per-producer sequence
> number to enforce ordering. This would allow us to make produce calls
> idempotent, enforce strong ordering in the case of retries, as well as fix
> a number of other corner cases. I think it would handle this issue as well.
> But it's not a quick patch.
>
> I will try to get a design proposal up by next week so we have something
> concrete to discuss.
>
> -Jay

Re: Producer message ordering problem

Posted by Jay Kreps <ja...@gmail.com>.
Yeah I agree, this is a problem.

The issue is that a produce request which is either in the network buffer
or in the request processing queue on the broker may still be processed
after a disconnect. So there is a race condition between that processing
and the reconnect/retry logic. You could work around this in a hacky way
using the reconnect backoff time, but the fundamental race condition
exists. We could easily make this more transparent by having some mode
where disconnection throws an error back to the client, but in fact there
is no way for the client to solve this either.
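
The race can be illustrated with a small deterministic simulation. `ToyBroker` is a hypothetical model, not Kafka code: it assumes only that requests queued on different connections are appended to the log in whatever order the broker happens to process them.

```python
from collections import deque


class ToyBroker:
    """Hypothetical broker model: one request queue per connection, one log."""

    def __init__(self):
        self.log = []
        self.pending = {}  # connection id -> queue of unprocessed requests

    def receive(self, conn_id, message):
        # The request has reached the broker but is not yet in the log.
        self.pending.setdefault(conn_id, deque()).append(message)

    def process_one(self, conn_id):
        # Append the oldest pending request from this connection, if any.
        queue = self.pending.get(conn_id)
        if queue:
            self.log.append(queue.popleft())


broker = ToyBroker()

# Message 1 reaches the broker on connection A, but the client sees an
# IO error (for example a timeout) and disconnects before it is processed.
broker.receive("conn-A", 1)

# The client reconnects, retries message 1, then sends message 2.
broker.receive("conn-B", 1)
broker.process_one("conn-B")
broker.receive("conn-B", 2)
broker.process_one("conn-B")

# Only now does the broker get to the request left over from connection A.
broker.process_one("conn-A")

print(broker.log)  # [1, 2, 1]
```

From the consumer's point of view the late copy of message 1 arrives after message 2, and nothing the client did on connection B could have prevented it.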

Neither Storm nor Samza nor any other framework would actually fix this
issue for you, since they are in turn dependent on Kafka's ordering (though
they might solve a lot of other problems).

As Jun mentions we have been thinking of having a per-producer sequence
number to enforce ordering. This would allow us to make produce calls
idempotent, enforce strong ordering in the case of retries, as well as fix
a number of other corner cases. I think it would handle this issue as well.
But it's not a quick patch.
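
A rough sketch of how a per-producer sequence number could make appends idempotent. This thread only proposes the idea, so everything here is an assumption made for illustration: the `SequencedLog` class, the `producer_id` and `seq` parameters, and the string return values are invented and do not describe any Kafka release.

```python
class SequencedLog:
    """Hypothetical log that accepts each producer's messages strictly in sequence."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer id -> last sequence number appended

    def append(self, producer_id, seq, message):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # Already appended: a retry or a stale request from an old connection.
            return "duplicate"
        if seq != last + 1:
            # A gap: an earlier message has not been appended yet.
            return "out_of_order"
        self.log.append(message)
        self.last_seq[producer_id] = seq
        return "appended"


log = SequencedLog()
print(log.append("p1", 0, "a"))  # retry of "a" on the new connection
print(log.append("p1", 1, "b"))
print(log.append("p1", 0, "a"))  # stale request from the old connection
print(log.log)
```

In this sketch the stale request is recognised by its sequence number and rejected, so the log keeps the producer's order even though the requests arrived on different connections.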

I will try to get a design proposal up by next week so we have something
concrete to discuss.

-Jay



Re: Producer message ordering problem

Posted by Ross Black <ro...@gmail.com>.
Hi Philip,

Thanks for your input.  I did evaluate Storm about 9 months ago before going
down the path of developing this myself on top of Kafka.
The primary reason for not using Storm was the inability to control
allocation of requests to processing elements.  This same requirement was
the reason for using the low-level Kafka consumer and producer rather than
the higher-level Kafka APIs (something I hope will be possible with the
redesigned APIs -
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design).
As Jay mentioned, using Storm would not fix the out-of-order delivery issue.
I will probably eventually couple Storm to our Kafka messaging, but will
need to fix https://github.com/nathanmarz/storm/issues/115 before I could
use it.

I am also about to look at Samza to see if it can help me avoid having to
write more code :-)

Thanks,
Ross



On 24 August 2013 00:34, Philip O'Toole <ph...@loggly.com> wrote:

> Ross -- thanks.
>
> How much code are you writing to do all this, post-Kafka? Have you
> considered Storm? I believe the Trident topologies can give you
> guaranteed-once semantics, so you may be interested in checking that
> out, if you have the time (I have not yet played with Trident stuff
> myself, but Storm in general, yes). Coupling Storm to Kafka is a very
> popular thing to do. Even without Trident, and just using Storm in a
> simpler mode, may save you from writing a ton of code.
>
> Philip

Re: Producer message ordering problem

Posted by Philip O'Toole <ph...@loggly.com>.
Ross -- thanks.

How much code are you writing to do all this, post-Kafka? Have you
considered Storm? I believe the Trident topologies can give you
guaranteed-once semantics, so you may be interested in checking that
out, if you have the time (I have not yet played with Trident stuff
myself, but Storm in general, yes). Coupling Storm to Kafka is a very
popular thing to do. Even without Trident, and just using Storm in a
simpler mode, may save you from writing a ton of code.

Philip

On Thu, Aug 22, 2013 at 11:59 PM, Ross Black <ro...@gmail.com> wrote:
> Hi Philip,
>
> If I can assume that all messages within a single partition are ordered the
> same as delivery order, the state management to eliminate duplicates is far
> simpler.
>
> I am using Kafka as the infrastructure for a streaming map/reduce style
> solution, where throughput is critical.
> Events are sent into topic A, which is partitioned based on event id.
> Consumers of topic A generate data that is sent to a different topic B,
> which is partitioned by a persistence key.  Consumers of topic B save the
> data to a partitioned store.  Each stage can be single-threaded by the
> partition, which results in zero contention on the partitioned data store
> and massively improves the throughput.
> Message offsets are used end-to-end to eliminate duplicates, so the
> application effectively achieves guaranteed once-only processing of
> messages.  Currently, any out-of-order messages result in data being
> dropped because duplicate tracking is based *only* on message offsets.  If
> ordering within a partition is not guaranteed, I would need to
> maintain a list of message offsets that have been processed, rather than
> having to track just the latest message offset for a partition (and would
> need to persist this list of offsets to allow resume after failure).
>
> The assumption of guaranteed order is essential for the throughput the
> application achieves.
>
> Thanks,
> Ross

Re: Producer message ordering problem

Posted by Ross Black <ro...@gmail.com>.
Hi Philip,

If I can assume that all messages within a single partition are ordered the
same as delivery order, the state management to eliminate duplicates is far
simpler.

I am using Kafka as the infrastructure for a streaming map/reduce style
solution, where throughput is critical.
Events are sent into topic A, which is partitioned based on event id.
Consumers of topic A generate data that is sent to a different topic B,
which is partitioned by a persistence key.  Consumers of topic B save the
data to a partitioned store.  Each stage can be single-threaded by the
partition, which results in zero contention on the partitioned data store
and massively improves the throughput.
Message offsets are used end-to-end to eliminate duplicates, so the
application effectively achieves guaranteed once-only processing of
messages.  Currently, any out-of-order messages result in data being
dropped because duplicate tracking is based *only* on message offsets.  If
ordering within a partition is not guaranteed, I would need to
maintain a list of message offsets that have been processed, rather than
having to track just the latest message offset for a partition (and would
need to persist this list of offsets to allow resume after failure).
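
The trade-off described above can be sketched as two duplicate filters. Both classes are hypothetical and written for illustration; they assume each incoming message carries the offset used for duplicate tracking, which is one reading of the description above and not code from the application.

```python
class LatestOffsetFilter:
    """Tracks only the latest offset seen; relies on in-order delivery."""

    def __init__(self):
        self.latest = -1

    def accept(self, offset):
        if offset <= self.latest:
            return False  # treated as a duplicate
        self.latest = offset
        return True


class SeenOffsetsFilter:
    """Tracks every processed offset; tolerates reordering but must persist the set."""

    def __init__(self):
        self.seen = set()

    def accept(self, offset):
        if offset in self.seen:
            return False
        self.seen.add(offset)
        return True


# Offset 2 arrives late (after 3), and offset 3 is later delivered twice.
arrivals = [0, 1, 3, 2, 3]

latest_only = LatestOffsetFilter()
seen_set = SeenOffsetsFilter()
accepted_latest = [o for o in arrivals if latest_only.accept(o)]
accepted_seen = [o for o in arrivals if seen_set.accept(o)]

print(accepted_latest)  # [0, 1, 3]    offset 2 is wrongly dropped
print(accepted_seen)    # [0, 1, 3, 2] only the true duplicate is dropped
```

The first filter keeps a single integer per partition, which is why it is cheap to persist; the second keeps state that grows with the number of messages unless it is pruned.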

The assumption of guaranteed order is essential for the throughput the
application achieves.

Thanks,
Ross



On 23 August 2013 14:36, Philip O'Toole <ph...@loggly.com> wrote:

> I am curious. What is it about your design that requires you track order
> so tightly? Maybe there is another way to meet your needs instead of
> relying on Kafka to do it.
>
> Philip

Re: Producer message ordering problem

Posted by Philip O'Toole <ph...@loggly.com>.
I am curious. What is it about your design that requires you track order so tightly? Maybe there is another way to meet your needs instead of relying on Kafka to do it. 

Philip


RE: Producer message ordering problem

Posted by "Yu, Libo " <li...@citi.com>.
An auto-incrementing index can be assigned to each message as its key when it is published.
The consumer can check this index on receipt. If the expected message does not
show up, buffer the messages that do arrive in a hashtable (keyed by index) until the
missing one is received. Then handle the buffered messages in index order.
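
A minimal sketch of this consumer-side buffer, assuming indexes start at 0 and increase by 1 per message. The `ReorderBuffer` class is hypothetical, and dropping indexes that were already seen is an addition not mentioned in the suggestion above.

```python
class ReorderBuffer:
    """Hypothetical consumer-side buffer that releases messages in index order."""

    def __init__(self, first_index=0):
        self.next_index = first_index  # index the consumer is waiting for
        self.pending = {}              # index -> message received too early

    def receive(self, index, message):
        """Return the messages that can now be handled, in index order."""
        if index < self.next_index or index in self.pending:
            return []  # already handled or already buffered: drop the duplicate
        self.pending[index] = message
        ready = []
        # Release the longest run of consecutive indexes starting at next_index.
        while self.next_index in self.pending:
            ready.append(self.pending.pop(self.next_index))
            self.next_index += 1
        return ready


buffer = ReorderBuffer()
print(buffer.receive(0, "a"))  # ['a']
print(buffer.receive(2, "c"))  # []          held until index 1 arrives
print(buffer.receive(1, "b"))  # ['b', 'c']
print(buffer.receive(1, "b"))  # []          duplicate
```

The buffer is unbounded in this sketch: if the expected index never arrives, messages accumulate, so a real consumer would need a limit or a timeout.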

Regards,

Libo


-----Original Message-----
From: Jun Rao [mailto:junrao@gmail.com] 
Sent: Friday, August 23, 2013 11:20 AM
To: users@kafka.apache.org
Subject: Re: Producer message ordering problem

Ross,

This is a general issue with resending. Since resending is typically done on a new socket, essentially new messages are sent from a new instance of producer. So, there is no easy way to ensure that the new messages are ordered behind the ones sent by the old instance of the producer. So 0.8 will have similar issues. It may be possible to add some sort of per client sequence id and track that in the broker. But this may not be trivial and will need more thought.

Thanks,

Jun




Re: Producer message ordering problem

Posted by Jun Rao <ju...@gmail.com>.
Ross,

This is a general issue with resending. Since resending is typically done
on a new socket, essentially new messages are sent from a new instance of
producer. So, there is no easy way to ensure that the new messages are
ordered behind the ones sent by the old instance of the producer. So 0.8
will have similar issues. It may be possible to add some sort of per client
sequence id and track that in the broker. But this may not be trivial and
will need more thought.

Thanks,

Jun


