Posted to users@kafka.apache.org by Ross Black <ro...@gmail.com> on 2021/03/16 04:34:09 UTC

Emit events that are NOT joined

Hi,

I am trying to find the best pattern to solve a specific problem using
Kafka Streams.  All of our current processing uses the Kafka Streams
API (multiple joins, windows, repartitions, etc.), so I already think I
have a decent grasp of the fundamentals.

We have 2 streams of events:
- primary events (P), which indicate some key event in the system and carry
a large amount of data
- secondary events (S), which should *always* occur as a follow-on to the
primary event and only contain a reference to the single associated primary
event.

I want to join secondary events to primary events (the simple part) BUT I
also want to find out when secondary events have been *unable* to be joined.
A secondary can fail to join:
- when primary event delivery has been delayed (so that the secondary
event is received before its associated primary event)
- when primary events go missing (the event collection system is noisy, so
we do lose a small but significant number of primary events)
- due to coding errors in the collectors, where an incorrect reference has
been inserted into the secondary event

Currently this functionality is implemented using a database:
- primary events are inserted into the database, and then secondary events
look up the primary by reference.  If the primary is found, the secondary
is sent to a "JOINED" topic.
- if the primary is not found, the secondary event is buffered in the
database until the primary is received, then joined and emitted (and the
secondary event is removed from the DB)
- after some arbitrary time period, the database is queried for outstanding
not-joined secondary events, which are emitted to an "UNJOINED" topic.
This allows alerting on unmatched secondary events to drive quality
measures, and allows offline analysis (to understand why they were
unmatched)

Some options:
1. Implement the same strategy as the existing one, but using Kafka state
stores instead of the DB.  With this approach I am concerned about atomic
correctness - i.e. that state in the Kafka store can be managed so that an
event is never sent to both JOINED and UNJOINED.


2. Continually emit key-values to a "PENDING" topic for the secondary join.
An example sequence could be something like the following (where primary
events = P, secondary events = S):
    a) receive S with no matching P => emit {S, false}
    b) receive matching P for S => emit {S, null} (to effectively delete it
from the topic)
    c) receive S with matching P => do not emit anything

Now the problem becomes more like building a time window of events from
PENDING, eventually emitting the expired events to UNJOINED.  I am also
uncertain how to ensure events can never end up in both JOINED and
UNJOINED.
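
To make option 2's emission rules concrete, here is a plain-Java sketch of
the bookkeeping (no Kafka plumbing; the class and method names are just
illustrative):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Bookkeeping for the PENDING topic: which primaries have been seen, and
// which secondaries are still waiting for their primary.
public class PendingTracker {
    private final Set<String> primaries = new HashSet<>();
    private final Map<String, String> pending = new HashMap<>(); // secondaryId -> primaryRef

    /** Secondary arrives. Returns the PENDING value to emit for it:
     *  {S, false} when no matching primary has been seen yet (case a),
     *  or nothing at all when the primary is already known (case c). */
    public Optional<Boolean> onSecondary(String secondaryId, String primaryRef) {
        if (primaries.contains(primaryRef)) {
            return Optional.empty();            // (c) matching P already seen: emit nothing
        }
        pending.put(secondaryId, primaryRef);
        return Optional.of(Boolean.FALSE);      // (a) no matching P yet: emit {S, false}
    }

    /** Primary arrives. Returns the pending secondaries that should now get
     *  a {S, null} tombstone on PENDING (case b). */
    public Set<String> onPrimary(String primaryId) {
        primaries.add(primaryId);
        Set<String> tombstones = new HashSet<>();
        pending.entrySet().removeIf(e -> {
            if (e.getValue().equals(primaryId)) {
                tombstones.add(e.getKey());
                return true;     // remove from pending as we tombstone it
            }
            return false;
        });
        return tombstones;
    }
}
```

The open question remains how to age entries out of the pending state and
route them to UNJOINED exactly once.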

My apologies for the wall of text .. I find it a difficult problem to
explain. 😏


Is there some pattern I am missing that will help solve this problem?
Any help / other suggestions would be appreciated.

Thanks,
Ross

Re: Emit events that are NOT joined

Posted by Rubén Terceño <ru...@confluent.io.INVALID>.
Maybe it’s not what you are looking for, but I think that functionality can
be implemented in two steps.

First, you perform a LEFT join, and then you filter by “null” on the joined
field to identify those whose joins didn’t succeed.
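
Roughly, the logic of those two steps looks like this (a plain-Java sketch
rather than actual Streams DSL, with made-up names):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Left-join each secondary (keyed by its primary reference) against the
// primaries seen in the same window; a null right side marks an unjoined
// secondary, which a second filtering step routes to UNJOINED.
public class LeftJoinFilter {

    /** Step 1: left join. Every secondary is kept; the joined value is the
     *  primary's payload, or null when no primary matched. */
    public static List<Map.Entry<String, String>> leftJoin(
            Map<String, String> secondaries,   // secondaryId -> primaryRef
            Map<String, String> primaries) {   // primaryId  -> payload
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> s : secondaries.entrySet()) {
            out.add(new SimpleEntry<>(s.getKey(), primaries.get(s.getValue())));
        }
        return out;
    }

    /** Step 2: filter by null on the joined field. */
    public static List<String> unjoined(List<Map.Entry<String, String>> joined) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : joined) {
            if (e.getValue() == null) result.add(e.getKey());
        }
        return result;
    }
}
```

In real Streams DSL the window (and its grace period) decides how long a
secondary waits before the join gives up and emits the null.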

HTH,

Rubén


Re: Emit events that are NOT joined

Posted by Blake Miller <bl...@gmail.com>.
I think what you described can be accomplished using the low-level API
(perhaps by implementing a custom Transformer).

You would create your own store for this Transformer to use, and implement
your own join logic which includes emitting these records for secondary
events that couldn't be joined with their primary.

I can't think of a way to do this without using the low-level API, but this
could be my lack of imagination/knowledge.
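
As a rough sketch of what that Transformer's logic might look like: plain
Java here, with a HashMap standing in for the state store and an explicitly
driven clock standing in for the punctuator, so all names are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mimics the shape of a Streams Transformer: process-style callbacks per
// record, plus a scheduled punctuate() that flushes timed-out secondaries.
// Removing an entry from the store before emitting is what guarantees each
// secondary reaches exactly one of JOINED / UNJOINED.
public class UnjoinedTransformer {
    static final class PendingS {
        final String id; final long arrivedAt;
        PendingS(String id, long arrivedAt) { this.id = id; this.arrivedAt = arrivedAt; }
    }

    private final long timeoutMs;
    private final Map<String, String> primaryStore = new HashMap<>();   // primaryId -> payload
    // one pending secondary per primary reference, for brevity
    private final Map<String, PendingS> pendingStore = new HashMap<>(); // primaryRef -> waiting S
    public final List<String> joinedTopic = new ArrayList<>();
    public final List<String> unjoinedTopic = new ArrayList<>();

    public UnjoinedTransformer(long timeoutMs) { this.timeoutMs = timeoutMs; }

    public void onPrimary(String primaryId, String payload) {
        primaryStore.put(primaryId, payload);
        PendingS waiting = pendingStore.remove(primaryId); // claim before emitting
        if (waiting != null) joinedTopic.add(waiting.id + "+" + payload);
    }

    public void onSecondary(String secondaryId, String primaryRef, long now) {
        String payload = primaryStore.get(primaryRef);
        if (payload != null) joinedTopic.add(secondaryId + "+" + payload);
        else pendingStore.put(primaryRef, new PendingS(secondaryId, now));
    }

    /** Scheduled flush: anything pending past the timeout goes to UNJOINED. */
    public void punctuate(long now) {
        pendingStore.entrySet().removeIf(e -> {
            if (now - e.getValue().arrivedAt >= timeoutMs) {
                // removed from the store above, so it can never also join
                unjoinedTopic.add(e.getValue().id);
                return true;
            }
            return false;
        });
    }
}
```

Because process() and punctuate() run on the same stream thread, whichever
one claims (removes) the pending entry first wins, which is what rules out
an event landing in both topics.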


