Posted to dev@kafka.apache.org by Daniel Schierbeck <da...@gmail.com> on 2015/01/05 11:47:50 UTC

Optimistic locking

I'm trying to design a system that uses Kafka as its primary data store by
persisting immutable events into a topic and keeping a secondary index in
another data store. The secondary index would store the "entities". Each
event would pertain to some "entity", e.g. a user, and those entities are
stored in an easily queryable way.

Kafka seems well suited for this, but there's one thing I'm having problems
with. I cannot guarantee that only one process writes events about an
entity, which makes the design vulnerable to integrity issues.

For example, say that a user can have multiple email addresses assigned,
and the EmailAddressRemoved event is published when the user removes one.
There's an integrity constraint, though: every user MUST have at least one
email address. As far as I can see, there's no way to stop two separate
processes from looking up a user entity, seeing that there are two email
addresses assigned, and each publishing an event. The end result would
violate the constraint.

If I'm wrong in saying that this isn't possible I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind
of application with a small additional API. Kafka already has the abstract
notion of entities through its key-based retention policy. If the produce
API were modified to accept an integer OffsetConstraint, the following
algorithm could determine whether the request should proceed (a rough
sketch in code follows the list):

1. For every key seen, keep track of the offset of the latest message
referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare
that value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.
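
A rough sketch of what such a broker-side check might look like. All of the
names here - the key/offset index, the OffsetConstraint parameter, the
OptimisticLockingFailure error - are hypothetical and not part of any
existing Kafka API:

    import scala.collection.concurrent.TrieMap

    // Hypothetical sketch: per-partition index from message key to the offset
    // of the latest message that referenced that key.
    class KeyOffsetIndex {
      private val latestOffsetByKey = TrieMap.empty[String, Long]

      // Called for every message appended to the log, so the index stays current.
      def recordAppend(key: String, offset: Long): Unit =
        latestOffsetByKey.put(key, offset)

      // A produce request carrying an OffsetConstraint is allowed through only
      // if the constraint matches the latest offset recorded for its key.
      def checkConstraint(key: String, constraint: Option[Long]): Boolean =
        constraint match {
          case None           => true   // no constraint supplied
          case Some(expected) => latestOffsetByKey.get(key).contains(expected)
        }
    }

    // Hypothetical error the produce request would fail with when the check fails.
    case class OptimisticLockingFailure(key: String, expected: Long)
      extends Exception(s"offset $expected is not the latest for key $key")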

Would such a feature be completely out of scope for Kafka?

Best regards,
Daniel Schierbeck

Re: Optimistic locking

Posted by Daniel Schierbeck <da...@gmail.com>.
Reading the source code, it seems that kafka.log.LogCleaner builds a
kafka.log.OffsetMap only when compacting logs, not ahead of time, so the
information is not available at write time :-/

If I'm not mistaken, this also means that the cleaner needs _two_ passes to
clean a log segment: one to build up the key->last_offset map and one to
actually remove "expired" messages. Having the map available up front could
make that a single-pass algorithm. Especially if the segments are too large
to keep in memory, this should be considerably faster, at the cost of having
to maintain the mapping for each write.
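
A minimal sketch of what a single cleaning pass could look like if that
key->last_offset map were maintained at write time (the names are
illustrative and don't correspond to the actual LogCleaner internals):

    // Illustrative only: assumes the broker already maintains latestOffsetByKey
    // as messages are appended, instead of building it during compaction.
    case class Record(key: String, offset: Long, payload: Array[Byte])

    def cleanSegment(segment: Iterator[Record],
                     latestOffsetByKey: collection.Map[String, Long]): Iterator[Record] =
      // Single pass: a record survives only if it is still the latest one for its key.
      segment.filter(r => latestOffsetByKey.get(r.key).contains(r.offset))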

I'd be willing to do the coding if you think it's a good idea.


Re: Optimistic locking

Posted by Daniel Schierbeck <da...@gmail.com>.
Reading your reply again, I'd like to address this:

> Also, it seems like a conflation of concerns - in an event sourcing
> model, we save the immutable event, and represent current state in
> another, separate data structure.

That's the idea – events are stored in Kafka and processed sequentially by
a single process per partition. Those processes update a secondary store
(e.g. MySQL) with the "current" state. However, when determining whether an
event is valid, the "current" state must be taken into account. E.g. you
can only withdraw money from an open account, so the event sequence
AccountClosed -> MoneyWithdrawn is invalid. The only way I can think of to
ensure this is optimistic locking: each account would have a unique key,
and in order to write an event to Kafka, the secondary store *must* be
up-to-date with the previous events for that key. If not, the producer
should wait a bit and try again, re-validating the input against the new
state of the world. Is that a completely hopeless idea?
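
A sketch of the producer-side loop this would enable. Everything here is
hypothetical - a produce call that accepts an expected offset and an
OptimisticLockingFailure error do not exist in Kafka's actual producer API:

    // Hypothetical types, for illustration only.
    case class Event(key: String, payload: Array[Byte])
    case class EntityState(lastAppliedOffset: Long)

    trait SecondaryStore {
      def currentState(key: String): EntityState   // e.g. a row in MySQL
    }

    trait ConditionalProducer {
      // Append the event only if expectedOffset is still the latest offset for
      // the event's key; otherwise throw OptimisticLockingFailure.
      def produce(event: Event, expectedOffset: Long): Unit
    }

    case class OptimisticLockingFailure(key: String) extends Exception

    def writeWithRetry(store: SecondaryStore,
                       producer: ConditionalProducer,
                       event: Event,
                       isValid: (Event, EntityState) => Boolean,
                       maxAttempts: Int = 5): Unit = {
      var attempt = 0
      var written = false
      while (!written && attempt < maxAttempts) {
        attempt += 1
        val state = store.currentState(event.key)   // re-read fresh state on every attempt
        require(isValid(event, state), s"event is not valid for current state of ${event.key}")
        try {
          producer.produce(event, state.lastAppliedOffset)
          written = true
        } catch {
          case _: OptimisticLockingFailure => Thread.sleep(50)   // wait a bit, then try again
        }
      }
      if (!written)
        throw new IllegalStateException(s"gave up after $maxAttempts attempts for ${event.key}")
    }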


Re: Optimistic locking

Posted by Daniel Schierbeck <da...@gmail.com>.
Regarding the use case – some events may only be valid given a specific
state of the world, e.g. a NewComment event and a PostClosedForComments
event would be valid only in that order. If two producer processes (e.g.
two HTTP application processes) try to write an event each, you may get
integrity issues depending on the order. In this case, knowing that you
*must* build upon the latest event for a key gives application authors a
way to ensure integrity.
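
With the OffsetConstraint check proposed earlier in the thread, the race
between the two producers would play out roughly like this (a toy,
single-process model; none of this is a real Kafka API):

    import scala.collection.mutable

    object ConditionalAppendDemo extends App {
      // Toy model of the broker-side key -> latest-offset check.
      val latestOffsetByKey = mutable.Map.empty[String, Long]
      var nextOffset = 8L
      latestOffsetByKey("post-42") = 7L   // latest event for the post so far

      def produce(key: String, event: String, expectedOffset: Long): Either[String, Long] =
        if (latestOffsetByKey.get(key).contains(expectedOffset)) {
          val offset = nextOffset
          nextOffset += 1
          latestOffsetByKey(key) = offset
          Right(offset)                     // accepted
        } else {
          Left("OptimisticLockingFailure")  // someone else wrote first
        }

      // Both producers read the same state (latest offset 7) before writing:
      println(produce("post-42", "NewComment", expectedOffset = 7))             // Right(8)
      println(produce("post-42", "PostClosedForComments", expectedOffset = 7))  // Left(...)
      // The losing producer must re-read the state, re-validate against it,
      // and retry with expectedOffset = 8.
    }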

I'm not quite sure I understand the challenge in the implementation – how
is key-based message expiration implemented, if not with some mapping of
message keys to the last written offset? How else would you know whether
the message you're examining is the latest one for its key?


Re: Optimistic locking

Posted by Colin Clark <co...@clark.ws>.
Hi,

Couple of comments on this.

What you're proposing is difficult to do at scale and would require some
type of Paxos-style algorithm for the update-only-if-different semantics -
it would be easier in that case to just go ahead and do the update.

Also, it seems like a conflation of concerns - in an event sourcing model,
we save the immutable event, and represent current state in another,
separate data structure. Perhaps Cassandra would work well here - that
data model might provide what you're looking for out of the box.

Just as I don't recommend people use data stores as queuing mechanisms, I
also recommend not using a queuing mechanism as a primary datastore - mixed
semantics.

--
*Colin*
+1 612 859-6129

