Posted to dev@kafka.apache.org by Mike Freyberger <mf...@appnexus.com> on 2018/06/20 20:07:23 UTC

[DISCUSS] KIP-315: Stream Join Sticky Assignor

Hi everybody,

I’ve created a proposal document for KIP-315 which outlines the motivation for adding a new partition assignment strategy that can be used for streaming join use cases.

It’d be great to get feedback on the overall idea and the proposed implementation.

KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor

Thanks,

Mike

Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

Posted by "Matthias J. Sax" <ma...@confluent.io>.
A `StickyRangeAssignor` is not the same as a `JoinAssignor` that needs
to do co-partitioning.

Also, do "sticky" assignment and "range" assignment not contradict
each other a little bit?
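
To make the two points above concrete, here is a minimal sketch of range-style assignment. It is not Kafka's `RangeAssignor` code; the function name, the topic names `left` and `right`, and the consumer ids are assumptions made up for illustration. It shows that a range split gives each consumer the same partition numbers on every topic (the co-partitioning a join needs) as long as the topics have equal partition counts, and that adding a consumer shifts the ranges, which is where "range" and "sticky" pull in different directions.

```python
def range_assign(consumers, partitions_per_topic):
    """Split each topic's partitions into contiguous ranges, one per consumer."""
    members = sorted(consumers)
    assignment = {m: [] for m in members}
    for topic in sorted(partitions_per_topic):
        count = partitions_per_topic[topic]
        base, extra = divmod(count, len(members))
        start = 0
        for i, member in enumerate(members):
            # The first `extra` members get one additional partition.
            size = base + (1 if i < extra else 0)
            assignment[member].extend((topic, p) for p in range(start, start + size))
            start += size
    return assignment


# Hypothetical topics with equal partition counts, as a join requires.
topics = {"left": 6, "right": 6}
before = range_assign(["c1", "c2"], topics)
after = range_assign(["c1", "c2", "c3"], topics)

# Topic-partitions that a surviving consumer had to give up after c3 joined.
changed = sorted(tp for m in before for tp in before[m] if tp not in after[m])
print(changed)
```

In this sketch `c2` keeps only partition 3 of each topic after `c3` joins, even though it could have kept two of its three partition numbers.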


>> Regarding how I am using a KV store instead of a kafka compacted topic: I simply prepend my keys with the incoming kafka partition, so on partition assignment I can scan the KV store for all keys within the assigned partition.

I cannot follow here. If you are using local state stores (or is this
assumption wrong?) and you scale, you need to move state around. How do
you do this? And if you don't use local state, why do you need
co-partitioning?

> Any user that wants to take advantage of KIP-429 needs to use a sticky assignor.

Yes. Still, I don't see a strong use case to add a `JoinStickyAssignor`
to the project. The consumer has a pluggable interface to allow adding
your own assignment strategy. To me, your use case seems to fall into
this category.


-Matthias



On 5/6/19 4:40 PM, Mike Freyberger wrote:
> Hi Matthias,
> 
> Once KIP-429 is released, non-sticky assignors will not be as useful. Any user that wants to take advantage of KIP-429 needs to use a sticky assignor. Currently there is only 1 sticky assignor in the kafka project, which is similar to RoundRobinAssignor, but a sticky version. I imagine there will be users who currently use RangeAssignor but want to take advantage of KIP-429. So, having more directly accessible sticky assignors will allow more users to take advantage of KIP-429 without being forced to use Kafka Streams. Maybe I should reframe the KIP as essentially a sticky version of RangeAssignor?
> 
> Regarding how I am using a KV store instead of a kafka compacted topic: I simply prepend my keys with the incoming kafka partition, so on partition assignment I can scan the KV store for all keys within the assigned partition.
> 
> Mike
> 


Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

Posted by Mike Freyberger <mi...@xandr.com>.
Hi Matthias,

Once KIP-429 is released, non-sticky assignors will not be as useful. Any user that wants to take advantage of KIP-429 needs to use a sticky assignor. Currently there is only 1 sticky assignor in the kafka project, which is similar to RoundRobinAssignor, but a sticky version. I imagine there will be users who currently use RangeAssignor but want to take advantage of KIP-429. So, having more directly accessible sticky assignors will allow more users to take advantage of KIP-429 without being forced to use Kafka Streams. Maybe I should reframe the KIP as essentially a sticky version of RangeAssignor?
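
As a rough illustration of what a sticky, co-partitioned assignment could look like, here is a minimal sketch. It is not the algorithm from the KIP and not Kafka's `StickyAssignor`; the function name, its parameters, and the consumer ids are assumptions. It assigns partition numbers (shared by all co-partitioned topics), lets each member keep what it already owns up to its quota, and hands out only the leftover partitions.

```python
def sticky_copartition_assign(members, num_partitions, previous):
    """Assign partition numbers, keeping previous owners where quotas allow.

    `previous` maps member id -> partition numbers it owned before the rebalance.
    """
    members = sorted(members)
    base, extra = divmod(num_partitions, len(members))
    # Members that already own the most partitions get the larger quotas,
    # so fewer partitions have to move.
    by_owned = sorted(members, key=lambda m: (-len(previous.get(m, [])), m))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(by_owned)}

    assignment = {}
    claimed = set()
    for m in members:
        keep = []
        for p in sorted(previous.get(m, [])):
            if len(keep) < quota[m] and 0 <= p < num_partitions and p not in claimed:
                keep.append(p)
                claimed.add(p)
        assignment[m] = keep

    # Hand the partitions nobody kept to members that are below quota.
    unowned = [p for p in range(num_partitions) if p not in claimed]
    for m in members:
        while len(assignment[m]) < quota[m]:
            assignment[m].append(unowned.pop(0))
        assignment[m].sort()
    return assignment


# A third consumer joins a group that had split 6 partitions range-style.
previous = {"c1": [0, 1, 2], "c2": [3, 4, 5]}
grown = sticky_copartition_assign(["c1", "c2", "c3"], 6, previous)
print(grown)

# Every co-partitioned topic would then use the same partition numbers per member.
shrunk = sticky_copartition_assign(["c1", "c2"], 6, grown)
print(shrunk)
```

The result is no longer made of contiguous ranges (here `c3` ends up with partitions 2 and 5), so "sticky range" in this sketch means equal-sized, co-partitioned shares rather than strict ranges.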

Regarding how I am using a KV store instead of a kafka compacted topic: I simply prepend my keys with the incoming kafka partition, so on partition assignment I can scan the KV store for all keys within the assigned partition.
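
The key-prefix scheme can be sketched roughly as follows. This is an illustration only: the in-memory `OrderedKV` class stands in for whatever ordered KV store is actually used, and the key layout (zero-padded partition number, then `:`) and all names are assumptions, not the real implementation.

```python
import bisect


class OrderedKV:
    """Tiny in-memory stand-in for a KV store that supports ordered scans."""

    def __init__(self):
        self._keys = []    # kept sorted so prefix scans are contiguous
        self._values = {}

    def put(self, key, value):
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def scan_prefix(self, prefix):
        i = bisect.bisect_left(self._keys, prefix)
        while i < len(self._keys) and self._keys[i].startswith(prefix):
            yield self._keys[i], self._values[self._keys[i]]
            i += 1


def partition_prefix(partition):
    # Fixed width plus a separator, so partition 2 never matches partition 20.
    return f"{partition:05d}:"


def store_key(partition, record_key):
    return partition_prefix(partition) + record_key


def load_partition(kv, partition):
    """State to restore when `partition` is assigned to this consumer."""
    return dict(kv.scan_prefix(partition_prefix(partition)))


kv = OrderedKV()
kv.put(store_key(2, "user-a"), {"count": 1})
kv.put(store_key(20, "user-b"), {"count": 2})
kv.put(store_key(2, "user-c"), {"count": 3})
kv.put(store_key(3, "user-d"), {"count": 4})

restored = load_partition(kv, 2)
print(sorted(restored))
```

Because the prefix is derived from the incoming partition, a rebalance only requires scanning the prefixes of newly assigned partitions rather than replaying a changelog topic.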

Mike

On 4/30/19, 6:49 AM, "Matthias J. Sax" <ma...@confluent.io> wrote:

    Mike,
    
    I am still not sure why we need to add this assignor to the project.
    Even after you pointed out that you cannot use Kafka Streams, the idea
    behind making the consumer's `PartitionAssignor` interface public and
    pluggable is that the project does not need to add strategies for all
    kinds of use cases; instead, people can customize the assignors to their
    needs.
    
    My main question is: how generic is this use case (especially with Kafka
    Streams offering joins out-of-the-box) and do we really need to add it?
    So far, it seems ok to me, if you just write a custom assignor and plug
    it into the consumer. I don't see a strong need to add it to the Kafka
    code base. Basically, it breaks down to
    
    - How many people use joins?
    - How many people can or can't use Kafka Streams joins?
    - To what extent can Kafka Streams be improved to increase the use-case
    coverage?
    - How many people would benefit? (i.e., even with this assignor added,
    there might still be users who need to customize their own assignors
    because their join use case is still different from yours.)
    
    
    Also note, that in Kafka Streams you could still provide a custom state
    store implementation (with or without using a compacted changelog) and a
    `Processor` or `Transformer` to implement a custom join. Even if this
    might not work for your specific case, it might work for many other
    people who want to customize a join instead of using Kafka Streams'
    out-of-the-box join.
    
    
    Can you elaborate why you think it needs to be part of Kafka directly?
    
    
    One last question:
    
    > - Our state has a high eviction rate, so kafka compacted topics are not ideal for storing the changelog. The compaction cannot keep up and the topic will be majority tombstones when it is read on partition reassignment. We are using a KV store the "change log" instead.
    
    What do you mean by 'We are using a KV store the "change log" instead.'?
    How do you handle reassignment and state movement? Curious to see if we
    could improve Kafka Streams :)
    
    
    -Matthias
    
    


Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Mike,

I am still not sure why we need to add this assignor to the project.
Even after you pointed out that you cannot use Kafka Streams, the idea
behind making the consumer's `PartitionAssignor` interface public and
pluggable is that the project does not need to add strategies for all
kinds of use cases; instead, people can customize the assignors to their
needs.

My main question is: how generic is this use case (especially with Kafka
Streams offering joins out-of-the-box) and do we really need to add it?
So far, it seems ok to me, if you just write a custom assignor and plug
it into the consumer. I don't see a strong need to add it to the Kafka
code base. Basically, it breaks down to

- How many people use joins?
- How many people can or can't use Kafka Streams joins?
- To what extent can Kafka Streams be improved to increase the use-case
coverage?
- How many people would benefit? (i.e., even with this assignor added,
there might still be users who need to customize their own assignors
because their join use case is still different from yours.)


Also note, that in Kafka Streams you could still provide a custom state
store implementation (with or without using a compacted changelog) and a
`Processor` or `Transformer` to implement a custom join. Even if this
might not work for your specific case, it might work for many other
people who want to customize a join instead of using Kafka Streams'
out-of-the-box join.


Can you elaborate why you think it needs to be part of Kafka directly?


One last question:

> - Our state has a high eviction rate, so kafka compacted topics are not ideal for storing the changelog. The compaction cannot keep up and the topic will be majority tombstones when it is read on partition reassignment. We are using a KV store the "change log" instead.

What do you mean by 'We are using a KV store the "change log" instead.'?
How do you handle reassignment and state movement? Curious to see if we
could improve Kafka Streams :)


-Matthias


On 4/30/19 3:09 AM, Mike Freyberger wrote:
> In light of KIP-429, I think there will be an increased demand for sticky assignors. So, I'd like to restart the conversation about adding the sticky streams assignor, https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor. 
> 
> It’d be great to get feedback on the overall idea and the proposed implementation.
> 
> Thanks,
> 
> Mike
> 
> 


Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

Posted by Mike Freyberger <mi...@xandr.com>.
In light of KIP-429, I think there will be an increased demand for sticky assignors. So, I'd like to restart the conversation about adding the sticky streams assignor, https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor. 

It’d be great to get feedback on the overall idea and the proposed implementation.

Thanks,

Mike


On 6/20/18, 5:47 PM, "Mike Freyberger" <mf...@appnexus.com> wrote:

    Matthias, 
    
    Thanks for the feedback. For our use case, we have some complexities that make using the existing Streams API more complicated than using the Kafka Consumer directly. 
    
    - We are doing async processing, which I don't think is currently available (KIP-311 is handling this). 
    
    - Our state has a high eviction rate, so kafka compacted topics are not ideal for storing the changelog. The compaction cannot keep up and the topic will be majority tombstones when it is read on partition reassignment. We are using a KV store the "change log" instead.
    
    - We wanted to separate consumer threads from worker threads to maximize parallelization while keeping consumer TCP connections down.
    
    Ultimately, it was much simpler to use the KafkaConsumer directly rather than peel away a lot of what Streams API does for you. I think we should continue to add support for more complex use cases and processing to the Streams API. However, I think there will remain streaming join use cases that can benefit from the flexibility that comes from using the KafkaConsumer directly. 
    
    Mike
    


Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Mike,

Thanks for sharing your feedback and the blocker features for Kafka
Streams. They are very helpful.


Guozhang


On Wed, Jun 20, 2018 at 2:47 PM, Mike Freyberger <mf...@appnexus.com>
wrote:

> Matthias,
>
> Thanks for the feedback. For our use case, we have some complexities that
> make using the existing Streams API more complicated than using the Kafka
> Consumer directly.
>
> - We are doing async processing, which I don't think is currently
> available (KIP-311 is handling this).
>
> - Our state has a high eviction rate, so kafka compacted topics are not
> ideal for storing the changelog. The compaction cannot keep up and the
> topic will be majority tombstones when it is read on partition
> reassignment. We are using a KV store the "change log" instead.
>
> - We wanted to separate consumer threads from worker threads to maximize
> parallelization while keeping consumer TCP connections down.
>
> Ultimately, it was much simpler to use the KafkaConsumer directly rather
> than peel away a lot of what Streams API does for you. I think we should
> continue to add support for more complex use cases and processing to the
> Streams API. However, I think there will remain streaming join use cases
> that can benefit from the flexibility that comes from using the
> KafkaConsumer directly.
>
> Mike
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

Posted by Mike Freyberger <mf...@appnexus.com>.
Matthias, 

Thanks for the feedback. For our use case, we have some complexities that make using the existing Streams API more complicated than using the Kafka Consumer directly. 

- We are doing async processing, which I don't think is currently available (KIP-311 is handling this). 

- Our state has a high eviction rate, so kafka compacted topics are not ideal for storing the changelog. The compaction cannot keep up and the topic will be majority tombstones when it is read on partition reassignment. We are using a KV store the "change log" instead.

- We wanted to separate consumer threads from worker threads to maximize parallelization while keeping consumer TCP connections down.

Ultimately, it was much simpler to use the KafkaConsumer directly rather than peel away a lot of what Streams API does for you. I think we should continue to add support for more complex use cases and processing to the Streams API. However, I think there will remain streaming join use cases that can benefit from the flexibility that comes from using the KafkaConsumer directly. 

Mike

On 6/20/18, 5:08 PM, "Matthias J. Sax" <ma...@confluent.io> wrote:

    Mike,
    
    Thanks a lot for the KIP. I am wondering why the Streams API cannot be
    used to perform the join. It would be good to understand the advantage
    of adding a `StickyStreamJoinAssignor` compared to using the Streams
    API. Atm, it seems to be a redundant feature to me.
    
    -Matthias
    
    


Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Mike,

Thanks a lot for the KIP. I am wondering why the Streams API cannot be
used to perform the join. It would be good to understand the advantage
of adding a `StickyStreamJoinAssignor` compared to using the Streams
API. Atm, it seems to be a redundant feature to me.

-Matthias


On 6/20/18 1:07 PM, Mike Freyberger wrote:
> Hi everybody,
> 
> I’ve created a proposal document for KIP-315 which outlines the motivation for adding a new partition assignment strategy that can be used for streaming join use cases.
> 
> It’d be great to get feedback on the overall idea and the proposed implementation.
> 
> KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor
> 
> Thanks,
> 
> Mike
>