Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2017/05/01 17:14:17 UTC

Re: Kafka-Streams: Cogroup

Kyle,

What's your Apache ID? I can grant you the permission.


Guozhang


On Sat, Apr 29, 2017 at 7:33 AM, Kyle Winkelman <wi...@gmail.com>
wrote:

> I don't seem to have permission. When logged in I can neither edit the
> main page nor create an additional KIP.
>
> Thanks,
> Kyle
>
> On Thu, Apr 27, 2017 at 12:35 PM, Eno Thereska <en...@gmail.com>
> wrote:
>
>> Hi Kyle,
>>
>> I believe Guozhang has now given you permission to edit the KIP wiki at
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.
>> Could you see if you can add this there?
>>
>> Many thanks
>> Eno
>>
>> On Wed, Apr 26, 2017 at 6:00 PM, Kyle Winkelman <winkelman.kyle@gmail.com>
>> wrote:
>>
>>> Thank you for your reply.
>>>
>>> I have attached my first attempt at writing a KIP and I was wondering if
>>> you could review it and share your thoughts.
>>>
>>> Going forward I would like to create this KIP. I was wondering whom I
>>> should ask to get the necessary permissions on the wiki. Username:
>>> winkelman.kyle
>>>
>>>
>>>
>>> On Fri, Apr 21, 2017 at 3:15 PM, Eno Thereska <en...@gmail.com>
>>> wrote:
>>>
>>>> Hi Kyle,
>>>>
>>>> Sorry for the delay in replying. I think it's worth doing a KIP for
>>>> this one. One super helpful thing with KIPs is to list a few more scenarios
>>>> that would benefit from this approach. In particular, it seems the main
>>>> benefit comes from reducing the number of state stores. Does this necessarily
>>>> reduce the number of I/Os to the stores (number of puts/gets), or the extra
>>>> space overhead of multiple stores? Quantifying that a bit would help.
>>>>
>>>> To answer your original questions:
>>>>
>>>> >The problem I am having with this approach is understanding if there
>>>> is a race condition. Obviously the source topics would be co-partitioned.
>>>> But would it be multithreaded and possibly cause one of the processors to
>>>> grab patient 1 at the same time a different processor has grabbed patient 1?
>>>>
>>>>
>>>> I don't think there will be a problem here. A processor cannot be
>>>> accessed by multiple threads in Kafka Streams.
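The threading answer above can be illustrated with a small, dependency-free toy model (not Kafka Streams code). For a sub-topology reading several co-partitioned topics, partition N of each topic is handled by the same task, and a task is run by exactly one thread, so records for patient 1 from either topic are processed one at a time. Here each partition gets a single-threaded executor standing in for that owning thread. The class name `CopartitionDemo` and the `hashCode`-based partitioner are assumptions for illustration; Kafka's default partitioner actually hashes the serialized key with murmur2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model with hypothetical names: one single-threaded executor per
// partition stands in for the one thread that owns that partition's task.
class CopartitionDemo {
    // Assumed partitioner for illustration only (Kafka's default uses murmur2).
    // What matters is that all co-partitioned topics share the same function
    // and the same partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // records: each entry is {topic, key}. Returns per-key record counts.
    static Map<String, Integer> run(List<String[]> records, int numPartitions) {
        List<ExecutorService> tasks = new ArrayList<>();
        List<Map<String, Integer>> stores = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            tasks.add(Executors.newSingleThreadExecutor());
            stores.add(new HashMap<>()); // plain HashMap: only one thread touches it
        }
        List<Future<?>> pending = new ArrayList<>();
        for (String[] rec : records) {
            int p = partitionFor(rec[1], numPartitions); // the topic does not matter
            Map<String, Integer> store = stores.get(p);
            // Same key from any topic -> same partition -> same single thread.
            pending.add(tasks.get(p).submit(() -> store.merge(rec[1], 1, Integer::sum)));
        }
        try {
            for (Future<?> f : pending) {
                f.get(); // wait for completion; also makes the writes visible here
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            tasks.forEach(ExecutorService::shutdown);
        }
        Map<String, Integer> merged = new HashMap<>();
        stores.forEach(merged::putAll);
        return merged;
    }
}
```

Because each per-partition map is only ever mutated by its own thread, no locking is needed, which mirrors why a processor instance does not need to guard its state.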
>>>>
>>>>
>>>> >My understanding is that for each partition there would be a single
>>>> complete set of processors and a new incoming record would go completely
>>>> through the processor topology from a source node to a sink node before the
>>>> next one is sent through. Is this correct?
>>>>
>>>> This is mostly true; however, if caching is enabled (for deduplication, see
>>>> KIP-63), then a record may reside in a cache before going to the sink.
>>>> Meanwhile, another record can come in, so multiple records can be in the
>>>> topology at the same time.
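The caching point above can be pictured with a simplified model of a deduplicating record cache. This is not Kafka's actual cache implementation: it only shows that updates to a key overwrite the cached entry and that a single record per key is forwarded when the cache is flushed (in Kafka Streams that happens on commit or when entries are evicted). The class `DedupCache` and its explicit `flush()` call are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified dedup cache (not Kafka's real cache code).
class DedupCache {
    // key -> latest value; LinkedHashMap keeps first-insertion order,
    // so the flush order below is deterministic.
    private final Map<String, Integer> dirty = new LinkedHashMap<>();
    // Records that reached the "sink".
    private final List<String> downstream = new ArrayList<>();

    // An incoming update only overwrites the cached entry; nothing is forwarded yet,
    // so more records can arrive while earlier ones are still "in" the topology.
    void put(String key, int value) {
        dirty.put(key, value);
    }

    // Stand-in for commit/eviction: forward one record per key, then clear.
    void flush() {
        for (Map.Entry<String, Integer> e : dirty.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> downstream() {
        return downstream;
    }
}
```

With updates `A=1, A=2, B=5, A=3`, nothing reaches the sink until `flush()`, after which it sees only `A=3` and `B=5`.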
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman <winkelman.kyle@gmail.com>
>>>> wrote:
>>>>
>>>>> Eno,
>>>>> Thanks for the response. The figure was just a restatement of my
>>>>> questions. I have made an attempt at a low-level processor and it appears
>>>>> to work, but it isn't very pretty, and I was hoping for something at the
>>>>> Streams API level.
>>>>>
>>>>> I have written some code to show an example of how I see Cogroup
>>>>> working in Kafka.
>>>>>
>>>>> First, KGroupedStream would have a cogroup method that takes the
>>>>> initializer and the aggregator for that specific KGroupedStream. This would
>>>>> return a KCogroupedStream with two methods: one to add more
>>>>> (KGroupedStream, Aggregator) pairs, and one to complete the construction and
>>>>> return a KTable.
>>>>>
>>>>> builder.stream("topic").groupByKey()
>>>>>     .cogroup(Initializer, Aggregator, aggValueSerde, storeName)
>>>>>     .cogroup(groupedStream1, Aggregator1)
>>>>>     .cogroup(groupedStream2, Aggregator2)
>>>>>     .aggregate();
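To make the shape of the proposed chain concrete, here is a dependency-free, in-memory sketch of a KCogroupedStream-like builder. It is not Kafka code and not the API on the linked branch: the class `ToyCogroupedStream`, its nested `Aggregator` interface, and the `rec` helper are hypothetical, "grouped streams" are plain lists of key/value pairs, and the serde and store name from the proposal are dropped because nothing is persisted. Each `cogroup` call adds one (stream, aggregator) pair, value types may differ per stream, and `aggregate()` folds every pair into one shared table.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for the proposed KCogroupedStream.
class ToyCogroupedStream<K, A> {
    // Assumed aggregator shape: (key, value, currentAggregate) -> newAggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    private final Supplier<A> initializer;
    // One entry per (stream, aggregator) pair; each replays its stream into the table.
    private final List<Consumer<Map<K, A>>> pairs = new ArrayList<>();

    ToyCogroupedStream(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    // Convenience helper for building key/value records.
    static <K, V> Map.Entry<K, V> rec(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Adds a (grouped stream, aggregator) pair; the stream's value type V may differ per call.
    <V> ToyCogroupedStream<K, A> cogroup(List<Map.Entry<K, V>> stream, Aggregator<K, V, A> aggregator) {
        pairs.add(table -> {
            for (Map.Entry<K, V> entry : stream) {
                K key = entry.getKey();
                A current = table.containsKey(key) ? table.get(key) : initializer.get();
                table.put(key, aggregator.apply(key, entry.getValue(), current));
            }
        });
        return this;
    }

    // Completes construction: every pair writes into the same single table.
    Map<K, A> aggregate() {
        Map<K, A> table = new HashMap<>();
        for (Consumer<Map<K, A>> pair : pairs) {
            pair.accept(table);
        }
        return table;
    }
}
```

One simplification to keep in mind: this toy replays the streams one after another, whereas a real topology would interleave records from the input topics as they arrive, so the sketch only matches the real result when the aggregators' combined effect does not depend on that order.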
>>>>>
>>>>> Behind the scenes, we create a KStreamAggregate for each
>>>>> (KGroupedStream, Aggregator) pair, then a final pass-through processor
>>>>> that forwards the aggregate values. This gives us a KTable backed by a
>>>>> single store that is used by all of the processors.
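The processor wiring in the paragraph above can be modelled roughly as follows. This is a dependency-free toy, not the Processor API and not the code on the linked branch: each hypothetical `AggregateNode` plays the role of a KStreamAggregate for one (stream, aggregator) pair, all nodes read and write the same `Map` standing in for the single state store, and `passThrough` forwards every updated aggregate downstream. The names `CogroupTopology`, `AggregateNode`, the initial value of 0, and the integer aggregators are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical toy topology; not Kafka's Processor API.
class CogroupTopology {
    // Stands in for the single state store shared by every aggregate node.
    final Map<String, Integer> store = new HashMap<>();
    // What the final pass-through node emitted (the KTable's update stream).
    final List<String> emitted = new ArrayList<>();

    // Pass-through node: forwards each updated aggregate unchanged.
    void passThrough(String key, int aggregate) {
        emitted.add(key + "=" + aggregate);
    }

    // One node per (grouped stream, aggregator) pair; all share `store`.
    class AggregateNode {
        private final BiFunction<Integer, Integer, Integer> aggregator; // (value, agg) -> agg

        AggregateNode(BiFunction<Integer, Integer, Integer> aggregator) {
            this.aggregator = aggregator;
        }

        void process(String key, int value) {
            int current = store.getOrDefault(key, 0);       // assumed initializer: 0
            int updated = aggregator.apply(value, current);
            store.put(key, updated);                          // one get + one put per record
            passThrough(key, updated);
        }
    }
}
```

In this model a record from either input updates the same store entry, so the resulting table reflects every input stream without a separate store per stream.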
>>>>>
>>>>> Please let me know if this is something you think would add value to
>>>>> Kafka Streams, and I will try to create a KIP to foster more communication.
>>>>>
>>>>> You can take a look at what I have. I think it's missing a fair amount,
>>>>> but it's a good start. I took the doAggregate method in KGroupedStream as
>>>>> my starting point and expanded on it for multiple streams:
>>>>> https://github.com/KyleWinkelman/kafka/tree/cogroup
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
-- Guozhang