Posted to dev@kafka.apache.org by Darren Sargent <ds...@richrelevance.com> on 2013/03/25 19:36:31 UTC

Any update on the "distributed commit" problem?

This is where you are reading messages from a broker, doing something with the messages, then committing the results to some permanent storage such as HBase. There is a race condition in committing the offsets to ZooKeeper: if the DB write succeeds but the ZK commit fails for any reason, you'll get a duplicate batch the next time you query the broker. If you commit to ZK first and the commit to the DB then fails, you lose data.
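
To make the two failure modes concrete, here is a minimal simulation. It is only a sketch: the in-memory `log`, `db`, and `zk_offset` are hypothetical stand-ins for a partition, the external store, and the offset kept in ZooKeeper, and a "crash" is modelled as returning between the two commits.

```python
def run(order, crash_between_commits):
    """Process one batch, optionally crash between the two commits,
    then restart and process one more batch.

    order is "db_first" or "zk_first". Returns what ended up in the DB.
    """
    log = [0, 1, 2, 3, 4, 5]   # hypothetical partition contents (offsets)
    db = []                    # stand-in for the external store
    zk_offset = 0              # stand-in for the offset stored in ZooKeeper
    batch_size = 3

    def attempt(crash):
        nonlocal zk_offset
        batch = log[zk_offset:zk_offset + batch_size]
        next_offset = zk_offset + len(batch)
        if order == "db_first":
            db.extend(batch)           # DB write succeeds
            if crash:
                return                 # offset commit never happens
            zk_offset = next_offset
        else:
            zk_offset = next_offset    # offset commit succeeds
            if crash:
                return                 # DB write never happens
            db.extend(batch)

    attempt(crash_between_commits)     # first run
    attempt(False)                     # run after restart
    return db
```

With `run("db_first", True)` the first batch is written twice; with `run("zk_first", True)` the first batch never reaches the store.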

The Kafka white paper mentions that Kafka stays agnostic about the distributed commit problem. There has been some prior discussion about this but I haven't seen any solid solutions. If you're using something like PostgreSQL that admits two-phase commits, you can roll the offset into the DB transaction, assuming you're okay with storing offsets in the DB rather than in ZK, but that's not a general solution.

Is there anything in Kafka 0.8.x that helps address this issue?

--Darren Sargent
RichRelevance (www.richrelevance.com)

Re: Any update on the "distributed commit" problem?

Posted by Neha Narkhede <ne...@gmail.com>.
In Kafka 0.8, SimpleConsumer returns a list of (message,offset) pairs.
Hence, for whatever batching you have in mind, you need to keep track
of the start and end offset of the batch that you've fetched from
Kafka. This is what your consumer needs to keep track of for every
partition that it consumes. What I described can be easily done for a
single partition. If you want transactions on multiple partitions,
then that is more complex to get right. When a consumer restarts, you
need to retrieve the last committed offset for each partition and then
issue a fetch request from that point onwards.
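
The bookkeeping described above might look roughly like the following sketch. `BatchRange`, `batch_range`, and `resume_offsets` are hypothetical names, not part of any Kafka API, and the sketch assumes that offsets within a partition are consecutive integers, so the next fetch starts at the last committed offset plus one.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BatchRange:
    partition: int
    start: int  # offset of the first message in the batch
    end: int    # offset of the last message in the batch (inclusive)


def batch_range(partition, pairs):
    """Summarize a non-empty list of (message, offset) pairs from one fetch."""
    offsets = [offset for _, offset in pairs]
    return BatchRange(partition, offsets[0], offsets[-1])


def resume_offsets(committed, partitions):
    """Map each partition to the offset to fetch from after a restart.

    committed maps partition -> last committed BatchRange. Partitions with
    nothing committed start at 0 here for simplicity; a real consumer would
    ask the broker for the earliest available offset instead.
    """
    return {
        p: committed[p].end + 1 if p in committed else 0
        for p in partitions
    }
```

For example, if the last committed batch for partition 0 covered offsets 5 through 7, `resume_offsets` returns 8 for that partition.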

Thanks,
Neha

On Mon, Mar 25, 2013 at 2:40 PM, Scott Carey <sc...@richrelevance.com> wrote:
> To succeed with two-phase commit in a client, one needs the
> "coordinates" of the current batch of records as the client sees the
> batch (which may span many smaller SimpleConsumer batches).  I
> believe this is the range of offsets plus any partition information that
> describes the batch uniquely.  One risk I worry about is someone restarting
> a consumer with a different configured batch size after such a race
> condition is triggered, causing the batch to appear to be 'after' what has
> been committed but actually overlap it -- but perhaps this is only a
> fear due to the limitations and opacity of the SimpleConsumer in 0.7.x
> and an incomplete understanding of what can be done with the 0.8.x version.
>
> Is the offset alone from the SimpleConsumer in 0.8 sufficient to build
> two-phase commit in a consumer application with multiple partitions?  Are
> there any additional requirements, such as committing on certain offset
> boundaries that align with boundaries elsewhere?
>
> I'm not afraid of building my own two-phase commit, but I am afraid of not
> having all the information I need from Kafka to succeed in the attempt.

Re: Any update on the "distributed commit" problem?

Posted by Scott Carey <sc...@richrelevance.com>.
To succeed with two-phase commit in a client, one needs the
"coordinates" of the current batch of records as the client sees the
batch (which may span many smaller SimpleConsumer batches).  I
believe this is the range of offsets plus any partition information that
describes the batch uniquely.  One risk I worry about is someone restarting
a consumer with a different configured batch size after such a race
condition is triggered, causing the batch to appear to be 'after' what has
been committed but actually overlap it -- but perhaps this is only a
fear due to the limitations and opacity of the SimpleConsumer in 0.7.x
and an incomplete understanding of what can be done with the 0.8.x version.
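
One possible guard against that overlap, sketched under the assumption that every fetched message carries its own offset and that the committed position is stored as "next offset to read" per partition: filter each fetched batch against the committed position, so that batch boundaries and batch size stop mattering. `new_messages` is a hypothetical helper, not a Kafka call.

```python
def new_messages(pairs, next_offset):
    """Keep only (message, offset) pairs not yet covered by the commit.

    pairs is one fetched batch for a single partition; next_offset is the
    committed "next offset to read" for that partition.
    """
    return [(msg, offset) for msg, offset in pairs if offset >= next_offset]
```

If offsets up to 2 are committed (`next_offset` is 3) and a restarted consumer with a larger batch size refetches offsets 1 through 4, only the messages at offsets 3 and 4 survive the filter.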

Is the offset alone from the SimpleConsumer in 0.8 sufficient to build
two-phase commit in a consumer application with multiple partitions?  Are
there any additional requirements, such as committing on certain offset
boundaries that align with boundaries elsewhere?

I'm not afraid of building my own two-phase commit, but I am afraid of not
having all the information I need from Kafka to succeed in the attempt.



On 3/25/13 12:33 PM, "Neha Narkhede" <ne...@gmail.com> wrote:

>Today, the only safe way of controlling consumer state management is
>by using the SimpleConsumer. The application is responsible for
>checkpointing offsets. So, in your example, when you commit a database
>transaction, you can store your consumer's offset as part of the txn.
>So either your txn succeeds and the offset moves ahead or your txn
>fails and the offset stays where it is.
>
>Kafka 0.9 is when we will attempt to merge the high-level and low-level
>consumer APIs, move offset management to the broker, and
>offer stronger offset checkpointing guarantees.
>
>Thanks,
>Neha
>
>On Mon, Mar 25, 2013 at 11:36 AM, Darren Sargent
><ds...@richrelevance.com> wrote:
>> This is where you are reading messages from a broker, doing something
>>with the messages, then committing the results to some permanent storage
>>such as HBase. There is a race condition in committing the offsets to ZooKeeper:
>>if the DB write succeeds, but the ZK commit fails for any reason, you'll
>>get a duplicate batch next time you query the broker. If you commit to
>>ZK first, and the commit to the DB then fails, you lose data.
>>
>> The Kafka white paper mentions that Kafka stays agnostic about the
>>distributed commit problem. There has been some prior discussion about
>>this but I haven't seen any solid solutions. If you're using something
>>like PostgreSQL that admits two-phase commits, you can roll the offset
>>into the DB transaction, assuming you're okay with storing offsets in
>>the DB rather than in ZK, but that's not a general solution.
>>
>> Is there anything in Kafka 0.8.x that helps address this issue?
>>
>> --Darren Sargent
>> RichRelevance (www.richrelevance.com)


Re: Any update on the "distributed commit" problem?

Posted by Neha Narkhede <ne...@gmail.com>.
Today, the only safe way of controlling consumer state management is
by using the SimpleConsumer. The application is responsible for
checkpointing offsets. So, in your example, when you commit a database
transaction, you can store your consumer's offset as part of the txn.
So either your txn succeeds and the offset moves ahead or your txn
fails and the offset stays where it is.
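
As an illustration of storing the offset as part of the transaction, here is a minimal sketch using Python's built-in `sqlite3` module in place of a production database. The table layout and the function names (`open_store`, `committed_offset`, `commit_batch`) are assumptions made for this example, and it again assumes consecutive integer offsets per partition.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open the database and create the (hypothetical) tables if missing."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results ("
        " topic TEXT, partition_id INTEGER, msg_offset INTEGER, payload TEXT,"
        " PRIMARY KEY (topic, partition_id, msg_offset))"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS offsets ("
        " topic TEXT, partition_id INTEGER, next_offset INTEGER,"
        " PRIMARY KEY (topic, partition_id))"
    )
    conn.commit()
    return conn


def committed_offset(conn, topic, partition):
    """Return the next offset to fetch, or 0 if nothing was committed yet."""
    row = conn.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND partition_id = ?",
        (topic, partition),
    ).fetchone()
    return row[0] if row else 0


def commit_batch(conn, topic, partition, batch):
    """Store a batch of (offset, payload) pairs and the new offset atomically."""
    if not batch:
        return
    next_offset = batch[-1][0] + 1
    # "with conn" commits if the block succeeds and rolls back if it raises,
    # so the results and the offset move together or not at all.
    with conn:
        conn.executemany(
            "INSERT INTO results VALUES (?, ?, ?, ?)",
            [(topic, partition, offset, payload) for offset, payload in batch],
        )
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (topic, partition, next_offset),
        )
```

On restart, the consumer would read `committed_offset(...)` for each partition and fetch from there. The primary key on `results` also means an accidental replay of an already stored offset fails the whole transaction instead of silently duplicating rows.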

Kafka 0.9 is when we will attempt to merge the high-level and low-level
consumer APIs, move offset management to the broker, and
offer stronger offset checkpointing guarantees.

Thanks,
Neha

On Mon, Mar 25, 2013 at 11:36 AM, Darren Sargent
<ds...@richrelevance.com> wrote:
> This is where you are reading messages from a broker, doing something with the messages, then committing the results to some permanent storage such as HBase. There is a race condition in committing the offsets to ZooKeeper: if the DB write succeeds but the ZK commit fails for any reason, you'll get a duplicate batch the next time you query the broker. If you commit to ZK first and the commit to the DB then fails, you lose data.
>
> The Kafka white paper mentions that Kafka stays agnostic about the distributed commit problem. There has been some prior discussion about this but I haven't seen any solid solutions. If you're using something like PostgreSQL that admits two-phase commits, you can roll the offset into the DB transaction, assuming you're okay with storing offsets in the DB rather than in ZK, but that's not a general solution.
>
> Is there anything in Kafka 0.8.x that helps address this issue?
>
> --Darren Sargent
> RichRelevance (www.richrelevance.com)