Posted to users@kafka.apache.org by Daniel Wegener <Da...@holisticon.de> on 2015/02/20 18:19:30 UTC

New Producer - Is the configurable partitioner gone?

Hello Kafka-users!

I am facing a migration from a (somewhat self-plumbed) Kafka 0.8.1 producer to the new kafka-clients API. I just noticed that the new KafkaProducer initializes its own Partitioner that cannot be changed (final field, no ctor param, no Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is this an intentional change?
If I understand the new API correctly, one should either define a key for a message and let the default Partitioner take care of distributing messages over all available partitions, or set an explicit partition number per message.
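
For illustration, a minimal sketch of both options as I understand them (topic and key names are made up; assumes String serializers and a reachable broker):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitioningOptions {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Option 1: key only - the default partitioner spreads keys over all partitions.
        producer.send(new ProducerRecord<>("my-topic", "business-key", "payload"));

        // Option 2: explicit partition - fails at send time if the partition does not exist.
        producer.send(new ProducerRecord<>("my-topic", 2, "business-key", "payload"));

        producer.close();
    }
}
```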

The old API allowed creating ProducerRecords with a key and/or a separate key used only for partitioning (one that is not sent down the wire), and then providing a custom Partitioner that could later distribute this partitioning key over all available partitions when the message is actually sent.

The difference in the new producer API is that we need to know the exact number of available partitions before we even create a ProducerRecord. If we don't ensure the correct number of partitions and try to send a message to a partition that does not exist, the message will blow up later when the producer tries to send it.

I don't expect the partition count to change that often, but the API doc states that a partitionsFor(String topic) result _should not_ be cached. And I do not really want to check for changed partition counts before every creation of a ProducerRecord. The old pluggable partitioner was, for us, especially useful for partition stickiness by business keys (and thus stateful processing stages across multiple topics). This ensured that a message that was processed on stage1:partition2 would eventually be processed on stageN:partition2. Not very clever in terms of scalability per stage, but it makes reasoning about the message flow a lot easier.
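
For what it's worth, the per-record check we would now have to do client-side looks something like this (businessKey and payload are placeholders; partitionsFor is the real KafkaProducer call):

```
// Derive a sticky partition from a business key on the client side.
// partitionsFor() is served from the producer's internal metadata cache,
// so re-checking the partition count per record is normally cheap.
static void sendSticky(Producer<String, String> producer, String businessKey, String payload) {
    int numPartitions = producer.partitionsFor("stage1-topic").size();
    int partition = (businessKey.hashCode() & 0x7fffffff) % numPartitions;
    producer.send(new ProducerRecord<>("stage1-topic", partition, businessKey, payload));
}
```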
 
So for a single ProducerRecord, to my understanding, it might be nicer to have a nullable partitionFunction: (ProducerRecord, Int) => Int (or a Java 8 lambda equivalent) instead of the nullable partition attribute, and to evaluate this function in the producer.
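
As a rough Java 8 sketch of that idea (this interface does not exist; it is just the shape I mean):

```
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical: resolve the partition per record at send time, given the
// number of currently available partitions, instead of fixing it up front.
@FunctionalInterface
public interface PartitionFunction<K, V> {
    int partition(ProducerRecord<K, V> record, int numPartitions);
}
```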

Kind regards

------------------------------------------------------------------------------------
Daniel Wegener
Holisticon AG


Re: New Producer - Is the configurable partitioner gone?

Posted by Steven Wu <st...@gmail.com>.
Yes, this is with the new Java client. Since it is using non-blocking NIO, the sender thread was probably able to scan the buffer very frequently; hence the random partitioner won't get much chance to accumulate records per batch or request.
Setup

- 3 broker instances (m1.xlarge)
- 6 producer instances (m1.xlarge)
- topic partitions: 36
- message size: 1 KB
- no compression
- traffic volume: 30 MB / 30K msgs total; 10 MB / 10K msgs per broker

Summary

partitioner                   batched records per request   broker cpu util
random without lingering      1.25                          75%
sticky without lingering      2.0                           50%
sticky with 100ms lingering   15                            33%

There are two ways to improve batching:

1. Use a sticky partitioner that we implement. The Kafka default is a random
   partitioner, where a random partition is selected for each message. With a
   sticky partitioner, we can stick all messages (to one topic) on the same
   partition for a while (e.g. 1 second) before moving on to the next
   partition. (A sketch follows below.)

2. Set the "linger.ms" property on the Kafka producer. It allows messages to
   linger around for some period in the hope of a batching opportunity.

We can deploy one or both methods. But the main point is that improved
batching helps the broker a lot.
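
A minimal sketch of the sticky idea (not our actual implementation; the class name and stickyMs parameter are made up, and the chosen partition would then be set on each ProducerRecord client-side):

```
import java.util.Random;

// Sketch: stick to one randomly chosen partition for stickyMs milliseconds,
// then move on to another, so batches get a chance to fill up per partition.
public class StickyPartitionChooser {
    private final long stickyMs;
    private final Random random = new Random();
    private int currentPartition = -1;
    private long switchedAtMs = 0;

    public StickyPartitionChooser(long stickyMs) {
        this.stickyMs = stickyMs;
    }

    public synchronized int choose(int numPartitions) {
        long now = System.currentTimeMillis();
        if (currentPartition < 0 || now - switchedAtMs >= stickyMs) {
            currentPartition = random.nextInt(numPartitions);
            switchedAtMs = now;
        }
        return currentPartition;
    }
}
```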

"linger.ms" carries the risk of filling up the buffer. It works very well
with the sticky partitioner because a full batch then accumulates very quickly.
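
And a sketch of the second option (linger.ms is a real producer config; the 100 ms value just matches the table above):

```
// Allow the sender to wait up to 100 ms for more records before sending a
// request: larger batches per request, at the cost of added latency and
// more data sitting in the producer's buffer.
props.put("linger.ms", "100");
```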



On Sun, Feb 22, 2015 at 5:21 PM, Jay Kreps <ja...@gmail.com> wrote:

> Interesting, and this was with the new Java client? This sounds like as
> much an opportunity for improvement in the code as anything. Would you be
> willing to share the details?
>
> -jay

Re: New Producer - Is the configurable partitioner gone?

Posted by Jay Kreps <ja...@gmail.com>.
Interesting, and this was with the new Java client? This sounds like as
much an opportunity for improvement in the code as anything. Would you be
willing to share the details?

-jay

On Sunday, February 22, 2015, Steven Wu <st...@gmail.com> wrote:

> > The low connection partitioner might work for this
> > by attempting to reuse recently used nodes whenever possible. That is
> > useful in environments with lots and lots of producers where you don't care
> > about semantic partitioning.
>
> In one of the perf tests, we found that the above "sticky" partitioner
> improved batching and reduced CPU util on the broker side by 60%. We plan
> to make it our default partitioner.

Re: New Producer - Is the configurable partitioner gone?

Posted by Steven Wu <st...@gmail.com>.
> The low connection partitioner might work for this
> by attempting to reuse recently used nodes whenever possible. That is
> useful in environments with lots and lots of producers where you don't care
> about semantic partitioning.

In one of the perf tests, we found that the above "sticky" partitioner
improved batching and reduced CPU util on the broker side by 60%. We plan
to make it our default partitioner.




Re: New Producer - Is the configurable partitioner gone?

Posted by Jay Kreps <ja...@gmail.com>.
Hey Daniel,

Yeah I think that would be doable. If you want to pursue it you would need
to do a quick KIP just to get everyone on the same page since this would be
a public interface we would have to support over a long time:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

When we have the details worked out, it should be a fairly straightforward
patch to make that pluggable.

A few comments:
- I think we should just make the DefaultPartitioner the default value for
that configuration, rather than having it be a fallback.
- You need to pass in the binary key and value in addition to the Java
objects. Otherwise any partitioning based on the binary value would require
re-serializing them.
- If we add this option we should really ship at least one other useful
partitioning strategy. The low connection partitioner might work for this
by attempting to reuse recently used nodes whenever possible. That is
useful in environments with lots and lots of producers where you don't care
about semantic partitioning. It would be good to think through if there are
any other useful partitioning strategies to make sure they would also be
doable with the interface we would end up with.
- Currently Cluster is not a public class so we'll have to think about
whether we want to make that public.
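
To make the config-only idea concrete, a hypothetical sketch (the property name and partitioner class here are assumptions of this sketch, not a committed API):

```
// Hypothetical: select a partitioning strategy purely via producer config.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("partitioner.class", "com.example.LowConnectionPartitioner");
```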

-Jay



Re: New Producer - Is the configurable partitioner gone?

Posted by Daniel Wegener <da...@holisticon.de>.
Jay Kreps <ja...@...> writes:

> Hey Daniel,
>
> partitionsFor() will block the very first time it sees a new topic that it
> doesn't have metadata for yet. If you want to ensure you don't block even
> that one time, call it prior to your regular usage so it initializes then.
>
> The rationale for adding a partition in ProducerRecord was that there are
> actually cases where you want to associate a key with the record but not
> use it for partitioning. The rationale for not retaining the pluggable
> partitioner was that you could always just use the partition (many people
> dislike the plugin apis and asked for it). Personally, like you, I
> preferred the plugin apis.
>
> We aren't cut off from exposing the existing partitioner usage as a public
> api that you can override if there is anyone who wants that. I think one
> nice thing about it would be the ability to ship with an alternative
> partitioning strategy that you could enable purely in config. For example
> the old producer had a partitioning strategy that attempted to minimize the
> number of TCP connections for cases where there was no key. 98% of people
> loathed that, but for 2% of people it was useful and this would be a way to
> still include that behavior for people migrating to the new producer.
>
> -Jay

Hi Jay.

Thank you for this clarification. I agree it's perfectly fine to enforce
the metadata resolution once, when you start a producer.

If there were support for some kind of user-defined partitioner, I'd have
the following thoughts:

- Let the user still be able to optionally choose a partition for a
ProducerRecord.
- Let the user optionally provide a _CustomPartitioner_ as a KafkaProducer
ctor param or via ctor props.
- Keep the current default partitioner's behavior, which will just prefer the
result of the CustomPartitioner over its default strategies (hashing the
encoded key, then round-robin), but AFTER trying ProducerRecord.partition.

The type signature of a CustomPartitioner could look like this:

```
public interface CustomPartitioner<K, V> extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic   The topic name
     * @param key     The key to partition on (or null if no key)
     * @param value   The value to partition on
     * @param cluster The current cluster metadata
     * @return a partition, or {@code null} if this partitioner cannot make a
     *         useful decision; this leads to a fallback to the default
     *         partitioning behaviour
     */
    public Integer partition(String topic, K key, V value, Cluster cluster);

}
```

This would:
- Not introduce any breaking changes to the API
- Allow users to partition based on their unserialized "business" keys or
values. This is consistent with the type-parameterized, user-provided
serializers.

It might still make the concept of partitioning more complex and thus 
harder to grasp.

An implementation could look like this:
https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447afe60d389f6c
(just a raw sketch though).
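
For concreteness, a hypothetical implementation against this interface could look like the following (BusinessKey, Order and stableId() are invented placeholder domain types):

```
import java.util.Map;
import org.apache.kafka.common.Cluster;

// Partition by an unserialized business key; fall back to the default
// strategy for null keys by returning null, as the interface above allows.
public class BusinessKeyPartitioner implements CustomPartitioner<BusinessKey, Order> {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public Integer partition(String topic, BusinessKey key, Order value, Cluster cluster) {
        if (key == null)
            return null; // null -> default partitioning behaviour
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return (key.stableId().hashCode() & 0x7fffffff) % numPartitions;
    }
}
```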

What do you think?

Kind regards
Daniel




Re: New Producer - Is the configurable partitioner gone?

Posted by Jay Kreps <ja...@gmail.com>.
Hey Daniel,

partitionsFor() will block the very first time it sees a new topic that it
doesn't have metadata for yet. If you want to ensure you don't block even
that one time, call it prior to your regular usage so it initializes then.
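
A minimal sketch of that warm-up (topic names are placeholders):

```
// Resolve metadata once at startup; partitionsFor() blocks only the first
// time it sees a topic it has no metadata for.
producer.partitionsFor("stage1-topic");
producer.partitionsFor("stage2-topic");
```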

The rationale for adding a partition in ProducerRecord was that there are
actually cases where you want to associate a key with the record but not
use it for partitioning. The rationale for not retaining the pluggable
partitioner was that you could always just use the partition (many people
dislike the plugin apis and asked for it). Personally, like you, I
preferred the plugin apis.

We aren't cut off from exposing the existing partitioner usage as a public
api that you can override if there is anyone who wants that. I think one
nice thing about it would be the ability to ship with an alternative
partitioning strategy that you could enable purely in config. For example
the old producer had a partitioning strategy that attempted to minimize the
number of TCP connections for cases where there was no key. 98% of people
loathed that, but for 2% of people it was useful and this would be a way to
still include that behavior for people migrating to the new producer.

-Jay

On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <daniel.wegener@holisticon.de> wrote:

> [... quote snipped; Daniel's message appears in full further down in this
> thread ...]
>

Re: New Producer - Is the configurable partitioner gone?

Posted by Daniel Wegener <da...@holisticon.de>.
Gwen Shapira <gs...@...> writes:

> [... quote snipped; Gwen's reply appears in full further down in this
> thread ...]
>

Cheers for your quick reply, Gwen!
Good to know that partitionsFor is fast and non-blocking most of the time.
I'm just wondering if this leaves a (maybe rather artificial?) use case
uncovered: if you really try to use the Producer in a completely
non-blocking fashion (with block.on.buffer.full=false, maybe for a
reactive-streams adapter?), you would still have to call partitionsFor,
which may occasionally block.
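
For reference, such a fail-fast setup would look roughly like this (a
sketch with placeholder names; with block.on.buffer.full=false a full
record buffer surfaces as a BufferExhaustedException instead of blocking
the caller):

```
import java.util.Properties;

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("block.on.buffer.full", "false"); // fail fast instead of blocking
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
        } catch (BufferExhaustedException e) {
            // The record buffer is full; back off or drop instead of blocking.
        }
        producer.close();
    }
}
```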

Don't get me wrong, I am happy with this solution, but I think the old API
was a bit clearer about what you really pass into the producer and what it
does with it (a partition number that may later turn out to be invalid and
throw exceptions, vs. a partitioner that lets you distribute your payloads
over the available number of partitions). In the new API it just feels a
bit more - well - java'ish :).

Kind regards
Daniel


Re: New Producer - Is the configurable partitioner gone?

Posted by Gwen Shapira <gs...@cloudera.com>.
Hi Daniel,

I think you can still use the same logic you had in the custom partitioner
in the old producer. You just move it to the client that creates the
records.
The reason you don't cache the result of partitionsFor is that the producer
should handle the caching for you, so it's not necessarily a long or
blocking call.
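
In code, that client-side move might look like this sketch (sendSticky,
the wire key, and the business key are illustrative names, not part of
the API):

```
import java.util.List;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class ClientSidePartitioning {

    /** Derives the partition from the business key before handing off the record. */
    public static void sendSticky(Producer<String, String> producer, String topic,
                                  String businessKey, String wireKey, String value) {
        // The producer caches topic metadata, so this is normally cheap.
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        // Stable, non-negative partition index derived from the business key.
        int partition = (businessKey.hashCode() & 0x7fffffff) % partitions.size();
        // Only wireKey travels with the record; the business key stays client-side.
        producer.send(new ProducerRecord<String, String>(topic, partition, wireKey, value));
    }
}
```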

I see it as a pretty small change to the API. But I'm not sure what drove
the change either.

Gwen

On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
Daniel.Wegener@holisticon.de> wrote:

> [... quote snipped; the original message appears in full at the top of
> this thread ...]
>