You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Winkelman, Kyle G" <ky...@optum.com> on 2017/04/13 18:26:21 UTC

Kafka-Streams: Cogroup

Hello,

I am wondering if there is any way to aggregate together many streams at once to build a larger object. Example (Healthcare Domain):
I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value is a different Avro Record for each stream.
I was hoping there was a way to supply a single Initializer, () -> new Patient(), and 3 aggregators, (key, value, patient) -> patient.add******Claim(value).

Currently the only way that I see to do the above use case is by aggregating each individual stream then joining them. This doesn't scale well with a large number of input streams because for each stream I would be creating another state store.

I was hoping to get thoughts on a KCogroupedStream api. I have spent a little time conceptualizing it.

Approach 1:
In KGroupedStream add a cogroup method that takes the single initializer, a list of other kgroupedstreams, and a list of other aggregators.
This would then all flow through a single processor and a have a single backing state store.
The aggregator that the object will get sent to is determined by the context().topic() which we should be able to trace back to one of the kgroupedstreams in the list.

The problem I am having with this approach is that because everything is going through the single processors and java doesn't do the best with generic types. I have to either pass in a list of Type objects for casting the object before sending it to the aggregator or I must create aggregators that accept an object and cast them to the appropriate type.

Approach 2:
Create one processor for each aggregator and have a single state store. Then have a single KStreamPassThrough that just passes on the new aggregate value.
The positive for this is you know which stream it will be coming from and won't need to do the context().topic() trick.

The problem I am having with this approach is understanding if there is a race condition. Obviously the source topics would be copartitioned. But would it be multithreaded and possibly cause one of the processors to grab patient 1 at the same time a different processor has grabbed patient 1?
My understanding is that for each partition there would be a single complete set of processors and a new incoming record would go completely through the processor topology from a source node to a sink node before the next one is sent through. Is this correct?

[cid:image002.png@01D2B45F.53169F50]

If anyone has any additional ideas about this let me know. I don't know if I have the time to actually create this api so if someone likes the idea and wants to develop it feel free.

This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.

Re: Kafka-Streams: Cogroup

Posted by Eno Thereska <en...@gmail.com>.
Hi Kyle, (cc-ing user list as well)

This could be an interesting scenario. Two things to help us think through it some more: 1) it seems you attached a figure, but I cannot seem to open it. 2) what about using the low level processor API instead of the DSL as approach 3? Do you have any thoughts on that?

Thanks
Eno

> On 13 Apr 2017, at 11:26, Winkelman, Kyle G <ky...@optum.com> wrote:
> 
> Hello,
>  
> I am wondering if there is any way to aggregate together many streams at once to build a larger object. Example (Healthcare Domain):
> I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value is a different Avro Record for each stream.
> I was hoping there was a way to supply a single Initializer, () -> new Patient(), and 3 aggregators, (key, value, patient) -> patient.add******Claim(value).
>  
> Currently the only way that I see to do the above use case is by aggregating each individual stream then joining them. This doesn’t scale well with a large number of input streams because for each stream I would be creating another state store.
>  
> I was hoping to get thoughts on a KCogroupedStream api. I have spent a little time conceptualizing it.
>  
> Approach 1:
> In KGroupedStream add a cogroup method that takes the single initializer, a list of other kgroupedstreams, and a list of other aggregators.
> This would then all flow through a single processor and a have a single backing state store.
> The aggregator that the object will get sent to is determined by the context().topic() which we should be able to trace back to one of the kgroupedstreams in the list.
>  
> The problem I am having with this approach is that because everything is going through the single processors and java doesn’t do the best with generic types. I have to either pass in a list of Type objects for casting the object before sending it to the aggregator or I must create aggregators that accept an object and cast them to the appropriate type.
>  
> Approach 2:
> Create one processor for each aggregator and have a single state store. Then have a single KStreamPassThrough that just passes on the new aggregate value.
> The positive for this is you know which stream it will be coming from and won’t need to do the context().topic() trick.
>  
> The problem I am having with this approach is understanding if there is a race condition. Obviously the source topics would be copartitioned. But would it be multithreaded and possibly cause one of the processors to grab patient 1 at the same time a different processor has grabbed patient 1?
> My understanding is that for each partition there would be a single complete set of processors and a new incoming record would go completely through the processor topology from a source node to a sink node before the next one is sent through. Is this correct?
>  
> 
>  
> If anyone has any additional ideas about this let me know. I don’t know if I have the time to actually create this api so if someone likes the idea and wants to develop it feel free.
> 
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
> 


Re: Kafka-Streams: Cogroup

Posted by Eno Thereska <en...@gmail.com>.
Hi Kyle, (cc-ing user list as well)

This could be an interesting scenario. Two things to help us think through it some more: 1) it seems you attached a figure, but I cannot seem to open it. 2) what about using the low level processor API instead of the DSL as approach 3? Do you have any thoughts on that?

Thanks
Eno

> On 13 Apr 2017, at 11:26, Winkelman, Kyle G <ky...@optum.com> wrote:
> 
> Hello,
>  
> I am wondering if there is any way to aggregate together many streams at once to build a larger object. Example (Healthcare Domain):
> I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value is a different Avro Record for each stream.
> I was hoping there was a way to supply a single Initializer, () -> new Patient(), and 3 aggregators, (key, value, patient) -> patient.add******Claim(value).
>  
> Currently the only way that I see to do the above use case is by aggregating each individual stream then joining them. This doesn’t scale well with a large number of input streams because for each stream I would be creating another state store.
>  
> I was hoping to get thoughts on a KCogroupedStream api. I have spent a little time conceptualizing it.
>  
> Approach 1:
> In KGroupedStream add a cogroup method that takes the single initializer, a list of other kgroupedstreams, and a list of other aggregators.
> This would then all flow through a single processor and a have a single backing state store.
> The aggregator that the object will get sent to is determined by the context().topic() which we should be able to trace back to one of the kgroupedstreams in the list.
>  
> The problem I am having with this approach is that because everything is going through the single processors and java doesn’t do the best with generic types. I have to either pass in a list of Type objects for casting the object before sending it to the aggregator or I must create aggregators that accept an object and cast them to the appropriate type.
>  
> Approach 2:
> Create one processor for each aggregator and have a single state store. Then have a single KStreamPassThrough that just passes on the new aggregate value.
> The positive for this is you know which stream it will be coming from and won’t need to do the context().topic() trick.
>  
> The problem I am having with this approach is understanding if there is a race condition. Obviously the source topics would be copartitioned. But would it be multithreaded and possibly cause one of the processors to grab patient 1 at the same time a different processor has grabbed patient 1?
> My understanding is that for each partition there would be a single complete set of processors and a new incoming record would go completely through the processor topology from a source node to a sink node before the next one is sent through. Is this correct?
>  
> 
>  
> If anyone has any additional ideas about this let me know. I don’t know if I have the time to actually create this api so if someone likes the idea and wants to develop it feel free.
> 
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
>