Posted to users@kafka.apache.org by Wladislaw Mitzel <mi...@tawadi.de> on 2017/07/11 12:02:57 UTC

Kafka streams: Record linkage / collecting all messages linked to one entity

Hi all. How would one approach the following scenario with Kafka streams?

There is one input topic. It has data from different sources in a normalized format. There is a need to join records that come from different sources but are linked to the same entity (record linkage). There is a deterministic rule-set to calculate (composed) "match-keys" for every incoming record that allow the correlation of records that are linked to the same entity.

Example: There are events A (userId, firstName, lastName, ...), B (username, location, ...) and C (location, weather-data, ...). There is a set of rules to correlate A with B (A.firstName + A.lastName = B.username) and B with C (B.location = C.location). At the end, we want to get the whole graph of correlated records.
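
For illustration, a minimal Java sketch of such a rule-set; the record shape, field names and key format are only assumptions based on the example above:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical normalized record as it arrives on the input topic.
    class NormalizedRecord {
        String source;      // "A", "B", "C", ...
        String firstName;   // set for source A
        String lastName;    // set for source A
        String username;    // set for source B
        String location;    // set for sources B and C
    }

    class MatchKeyRules {
        // Deterministic rule-set: every record is mapped to zero or more match-keys.
        // Records from different sources that share a match-key belong to the same entity.
        static List<String> matchKeys(NormalizedRecord r) {
            List<String> keys = new ArrayList<>();
            switch (r.source) {
                case "A": keys.add("name:" + r.firstName + r.lastName); break;   // links A and B
                case "B": keys.add("name:" + r.username);                        // links A and B
                          keys.add("loc:" + r.location); break;                  // links B and C
                case "C": keys.add("loc:" + r.location); break;                  // links B and C
            }
            return keys;
        }
    }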

Constraints: The latency of the record linkage should be as low as possible. The state stores should contain the messages of the last 180 days for linkage. (We are talking about tens to hundreds of GB of data.)

I already implemented a solution with Spark + an external database. I calculate the match-keys and then store mappings for event-id => list-of-match-keys, match-key => list-of-event-ids and event-id => event-payload in the database. By querying the database one can get a graph of "event -> match-keys -> more events" and so on. I do the querying in a loop until no new events are added. As a last step, I read the payloads using the accumulated event-ids. However, this solution has a high latency because of the external database calls. That's why the idea of having KTables as local state stores sounds so interesting to me.
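
A rough sketch of that expansion loop (not the actual Spark job); matchKeysOf and eventIdsOf stand in for the two database lookups:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Function;

    class LinkageExpansion {
        // Starting from one event-id, follow event-id -> match-keys -> event-ids
        // until no new event-ids turn up (fixpoint), then return the whole group.
        static Set<String> expand(String startEventId,
                                  Function<String, Set<String>> matchKeysOf,   // event-id => match-keys
                                  Function<String, Set<String>> eventIdsOf) {  // match-key => event-ids
            Set<String> events = new HashSet<>();
            Set<String> frontier = new HashSet<>();
            frontier.add(startEventId);
            while (!frontier.isEmpty()) {
                events.addAll(frontier);
                Set<String> next = new HashSet<>();
                for (String eventId : frontier) {
                    for (String matchKey : matchKeysOf.apply(eventId)) {
                        for (String linkedId : eventIdsOf.apply(matchKey)) {
                            if (!events.contains(linkedId)) {
                                next.add(linkedId);
                            }
                        }
                    }
                }
                frontier = next;
            }
            return events;
        }
    }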

Now with Kafka Streams I would like to use the embedded state with KTables, but I find it quite hard to come up with a solution. I think what I want to do is a self-join on the incoming topic, which is not yet supported by the DSL. I thought of using the Processor API to implement a solution very similar to the one I described with Spark: using several state stores for the mappings event => match-keys and match-key => events. Besides the fact that I don't know how to address the partitioning (or whether I need a global store), I am not sure whether this is the way one would go with Kafka Streams.
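
A sketch of what such a processor could look like with the classic Processor API of that Kafka Streams generation; the store names are made up, serde configuration and the emit logic are omitted, and it reuses the hypothetical NormalizedRecord/MatchKeyRules shapes from the sketch above:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    class LinkageProcessor extends AbstractProcessor<String, NormalizedRecord> {

        private KeyValueStore<String, List<String>> eventToMatchKeys;   // event-id => match-keys
        private KeyValueStore<String, List<String>> matchKeyToEvents;   // match-key => event-ids

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            eventToMatchKeys = (KeyValueStore<String, List<String>>) context.getStateStore("event-to-matchkeys");
            matchKeyToEvents = (KeyValueStore<String, List<String>>) context.getStateStore("matchkey-to-events");
        }

        @Override
        public void process(String eventId, NormalizedRecord record) {
            List<String> matchKeys = MatchKeyRules.matchKeys(record);
            eventToMatchKeys.put(eventId, matchKeys);
            for (String matchKey : matchKeys) {
                List<String> events = matchKeyToEvents.get(matchKey);
                if (events == null) {
                    events = new ArrayList<>();
                }
                if (!events.contains(eventId)) {
                    events.add(eventId);
                }
                matchKeyToEvents.put(matchKey, events);
            }
            // From here the same fixpoint expansion as in the sketch further up could be
            // run against the local stores and the linked group forwarded downstream.
        }
    }

Note that the match-key store only sees the events of its own partition unless the input is re-partitioned by match-key (e.g. one record emitted per match-key) or the store is made global, which is exactly the partitioning question raised here.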

Another solution I could think of is a loop in the topology (which again has KTables for the mapping of event-id and match-key), so that an event would flow through the loop several times until there are no new matches. Are loops possible at all, and if so, is this a good approach or should one avoid loops? At the end of the record-linkage process I'd like to have *one* message that contains the payloads of all correlated events and is then processed by the downstream processors. However, I can only think of solutions where I need to do a flatMap() (do a join for every match-key), so that there is more than one message.

Do you have any feedback or suggestions? Any examples that could help?

Kind regards,

Wladislaw

Re: Kafka streams: Record linkage / collecting all messages linked to one entity

Posted by Eno Thereska <en...@gmail.com>.
So, a couple of things that will hopefully help:

- it's worth thinking about how to map the problem into KStreams, KTables and GlobalKTables. For example, events A seem static and read-only to me, and possibly the data footprint is small, so they should probably be represented in the system as a GlobalKTable. That has certain advantages (see http://docs.confluent.io/current/streams/concepts.html#streams-concepts-globalktable)

- whether to use KStreams or KTables for the other events depends on whether you want to interpret the records as, for example, "I only care about the current location of the user" (KTable) or "I care about the path the user has taken" (KStream is probably best).

Once the above mapping is in place, it's fine to do multi-joins (they will have to be done pair-wise though).
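
As a rough sketch of what this mapping plus the pair-wise joins could look like with the StreamsBuilder API; all topic names, key/value types, joiners and the location helper are invented for the example, and serdes are omitted:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    class LinkageTopologySketch {
        static void build() {
            StreamsBuilder builder = new StreamsBuilder();

            // A: static, read-only reference data with a small footprint -> GlobalKTable,
            // fully replicated to every instance, so joins against it need no co-partitioning.
            GlobalKTable<String, String> usersA = builder.globalTable("events-a");     // keyed by username

            // B: "I care about the path the user has taken" -> KStream (every record is an event);
            // builder.table("events-b") would instead keep only the latest location per user.
            KStream<String, String> locationEventsB = builder.stream("events-b");      // keyed by username

            // C: latest weather per location -> KTable keyed by location.
            KTable<String, String> weatherC = builder.table("events-c");

            // Pair-wise join 1: B with A via a stream-GlobalKTable join.
            KStream<String, String> bWithA = locationEventsB.join(
                    usersA,
                    (username, bValue) -> username,                 // map each B record to A's key
                    (bValue, aValue) -> bValue + "|" + aValue);     // merge the two payloads

            // Pair-wise join 2: the result with C; re-keying by location triggers a
            // repartition so that this stream-table join is co-partitioned on location.
            KStream<String, String> bWithAWithC = bWithA
                    .selectKey((username, merged) -> locationOf(merged))
                    .join(weatherC, (merged, weather) -> merged + "|" + weather);

            bWithAWithC.to("linked-events");
        }

        // Hypothetical helper: assumes the B payload is just the location string.
        static String locationOf(String mergedValue) {
            return mergedValue.split("\\|")[0];
        }
    }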

Hope this helps
Eno


Re: Kafka streams: Record linkage / collecting all messages linked to one entity

Posted by Wladislaw Mitzel <mi...@tawadi.de>.
Hello Eno,

I have to think through this approach. I could split the messages using the source attribute. However, one challenge is the fact that I would need to do many joins. The example I gave is simplified: the real problem has about 10 sources of data, and there are various possible matches, A with B, B with C, A with D, C with E and so on. Furthermore, there might be several match-keys in order to match A with B.

So based on this additional information I am wondering:

a. whether a "multi-level join" is feasible in order to get the neighbors of the neighbors (correlate A with E using C)

b. how to cope with the fact that there are multiple possible match-keys in order to link two sources.

I am not sure whether I am in the right mindset and thinking in a "streaming way". The algorithm I have in mind is based on a graph representation of the problem: each message is a node, each match-key is a node, and the messages are connected to the match-keys by edges. The message nodes are then connected through the match-key nodes, and each entity is defined by the graph that connects all messages that are linked together.
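
One way to make that graph view concrete is a small union-find over event-ids, where two events end up in the same set as soon as they share a match-key; a minimal sketch, with all names illustrative:

    import java.util.HashMap;
    import java.util.Map;

    class EntityClusters {
        private final Map<String, String> parent = new HashMap<>();          // event-id -> parent event-id
        private final Map<String, String> matchKeyOwner = new HashMap<>();   // match-key -> representative event-id

        private String find(String id) {
            parent.putIfAbsent(id, id);
            String p = parent.get(id);
            if (!p.equals(id)) {
                p = find(p);
                parent.put(id, p);   // path compression
            }
            return p;
        }

        private void union(String a, String b) {
            parent.put(find(a), find(b));
        }

        // Called for every incoming message with its event-id and computed match-keys.
        void add(String eventId, Iterable<String> matchKeys) {
            for (String matchKey : matchKeys) {
                String owner = matchKeyOwner.putIfAbsent(matchKey, eventId);
                if (owner != null) {
                    union(eventId, owner);       // the match-key node links the two events
                } else {
                    find(eventId);               // register the event even without a prior owner
                }
            }
        }

        // Two events belong to the same entity iff their roots are equal.
        boolean sameEntity(String eventIdA, String eventIdB) {
            return find(eventIdA).equals(find(eventIdB));
        }
    }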

Kind regards,

Wladislaw


Re: Kafka streams: Record linkage / collecting all messages linked to one entity

Posted by Eno Thereska <en...@gmail.com>.
Hi Wladislaw,

Would splitting the one topic into multiple topics be acceptable at all? E.g., you could use the "branch" function in the DSL to split the messages and send them to different topics. Then, once you have multiple topics, you can do the joins etc.
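
A minimal sketch of that, assuming the normalized records carry a source attribute (topic names are made up, the record shape reuses the hypothetical NormalizedRecord from the sketch near the top of the thread, and serdes are omitted):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    class BranchSketch {
        static void build() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, NormalizedRecord> input = builder.stream("normalized-events");

            // Split the single input topic by source; the array index matches the predicate order.
            KStream<String, NormalizedRecord>[] branches = input.branch(
                    (key, rec) -> "A".equals(rec.source),
                    (key, rec) -> "B".equals(rec.source),
                    (key, rec) -> true);                     // everything else

            branches[0].to("events-a");
            branches[1].to("events-b");
            branches[2].to("events-other");

            // The per-source topics can then be consumed again as KStreams/KTables and joined pair-wise.
        }
    }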

Thoughts?

Thanks
Eno
