Posted to users@kafka.apache.org by Adrienne Kole <ad...@gmail.com> on 2016/10/04 22:00:21 UTC

Kafka Streams dynamic partitioning

Hi,

From the Streams documentation, I can see that each Streams instance
processes data independently of the other instances: it reads from its
assigned topic partition(s) and writes to the specified output topic.


So the number of partitions of the topic must be determined beforehand and
remain static.
In my use case I want to create partitioned/keyed (time) windows and
aggregate them.
I can partition the incoming data across the specified topic's partitions,
and each Streams instance can then do windowed aggregations.
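
To sketch the use case, a keyed tumbling-window aggregation could look
roughly like this in plain Python (the window size and the event tuples are
made up for illustration; this is not Streams code):

```python
from collections import defaultdict

WINDOW_MS = 60_000  # illustrative 1-minute tumbling windows

def window_start(ts_ms):
    # tumbling windows: each timestamp belongs to exactly one window
    return ts_ms - (ts_ms % WINDOW_MS)

# made-up events: (key, value, timestamp in ms)
events = [("k1", 10, 5_000), ("k1", 20, 30_000), ("k2", 7, 61_000)]

agg = defaultdict(int)  # (key, window start) -> running sum
for key, value, ts in events:
    agg[(key, window_start(ts))] += value

# k1's two events fall into the same window; k2's event into the next one
print(dict(agg))
```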

However, what should I do if I don't know the number of possible keys (to
partition by)?

Thanks
Adrienne

Re: Kafka Streams dynamic partitioning

Posted by Michael Noll <mi...@confluent.io>.
> I think this should be 'pick a number of partitions that matches the max
> number of possible keys in the stream to be partitioned'.
> At least in my use case, in which I am trying to partition the stream by
> key and do windowed aggregations, if there are fewer topic partitions
> than possible keys, the application will not work correctly.

As I said above, this is actually not needed -- which (I hope) means good
news for you. :-)




Re: Kafka Streams dynamic partitioning

Posted by Adrienne Kole <ad...@gmail.com>.
Thanks, I got the point. That solves my problem.




Re: Kafka Streams dynamic partitioning

Posted by "Matthias J. Sax" <ma...@confluent.io>.

Hi,

even if you have more distinct keys than partitions (i.e., different keys
go to the same partition), when you "aggregate by key" Streams will
automatically separate the keys and compute an aggregate per key.
Thus, you do not need to worry about which key is hashed to which
partition.
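
In other words (a toy illustration in plain Python, not actual Streams
code): even if several keys collide into one partition, the aggregation
state is keyed by the record key, not by the partition:

```python
from collections import defaultdict

# suppose keys "a", "b" and "c" all happen to hash to partition 0
partition_0 = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]

aggregates = defaultdict(int)  # one running sum per key
for key, value in partition_0:
    aggregates[key] += value

print(dict(aggregates))  # each key keeps its own aggregate
```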

-Matthias


Re: Kafka Streams dynamic partitioning

Posted by Adrienne Kole <ad...@gmail.com>.
Hi,

@Ali     IMO, yes. It is the job of the Kafka server to assign partition(s)
to instances for processing. Each instance can process more than one
partition, but one partition cannot be processed by more than one instance.

@Michael, thanks for the reply.
>Rather, pick the number of partitions in a way that matches your needs to
process the data in parallel
I think this should be 'pick a number of partitions that matches the max
number of possible keys in the stream to be partitioned'.
At least in my use case, in which I am trying to partition the stream by
key and do windowed aggregations, if there are fewer topic partitions than
possible keys, the application will not work correctly.

That is, if the number of topic partitions is less than the number of
possible stream keys, then stream tuples with different keys will be
assigned to the same partition. That was the problem I was trying to
solve, and it seems the only solution is to estimate the max number of
possible keys and assign partitions accordingly.

Thanks
Adrienne






Re: Kafka Streams dynamic partitioning

Posted by Ali Akhtar <al...@gmail.com>.
> It's often a good
idea to over-partition your topics.  For example, even if today 10 machines
(and thus 10 partitions) would be sufficient, pick a higher number of
partitions (say, 50) so you have some wiggle room to add more machines
(11...50) later if need be.

If you create e.g. 30 partitions, but only have e.g. 5 instances of your
program, all in the same consumer group, all using Kafka Streams to consume
the topic, do you still receive all the data posted to the topic, or will
you need as many instances of the program as there are partitions?

(If you have 1 instance and 30 partitions, will the same rule apply, i.e.
will it receive all data?)
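
For intuition, a rough plain-Python sketch of the assignment (the real
assignor is more elaborate, e.g. range or round-robin strategies, but the
invariant is the same: every partition is owned by exactly one instance in
the group, so the group as a whole consumes all the data):

```python
def assign(num_partitions, instances):
    # naive round-robin spread of partitions over group members
    assignment = {i: [] for i in instances}
    for p in range(num_partitions):
        assignment[instances[p % len(instances)]].append(p)
    return assignment

# 30 partitions, 5 instances: each instance owns 6 partitions, and
# together they cover the whole topic
groups = assign(30, ["i1", "i2", "i3", "i4", "i5"])
assert sorted(p for ps in groups.values() for p in ps) == list(range(30))

# 1 instance, 30 partitions: the single instance owns everything
assert assign(30, ["solo"])["solo"] == list(range(30))
```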


Re: Kafka Streams dynamic partitioning

Posted by Michael Noll <mi...@confluent.io>.
> So, in this case I should know the max number of possible keys so that
> I can create that number of partitions.

Assuming I understand your original question correctly, then you would not
need to do/know this.  Rather, pick the number of partitions in a way that
matches your needs to process the data in parallel (e.g. if you expect that
you require 10 machines in order to process the incoming data, then you'd
need 10 partitions).  Also, as a general recommendation:  It's often a good
idea to over-partition your topics.  For example, even if today 10 machines
(and thus 10 partitions) would be sufficient, pick a higher number of
partitions (say, 50) so you have some wiggle room to add more machines
(11...50) later if need be.
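
One reason to fix the higher partition count upfront: changing the
partition count later changes which partition a given key hashes to. The
hash below is a stand-in for illustration (Kafka's default producer uses
murmur2), but the effect is the same:

```python
def partition_for(key, num_partitions):
    # stand-in hash for illustration, not Kafka's murmur2
    return sum(key.encode()) % num_partitions

keys = ["user-%d" % i for i in range(100)]
# how many keys would land on a different partition after a resize?
moved = sum(1 for k in keys if partition_for(k, 10) != partition_for(k, 50))
print(moved, "of 100 keys would map to a different partition")
```

Resizing would silently re-route most existing keys, so picking the higher
number (say, 50) from the start is the safer option.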




Re: Kafka Streams dynamic partitioning

Posted by Adrienne Kole <ad...@gmail.com>.
Hi Guozhang,

So, in this case I should know the max number of possible keys so that I
can create that number of partitions.

Thanks

Adrienne


Re: Kafka Streams dynamic partitioning

Posted by Guozhang Wang <wa...@gmail.com>.
By default the partitioner takes a murmur hash of the key modulo the
current num.partitions to determine which partition each record goes to, so
records with the same key will be assigned to the same partition. Would
that be OK for your case?
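
The behavior can be mimicked in a few lines of plain Python (using a
stand-in hash rather than murmur2):

```python
import hashlib

def partition_for(key, num_partitions):
    # deterministic hash of the key, modulo the partition count
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return h % num_partitions

# the same key always lands on the same partition...
assert partition_for("sensor-42", 8) == partition_for("sensor-42", 8)

# ...and every key lands on some valid partition, so many distinct keys
# can share the 8 available partitions
parts = {partition_for("sensor-%d" % i, 8) for i in range(100)}
assert parts <= set(range(8))
```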


Guozhang





-- 
-- Guozhang