You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Daniel Wegener <Da...@holisticon.de> on 2015/02/20 18:19:30 UTC
New Producer - Is the configurable partitioner gone?
Hello Kafka-users!
I am facing a migration from a kind of ( a bit self plumbed) kafka 0.8.1 producer to the new kafka-clients API. I just recognized, that the new KafkaProducer initializes its own Partitioner that cannot be changed (final field, no ctor-param, no Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is this an intentional change?
If i understand the new API correctly, one should either define a key for a message and let the default Partitioner care that it will be distributed over all available partitions or to set an explicit partition number per message that will be written to.
The old API api allowed to create ProducerRecors with a key and/or a key used only for partitioning (but one that is not sent down the wire) and then to provide a custom Partitioner that later could distribute this partitioning key over all available partitions when the message is actually sent.
The difference in the new procuder API is that we need to know the exact number of available partitions before we even create a ProducerRecord. If we dont ensure the correct number of partitions and try to send a message to a partition that does not exist, the whole message will blow up later when the producer tries to send it.
I dont expect the partition count to change that often but the API-doc states that a partitionsFor(String topic) result _should not_ be cached. But I do not really want to check for changed partition counts before every creation of a ProducerRecord. The old pluggable partitioner was, for us, especially useful for partition-stickyness by business keys (and thus stateful processing stages across multiple topics). This ensured that a message that was processed on stage1:partition2 will eventually be processed on stageN:partition2. Not very clever in terms of scalability per stage, but it makes reasoning about the message flow alot easier.
So for a single ProducerRecord, for my understanding it might be nicer to have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or Java8 lambda equivalent) instead of the nullable Partition attribute and evaluate this function in the producer.
Kind regards
------------------------------------------------------------------------------------
Daniel Wegener
Holisticon AG
Re: New Producer - Is the configurable partitioner gone?
Posted by Steven Wu <st...@gmail.com>.
yes. this is with the new java client. since it is using non-blocking NIO,
sender thread probably was able to scan the buffer very frequently. hence
random partitioner won't get much chance to accumulate records for batch or
request.
Setup
* - 3 broker instances (m1.xlarge)- 6 producer instances (m1.xlarge)- topic
partitions: 36- message size: 1 KB- no compression- traffic volume- total:
30 MB / 30K msgs,- per broker: 10 MB / 10K msgs*Summary
partitioner
batched records per request
broker cpu util
random without lingering
1.25
75%
sticky without lingering
2.0
50%
sticky with 100ms lingering
15
33%
there are two ways to improve batching
1.
use sticky partitioner that we implement. kafka default is random
partitioner, where a random partition is selected for each msg. with sticky
partitioner, we can stick all msgs (to one topic) on the same partition for
a while (e.g. 1 second) before moving on to next partition.
2.
set "linger.ms" property from kafka producer. it allows message to
linger around for some period and hope for batching opportunity.
We can deploy one or both methods. But the main point is that improved
batching helps broker a lot.
“linger.ms” can cause risk of filling up the buffer. it works very well
with sticky partitioner because it is very fast to accumulate a full batch.
On Sun, Feb 22, 2015 at 5:21 PM, Jay Kreps <ja...@gmail.com> wrote:
> Interesting, and this was with the new Java client? This sounds like as
> much an opportunity for improvement in the code as anything. Would you be
> willing to share the details?
>
> -jay
>
> On Sunday, February 22, 2015, Steven Wu <st...@gmail.com> wrote:
>
> > > The low connection partitioner might work for this
> > by attempting to reuse recently used nodes whenever possible. That is
> > useful in environments with lots and lots of producers where you don't
> care
> > about semantic partitioning.
> >
> > In one of the perf test, we found that above "sticky" partitioner
> improved
> > batching and reduced cpu util at broker side by 60%. We plan to make it
> our
> > default partitioner.
> >
> >
> >
> > On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps <jay.kreps@gmail.com
> > <javascript:;>> wrote:
> >
> > > Hey Daniel,
> > >
> > > Yeah I think that would be doable. If you want to pursue it you would
> > need
> > > to do a quick KIP just to get everyone on the same page since this
> would
> > be
> > > a public interface we would have to support over a long time:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > >
> > > When we have the details worked out, then it should be a fairly
> > > straight-forward patch to make that pluggable.
> > >
> > > A few comments:
> > > - I think we should just make the DefaultPartitioner the default value
> > for
> > > that configuration, rather than having it be a fall back.
> > > - You need to pass in the binary key and value in addition to the java
> > > objects. Otherwise any partitioning based on the binary value will
> > require
> > > reserializing these.
> > > - If we add this option we should really ship at least one other useful
> > > partitioning strategy. The low connection partitioner might work for
> this
> > > by attempting to reuse recently used nodes whenever possible. That is
> > > useful in environments with lots and lots of producers where you don't
> > care
> > > about semantic partitioning. It would be good to think through if there
> > are
> > > any other useful partitioning strategies to make sure they would also
> be
> > > doable with the interface we would end up with.
> > > - Currently Cluster is not a public class so we'll have to think about
> > > whether we want to make that public.
> > >
> > > -Jay
> > >
> > >
> > > On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener <
> > > daniel.wegener@holisticon.de <javascript:;>> wrote:
> > >
> > > >
> > > > Jay Kreps <ja...@...> writes:
> > > >
> > > > >
> > > > > Hey Daniel,
> > > > >
> > > > > partitionsFor() will block the very first time it sees a new topic
> > that
> > > > it
> > > > > doesn't have metadata for yet. If you want to ensure you don't
> block
> > > even
> > > > > that one time, call it prior to your regular usage so it
> initializes
> > > > then.
> > > > >
> > > > > The rationale for adding a partition in ProducerRecord was that
> there
> > > are
> > > > > actually cases where you want to associate a key with the record
> but
> > > not
> > > > > use it for partitioning. The rationale for not retaining the
> > pluggable
> > > > > partitioner was that you could always just use the partition (many
> > > people
> > > > > dislike the plugin apis and asked for it). Personally, like you, I
> > > > > preferred the plugin apis.
> > > > >
> > > > > We aren't cut off from exposing the existing partitioner usage as a
> > > > public
> > > > > api that you can override if there is anyone who wants that. I
> think
> > > one
> > > > > nice thing about it would be the ability to ship with an
> alternative
> > > > > partitioning strategy that you could enable purely in config. For
> > > example
> > > > > the old producer had a partitioning strategy that attempted to
> > minimize
> > > > the
> > > > > number of TCP connections for cases where there was no key. 98% of
> > > people
> > > > > loathed that, but for 2% of people it was useful and this would be
> a
> > > way
> > > > to
> > > > > still include that behavior for people migrating to the new
> producer.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> > > > > daniel.wegener@...> wrote:
> > > > >
> > > > > > Gwen Shapira <gshapira <at> ...> writes:
> > > > > >
> > > > > > >
> > > > > > > Hi Daniel,
> > > > > > >
> > > > > > > I think you can still use the same logic you had in the custom
> > > > > > partitioner
> > > > > > > in the old producer. You just move it to the client that
> creates
> > > the
> > > > > > > records.
> > > > > > > The reason you don't cache the result of partitionsFor is that
> > the
> > > > > > producer
> > > > > > > should handle the caching for you, so its not necessarily a
> long
> > or
> > > > > > > blocking call.
> > > > > > >
> > > > > > > I see it as a pretty small change to the API. But I'm not sure
> > what
> > > > drove
> > > > > > > the change either.
> > > > > > >
> > > > > > > Gwen
> > > > > > >
> > > > > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > > > > > Daniel.Wegener <at> ...> wrote:
> > > > > > >
> > > > > > > > Hello Kafka-users!
> > > > > > > >
> > > > > > > > I am facing a migration from a kind of ( a bit self plumbed)
> > > kafka
> > > > > > 0.8.1
> > > > > > > > producer to the new kafka-clients API. I just recognized,
> that
> > > the
> > > > new
> > > > > > > > KafkaProducer initializes its own Partitioner that cannot be
> > > > changed
> > > > > > (final
> > > > > > > > field, no ctor-param, no
> > > > > > > >
> > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > > Is
> > > > > > this
> > > > > > > > an intentional change?
> > > > > > > > If i understand the new API correctly, one should either
> > define a
> > > > key
> > > > > > for
> > > > > > > > a message and let the default Partitioner care that it will
> be
> > > > > > distributed
> > > > > > > > over all available partitions or to set an explicit partition
> > > > number
> > > > > > per
> > > > > > > > message that will be written to.
> > > > > > > >
> > > > > > > > The old API api allowed to create ProducerRecors with a key
> > > and/or
> > > > a
> > > > > > key
> > > > > > > > used only for partitioning (but one that is not sent down the
> > > wire)
> > > > and
> > > > > > > > then to provide a custom Partitioner that later could
> > distribute
> > > > this
> > > > > > > > partitioning key over all available partitions when the
> message
> > > is
> > > > > > actually
> > > > > > > > sent.
> > > > > > > >
> > > > > > > > The difference in the new procuder API is that we need to
> know
> > > the
> > > > > > exact
> > > > > > > > number of available partitions before we even create a
> > > > ProducerRecord.
> > > > > > If
> > > > > > > > we dont ensure the correct number of partitions and try to
> > send a
> > > > > > message
> > > > > > > > to a partition that does not exist, the whole message will
> blow
> > > up
> > > > > > later
> > > > > > > > when the producer tries to send it.
> > > > > > > >
> > > > > > > > I dont expect the partition count to change that often but
> the
> > > API-
> > > > doc
> > > > > > > > states that a partitionsFor(String topic) result _should not_
> > be
> > > > > > cached.
> > > > > > > > But I do not really want to check for changed partition
> counts
> > > > before
> > > > > > every
> > > > > > > > creation of a ProducerRecord. The old pluggable partitioner
> > was,
> > > > for
> > > > > > us,
> > > > > > > > especially useful for partition-stickyness by business keys
> > (and
> > > > thus
> > > > > > > > stateful processing stages across multiple topics). This
> > ensured
> > > > that a
> > > > > > > > message that was processed on stage1:partition2 will
> eventually
> > > be
> > > > > > > > processed on stageN:partition2. Not very clever in terms of
> > > > scalability
> > > > > > per
> > > > > > > > stage, but it makes reasoning about the message flow alot
> > easier.
> > > > > > > >
> > > > > > > > So for a single ProducerRecord, for my understanding it might
> > be
> > > > nicer
> > > > > > to
> > > > > > > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int
> (or
> > > > Java8
> > > > > > > > lambda equivalent) instead of the nullable Partition
> attribute
> > > and
> > > > > > evaluate
> > > > > > > > this function in the producer.
> > > > > > > >
> > > > > > > > Kind regards
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > -------------------------------------------------------------------
> > > > ----
> > > > > > -------------
> > > > > > > > Daniel Wegener
> > > > > > > > Holisticon AG
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > > Cheers for you quick reply Gwen!
> > > > > > Good to know that partitionsFor is (most time) fast and
> > non-blocking.
> > > > Im
> > > > > > just wondering if this leaves a (maybe rather artificial?) use
> case
> > > > > > uncovered: If you really try to use the Producer in a completly
> > non-
> > > > > > blocking fashion (with block.on.buffer.full=false, maybe for a
> > > > reactive-
> > > > > > streams adapter?) you would still have to call partitionsFor what
> > may
> > > > > > occasionally be blocking.
> > > > > >
> > > > > > Dont get me wrong, I am happy with this solution but I think the
> > old
> > > > API
> > > > > > was a bit clearer about what you really give into the producer
> and
> > > what
> > > > it
> > > > > > does with it (a partition number (that may be discovered as
> invalid
> > > > later
> > > > > > and throws exceptions) vs a partitioner that let you distribute
> > your
> > > > > > payloads over the available number of partitions). In the new api
> > it
> > > > just
> > > > > > feels a bit more - well - java'ish :).
> > > > > >
> > > > > > Kind regards
> > > > > > Daniel
> > > > > >
> > > > > >
> > > > >
> > > >
> > > > Hi Jay.
> > > >
> > > > Thank you for this clarification, I agree it's perfectly fine to
> > enforce
> > > > the metadata resolution when you start a producer once.
> > > >
> > > > If there would be support for some kind of user defined partitioner
> I'd
> > > > have the following thougts:
> > > >
> > > > - Let the user still be able to optionally choose a partition for a
> > > > ProducerRecord.
> > > > - Let the user optionally provide a _CustomPartitioner_ as
> > KafkaProducer
> > > > ctor
> > > > param or ctor props.
> > > > - Keep the current default partitioners behavior that will just
> prefer
> > > the
> > > > result of the CustomPartitioner over its default strategies (hashing
> > the
> > > > encoded key before round-robin), but AFTER trying the
> > > > ProducerRecord.partition.
> > > >
> > > > The type signature of a CustomPartitioner could look like this:
> > > >
> > > > ```
> > > > public interface CustomPartitioner<K,V> extends Configurable {
> > > >
> > > > /**
> > > > * Compute the partition for the given record.
> > > > *
> > > > * @param topic The topic name
> > > > * @param key The key to partition on (or null if no key)
> > > > * @param value The value to partition on
> > > > * @param cluster The current cluster metadata
> > > > * @returns a partition or {@code null} if this partitioner
> cannot
> > > make
> > > > a useful decision. This will lead to a fallback to the default
> > > partitioning
> > > > behaviour.
> > > > */
> > > > public Integer partition(String topic, K key, V value, Cluster
> > > > cluster);
> > > >
> > > > }
> > > > ```
> > > >
> > > > This would:
> > > > - Not introduce any breaking changes to the API
> > > > - Allow users to partition based on their unserialized "business"
> keys
> > or
> > > > Values. This is consistent with the type-parameterized user provided
> > > > serializers.
> > > >
> > > > It might still make the concept of partitioning more complex and thus
> > > > harder to grasp.
> > > >
> > > > An implementation could look like this:
> > > >
> > >
> >
> https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447a
> > > > fe60d389f6c (just a raw sketch though).
> > > >
> > > > What do you think?
> > > >
> > > > Kind regards
> > > > Daniel
> > > >
> > > >
> > > >
> > > >
> > >
> >
>
Re: New Producer - Is the configurable partitioner gone?
Posted by Jay Kreps <ja...@gmail.com>.
Interesting, and this was with the new Java client? This sounds like as
much an opportunity for improvement in the code as anything. Would you be
willing to share the details?
-jay
On Sunday, February 22, 2015, Steven Wu <st...@gmail.com> wrote:
> > The low connection partitioner might work for this
> by attempting to reuse recently used nodes whenever possible. That is
> useful in environments with lots and lots of producers where you don't care
> about semantic partitioning.
>
> In one of the perf test, we found that above "sticky" partitioner improved
> batching and reduced cpu util at broker side by 60%. We plan to make it our
> default partitioner.
>
>
>
> On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps <jay.kreps@gmail.com
> <javascript:;>> wrote:
>
> > Hey Daniel,
> >
> > Yeah I think that would be doable. If you want to pursue it you would
> need
> > to do a quick KIP just to get everyone on the same page since this would
> be
> > a public interface we would have to support over a long time:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > When we have the details worked out, then it should be a fairly
> > straight-forward patch to make that pluggable.
> >
> > A few comments:
> > - I think we should just make the DefaultPartitioner the default value
> for
> > that configuration, rather than having it be a fall back.
> > - You need to pass in the binary key and value in addition to the java
> > objects. Otherwise any partitioning based on the binary value will
> require
> > reserializing these.
> > - If we add this option we should really ship at least one other useful
> > partitioning strategy. The low connection partitioner might work for this
> > by attempting to reuse recently used nodes whenever possible. That is
> > useful in environments with lots and lots of producers where you don't
> care
> > about semantic partitioning. It would be good to think through if there
> are
> > any other useful partitioning strategies to make sure they would also be
> > doable with the interface we would end up with.
> > - Currently Cluster is not a public class so we'll have to think about
> > whether we want to make that public.
> >
> > -Jay
> >
> >
> > On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener <
> > daniel.wegener@holisticon.de <javascript:;>> wrote:
> >
> > >
> > > Jay Kreps <ja...@...> writes:
> > >
> > > >
> > > > Hey Daniel,
> > > >
> > > > partitionsFor() will block the very first time it sees a new topic
> that
> > > it
> > > > doesn't have metadata for yet. If you want to ensure you don't block
> > even
> > > > that one time, call it prior to your regular usage so it initializes
> > > then.
> > > >
> > > > The rationale for adding a partition in ProducerRecord was that there
> > are
> > > > actually cases where you want to associate a key with the record but
> > not
> > > > use it for partitioning. The rationale for not retaining the
> pluggable
> > > > partitioner was that you could always just use the partition (many
> > people
> > > > dislike the plugin apis and asked for it). Personally, like you, I
> > > > preferred the plugin apis.
> > > >
> > > > We aren't cut off from exposing the existing partitioner usage as a
> > > public
> > > > api that you can override if there is anyone who wants that. I think
> > one
> > > > nice thing about it would be the ability to ship with an alternative
> > > > partitioning strategy that you could enable purely in config. For
> > example
> > > > the old producer had a partitioning strategy that attempted to
> minimize
> > > the
> > > > number of TCP connections for cases where there was no key. 98% of
> > people
> > > > loathed that, but for 2% of people it was useful and this would be a
> > way
> > > to
> > > > still include that behavior for people migrating to the new producer.
> > > >
> > > > -Jay
> > > >
> > > > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> > > > daniel.wegener@...> wrote:
> > > >
> > > > > Gwen Shapira <gshapira <at> ...> writes:
> > > > >
> > > > > >
> > > > > > Hi Daniel,
> > > > > >
> > > > > > I think you can still use the same logic you had in the custom
> > > > > partitioner
> > > > > > in the old producer. You just move it to the client that creates
> > the
> > > > > > records.
> > > > > > The reason you don't cache the result of partitionsFor is that
> the
> > > > > producer
> > > > > > should handle the caching for you, so its not necessarily a long
> or
> > > > > > blocking call.
> > > > > >
> > > > > > I see it as a pretty small change to the API. But I'm not sure
> what
> > > drove
> > > > > > the change either.
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > > > > Daniel.Wegener <at> ...> wrote:
> > > > > >
> > > > > > > Hello Kafka-users!
> > > > > > >
> > > > > > > I am facing a migration from a kind of ( a bit self plumbed)
> > kafka
> > > > > 0.8.1
> > > > > > > producer to the new kafka-clients API. I just recognized, that
> > the
> > > new
> > > > > > > KafkaProducer initializes its own Partitioner that cannot be
> > > changed
> > > > > (final
> > > > > > > field, no ctor-param, no
> > > > > > >
> > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > Is
> > > > > this
> > > > > > > an intentional change?
> > > > > > > If i understand the new API correctly, one should either
> define a
> > > key
> > > > > for
> > > > > > > a message and let the default Partitioner care that it will be
> > > > > distributed
> > > > > > > over all available partitions or to set an explicit partition
> > > number
> > > > > per
> > > > > > > message that will be written to.
> > > > > > >
> > > > > > > The old API api allowed to create ProducerRecors with a key
> > and/or
> > > a
> > > > > key
> > > > > > > used only for partitioning (but one that is not sent down the
> > wire)
> > > and
> > > > > > > then to provide a custom Partitioner that later could
> distribute
> > > this
> > > > > > > partitioning key over all available partitions when the message
> > is
> > > > > actually
> > > > > > > sent.
> > > > > > >
> > > > > > > The difference in the new procuder API is that we need to know
> > the
> > > > > exact
> > > > > > > number of available partitions before we even create a
> > > ProducerRecord.
> > > > > If
> > > > > > > we dont ensure the correct number of partitions and try to
> send a
> > > > > message
> > > > > > > to a partition that does not exist, the whole message will blow
> > up
> > > > > later
> > > > > > > when the producer tries to send it.
> > > > > > >
> > > > > > > I dont expect the partition count to change that often but the
> > API-
> > > doc
> > > > > > > states that a partitionsFor(String topic) result _should not_
> be
> > > > > cached.
> > > > > > > But I do not really want to check for changed partition counts
> > > before
> > > > > every
> > > > > > > creation of a ProducerRecord. The old pluggable partitioner
> was,
> > > for
> > > > > us,
> > > > > > > especially useful for partition-stickyness by business keys
> (and
> > > thus
> > > > > > > stateful processing stages across multiple topics). This
> ensured
> > > that a
> > > > > > > message that was processed on stage1:partition2 will eventually
> > be
> > > > > > > processed on stageN:partition2. Not very clever in terms of
> > > scalability
> > > > > per
> > > > > > > stage, but it makes reasoning about the message flow alot
> easier.
> > > > > > >
> > > > > > > So for a single ProducerRecord, for my understanding it might
> be
> > > nicer
> > > > > to
> > > > > > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or
> > > Java8
> > > > > > > lambda equivalent) instead of the nullable Partition attribute
> > and
> > > > > evaluate
> > > > > > > this function in the producer.
> > > > > > >
> > > > > > > Kind regards
> > > > > > >
> > > > > > >
> > > > > > >
> > -------------------------------------------------------------------
> > > ----
> > > > > -------------
> > > > > > > Daniel Wegener
> > > > > > > Holisticon AG
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > > Cheers for you quick reply Gwen!
> > > > > Good to know that partitionsFor is (most time) fast and
> non-blocking.
> > > Im
> > > > > just wondering if this leaves a (maybe rather artificial?) use case
> > > > > uncovered: If you really try to use the Producer in a completly
> non-
> > > > > blocking fashion (with block.on.buffer.full=false, maybe for a
> > > reactive-
> > > > > streams adapter?) you would still have to call partitionsFor what
> may
> > > > > occasionally be blocking.
> > > > >
> > > > > Dont get me wrong, I am happy with this solution but I think the
> old
> > > API
> > > > > was a bit clearer about what you really give into the producer and
> > what
> > > it
> > > > > does with it (a partition number (that may be discovered as invalid
> > > later
> > > > > and throws exceptions) vs a partitioner that let you distribute
> your
> > > > > payloads over the available number of partitions). In the new api
> it
> > > just
> > > > > feels a bit more - well - java'ish :).
> > > > >
> > > > > Kind regards
> > > > > Daniel
> > > > >
> > > > >
> > > >
> > >
> > > Hi Jay.
> > >
> > > Thank you for this clarification, I agree it's perfectly fine to
> enforce
> > > the metadata resolution when you start a producer once.
> > >
> > > If there would be support for some kind of user defined partitioner I'd
> > > have the following thougts:
> > >
> > > - Let the user still be able to optionally choose a partition for a
> > > ProducerRecord.
> > > - Let the user optionally provide a _CustomPartitioner_ as
> KafkaProducer
> > > ctor
> > > param or ctor props.
> > > - Keep the current default partitioners behavior that will just prefer
> > the
> > > result of the CustomPartitioner over its default strategies (hashing
> the
> > > encoded key before round-robin), but AFTER trying the
> > > ProducerRecord.partition.
> > >
> > > The type signature of a CustomPartitioner could look like this:
> > >
> > > ```
> > > public interface CustomPartitioner<K,V> extends Configurable {
> > >
> > > /**
> > > * Compute the partition for the given record.
> > > *
> > > * @param topic The topic name
> > > * @param key The key to partition on (or null if no key)
> > > * @param value The value to partition on
> > > * @param cluster The current cluster metadata
> > > * @returns a partition or {@code null} if this partitioner cannot
> > make
> > > a useful decision. This will lead to a fallback to the default
> > partitioning
> > > behaviour.
> > > */
> > > public Integer partition(String topic, K key, V value, Cluster
> > > cluster);
> > >
> > > }
> > > ```
> > >
> > > This would:
> > > - Not introduce any breaking changes to the API
> > > - Allow users to partition based on their unserialized "business" keys
> or
> > > Values. This is consistent with the type-parameterized user provided
> > > serializers.
> > >
> > > It might still make the concept of partitioning more complex and thus
> > > harder to grasp.
> > >
> > > An implementation could look like this:
> > >
> >
> https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447a
> > > fe60d389f6c (just a raw sketch though).
> > >
> > > What do you think?
> > >
> > > Kind regards
> > > Daniel
> > >
> > >
> > >
> > >
> >
>
Re: New Producer - Is the configurable partitioner gone?
Posted by Steven Wu <st...@gmail.com>.
> The low connection partitioner might work for this
by attempting to reuse recently used nodes whenever possible. That is
useful in environments with lots and lots of producers where you don't care
about semantic partitioning.
In one of the perf test, we found that above "sticky" partitioner improved
batching and reduced cpu util at broker side by 60%. We plan to make it our
default partitioner.
On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps <ja...@gmail.com> wrote:
> Hey Daniel,
>
> Yeah I think that would be doable. If you want to pursue it you would need
> to do a quick KIP just to get everyone on the same page since this would be
> a public interface we would have to support over a long time:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>
> When we have the details worked out, then it should be a fairly
> straight-forward patch to make that pluggable.
>
> A few comments:
> - I think we should just make the DefaultPartitioner the default value for
> that configuration, rather than having it be a fall back.
> - You need to pass in the binary key and value in addition to the java
> objects. Otherwise any partitioning based on the binary value will require
> reserializing these.
> - If we add this option we should really ship at least one other useful
> partitioning strategy. The low connection partitioner might work for this
> by attempting to reuse recently used nodes whenever possible. That is
> useful in environments with lots and lots of producers where you don't care
> about semantic partitioning. It would be good to think through if there are
> any other useful partitioning strategies to make sure they would also be
> doable with the interface we would end up with.
> - Currently Cluster is not a public class so we'll have to think about
> whether we want to make that public.
>
> -Jay
>
>
> On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener <
> daniel.wegener@holisticon.de> wrote:
>
> >
> > Jay Kreps <ja...@...> writes:
> >
> > >
> > > Hey Daniel,
> > >
> > > partitionsFor() will block the very first time it sees a new topic that
> > it
> > > doesn't have metadata for yet. If you want to ensure you don't block
> even
> > > that one time, call it prior to your regular usage so it initializes
> > then.
> > >
> > > The rationale for adding a partition in ProducerRecord was that there
> are
> > > actually cases where you want to associate a key with the record but
> not
> > > use it for partitioning. The rationale for not retaining the pluggable
> > > partitioner was that you could always just use the partition (many
> people
> > > dislike the plugin apis and asked for it). Personally, like you, I
> > > preferred the plugin apis.
> > >
> > > We aren't cut off from exposing the existing partitioner usage as a
> > public
> > > api that you can override if there is anyone who wants that. I think
> one
> > > nice thing about it would be the ability to ship with an alternative
> > > partitioning strategy that you could enable purely in config. For
> example
> > > the old producer had a partitioning strategy that attempted to minimize
> > the
> > > number of TCP connections for cases where there was no key. 98% of
> people
> > > loathed that, but for 2% of people it was useful and this would be a
> way
> > to
> > > still include that behavior for people migrating to the new producer.
> > >
> > > -Jay
> > >
> > > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> > > daniel.wegener@...> wrote:
> > >
> > > > Gwen Shapira <gshapira <at> ...> writes:
> > > >
> > > > >
> > > > > Hi Daniel,
> > > > >
> > > > > I think you can still use the same logic you had in the custom
> > > > partitioner
> > > > > in the old producer. You just move it to the client that creates
> the
> > > > > records.
> > > > > The reason you don't cache the result of partitionsFor is that the
> > > > producer
> > > > > should handle the caching for you, so its not necessarily a long or
> > > > > blocking call.
> > > > >
> > > > > I see it as a pretty small change to the API. But I'm not sure what
> > drove
> > > > > the change either.
> > > > >
> > > > > Gwen
> > > > >
> > > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > > > Daniel.Wegener <at> ...> wrote:
> > > > >
> > > > > > Hello Kafka-users!
> > > > > >
> > > > > > I am facing a migration from a kind of ( a bit self plumbed)
> kafka
> > > > 0.8.1
> > > > > > producer to the new kafka-clients API. I just recognized, that
> the
> > new
> > > > > > KafkaProducer initializes its own Partitioner that cannot be
> > changed
> > > > (final
> > > > > > field, no ctor-param, no
> > > > > >
> Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > Is
> > > > this
> > > > > > an intentional change?
> > > > > > If i understand the new API correctly, one should either define a
> > key
> > > > for
> > > > > > a message and let the default Partitioner care that it will be
> > > > distributed
> > > > > > over all available partitions or to set an explicit partition
> > number
> > > > per
> > > > > > message that will be written to.
> > > > > >
> > > > > > The old API api allowed to create ProducerRecors with a key
> and/or
> > a
> > > > key
> > > > > > used only for partitioning (but one that is not sent down the
> wire)
> > and
> > > > > > then to provide a custom Partitioner that later could distribute
> > this
> > > > > > partitioning key over all available partitions when the message
> is
> > > > actually
> > > > > > sent.
> > > > > >
> > > > > > The difference in the new procuder API is that we need to know
> the
> > > > exact
> > > > > > number of available partitions before we even create a
> > ProducerRecord.
> > > > If
> > > > > > we dont ensure the correct number of partitions and try to send a
> > > > message
> > > > > > to a partition that does not exist, the whole message will blow
> up
> > > > later
> > > > > > when the producer tries to send it.
> > > > > >
> > > > > > I dont expect the partition count to change that often but the
> API-
> > doc
> > > > > > states that a partitionsFor(String topic) result _should not_ be
> > > > cached.
> > > > > > But I do not really want to check for changed partition counts
> > before
> > > > every
> > > > > > creation of a ProducerRecord. The old pluggable partitioner was,
> > for
> > > > us,
> > > > > > especially useful for partition-stickyness by business keys (and
> > thus
> > > > > > stateful processing stages across multiple topics). This ensured
> > that a
> > > > > > message that was processed on stage1:partition2 will eventually
> be
> > > > > > processed on stageN:partition2. Not very clever in terms of
> > scalability
> > > > per
> > > > > > stage, but it makes reasoning about the message flow alot easier.
> > > > > >
> > > > > > So for a single ProducerRecord, for my understanding it might be
> > nicer
> > > > to
> > > > > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or
> > Java8
> > > > > > lambda equivalent) instead of the nullable Partition attribute
> and
> > > > evaluate
> > > > > > this function in the producer.
> > > > > >
> > > > > > Kind regards
> > > > > >
> > > > > >
> > > > > >
> -------------------------------------------------------------------
> > ----
> > > > -------------
> > > > > > Daniel Wegener
> > > > > > Holisticon AG
> > > > > >
> > > > > >
> > > > >
> > > >
> > > > Cheers for you quick reply Gwen!
> > > > Good to know that partitionsFor is (most time) fast and non-blocking.
> > Im
> > > > just wondering if this leaves a (maybe rather artificial?) use case
> > > > uncovered: If you really try to use the Producer in a completly non-
> > > > blocking fashion (with block.on.buffer.full=false, maybe for a
> > reactive-
> > > > streams adapter?) you would still have to call partitionsFor what may
> > > > occasionally be blocking.
> > > >
> > > > Dont get me wrong, I am happy with this solution but I think the old
> > API
> > > > was a bit clearer about what you really give into the producer and
> what
> > it
> > > > does with it (a partition number (that may be discovered as invalid
> > later
> > > > and throws exceptions) vs a partitioner that let you distribute your
> > > > payloads over the available number of partitions). In the new api it
> > just
> > > > feels a bit more - well - java'ish :).
> > > >
> > > > Kind regards
> > > > Daniel
> > > >
> > > >
> > >
> >
> > Hi Jay.
> >
> > Thank you for this clarification, I agree it's perfectly fine to enforce
> > the metadata resolution when you start a producer once.
> >
> > If there would be support for some kind of user defined partitioner I'd
> > have the following thougts:
> >
> > - Let the user still be able to optionally choose a partition for a
> > ProducerRecord.
> > - Let the user optionally provide a _CustomPartitioner_ as KafkaProducer
> > ctor
> > param or ctor props.
> > - Keep the current default partitioners behavior that will just prefer
> the
> > result of the CustomPartitioner over its default strategies (hashing the
> > encoded key before round-robin), but AFTER trying the
> > ProducerRecord.partition.
> >
> > The type signature of a CustomPartitioner could look like this:
> >
> > ```
> > public interface CustomPartitioner<K,V> extends Configurable {
> >
> > /**
> > * Compute the partition for the given record.
> > *
> > * @param topic The topic name
> > * @param key The key to partition on (or null if no key)
> > * @param value The value to partition on
> > * @param cluster The current cluster metadata
> > * @returns a partition or {@code null} if this partitioner cannot
> make
> > a useful decision. This will lead to a fallback to the default
> partitioning
> > behaviour.
> > */
> > public Integer partition(String topic, K key, V value, Cluster
> > cluster);
> >
> > }
> > ```
> >
> > This would:
> > - Not introduce any breaking changes to the API
> > - Allow users to partition based on their unserialized "business" keys or
> > Values. This is consistent with the type-parameterized user provided
> > serializers.
> >
> > It might still make the concept of partitioning more complex and thus
> > harder to grasp.
> >
> > An implementation could look like this:
> >
> https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447a
> > fe60d389f6c (just a raw sketch though).
> >
> > What do you think?
> >
> > Kind regards
> > Daniel
> >
> >
> >
> >
>
Re: New Producer - Is the configurable partitioner gone?
Posted by Jay Kreps <ja...@gmail.com>.
Hey Daniel,
Yeah I think that would be doable. If you want to pursue it you would need
to do a quick KIP just to get everyone on the same page since this would be
a public interface we would have to support over a long time:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
When we have the details worked out, then it should be a fairly
straight-forward patch to make that pluggable.
A few comments:
- I think we should just make the DefaultPartitioner the default value for
that configuration, rather than having it be a fall back.
- You need to pass in the binary key and value in addition to the java
objects. Otherwise any partitioning based on the binary value will require
reserializing these.
- If we add this option we should really ship at least one other useful
partitioning strategy. The low connection partitioner might work for this
by attempting to reuse recently used nodes whenever possible. That is
useful in environments with lots and lots of producers where you don't care
about semantic partitioning. It would be good to think through if there are
any other useful partitioning strategies to make sure they would also be
doable with the interface we would end up with.
- Currently Cluster is not a public class so we'll have to think about
whether we want to make that public.
-Jay
On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener <
daniel.wegener@holisticon.de> wrote:
>
> Jay Kreps <ja...@...> writes:
>
> >
> > Hey Daniel,
> >
> > partitionsFor() will block the very first time it sees a new topic that
> it
> > doesn't have metadata for yet. If you want to ensure you don't block even
> > that one time, call it prior to your regular usage so it initializes
> then.
> >
> > The rationale for adding a partition in ProducerRecord was that there are
> > actually cases where you want to associate a key with the record but not
> > use it for partitioning. The rationale for not retaining the pluggable
> > partitioner was that you could always just use the partition (many people
> > dislike the plugin apis and asked for it). Personally, like you, I
> > preferred the plugin apis.
> >
> > We aren't cut off from exposing the existing partitioner usage as a
> public
> > api that you can override if there is anyone who wants that. I think one
> > nice thing about it would be the ability to ship with an alternative
> > partitioning strategy that you could enable purely in config. For example
> > the old producer had a partitioning strategy that attempted to minimize
> the
> > number of TCP connections for cases where there was no key. 98% of people
> > loathed that, but for 2% of people it was useful and this would be a way
> to
> > still include that behavior for people migrating to the new producer.
> >
> > -Jay
> >
> > On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> > daniel.wegener@...> wrote:
> >
> > > Gwen Shapira <gshapira <at> ...> writes:
> > >
> > > >
> > > > Hi Daniel,
> > > >
> > > > I think you can still use the same logic you had in the custom
> > > partitioner
> > > > in the old producer. You just move it to the client that creates the
> > > > records.
> > > > The reason you don't cache the result of partitionsFor is that the
> > > producer
> > > > should handle the caching for you, so its not necessarily a long or
> > > > blocking call.
> > > >
> > > > I see it as a pretty small change to the API. But I'm not sure what
> drove
> > > > the change either.
> > > >
> > > > Gwen
> > > >
> > > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > > Daniel.Wegener <at> ...> wrote:
> > > >
> > > > > Hello Kafka-users!
> > > > >
> > > > > I am facing a migration from a kind of ( a bit self plumbed) kafka
> > > 0.8.1
> > > > > producer to the new kafka-clients API. I just recognized, that the
> new
> > > > > KafkaProducer initializes its own Partitioner that cannot be
> changed
> > > (final
> > > > > field, no ctor-param, no
> > > > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> Is
> > > this
> > > > > an intentional change?
> > > > > If i understand the new API correctly, one should either define a
> key
> > > for
> > > > > a message and let the default Partitioner care that it will be
> > > distributed
> > > > > over all available partitions or to set an explicit partition
> number
> > > per
> > > > > message that will be written to.
> > > > >
> > > > > The old API api allowed to create ProducerRecors with a key and/or
> a
> > > key
> > > > > used only for partitioning (but one that is not sent down the wire)
> and
> > > > > then to provide a custom Partitioner that later could distribute
> this
> > > > > partitioning key over all available partitions when the message is
> > > actually
> > > > > sent.
> > > > >
> > > > > The difference in the new procuder API is that we need to know the
> > > exact
> > > > > number of available partitions before we even create a
> ProducerRecord.
> > > If
> > > > > we dont ensure the correct number of partitions and try to send a
> > > message
> > > > > to a partition that does not exist, the whole message will blow up
> > > later
> > > > > when the producer tries to send it.
> > > > >
> > > > > I dont expect the partition count to change that often but the API-
> doc
> > > > > states that a partitionsFor(String topic) result _should not_ be
> > > cached.
> > > > > But I do not really want to check for changed partition counts
> before
> > > every
> > > > > creation of a ProducerRecord. The old pluggable partitioner was,
> for
> > > us,
> > > > > especially useful for partition-stickyness by business keys (and
> thus
> > > > > stateful processing stages across multiple topics). This ensured
> that a
> > > > > message that was processed on stage1:partition2 will eventually be
> > > > > processed on stageN:partition2. Not very clever in terms of
> scalability
> > > per
> > > > > stage, but it makes reasoning about the message flow alot easier.
> > > > >
> > > > > So for a single ProducerRecord, for my understanding it might be
> nicer
> > > to
> > > > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or
> Java8
> > > > > lambda equivalent) instead of the nullable Partition attribute and
> > > evaluate
> > > > > this function in the producer.
> > > > >
> > > > > Kind regards
> > > > >
> > > > >
> > > > > -------------------------------------------------------------------
> ----
> > > -------------
> > > > > Daniel Wegener
> > > > > Holisticon AG
> > > > >
> > > > >
> > > >
> > >
> > > Cheers for you quick reply Gwen!
> > > Good to know that partitionsFor is (most time) fast and non-blocking.
> Im
> > > just wondering if this leaves a (maybe rather artificial?) use case
> > > uncovered: If you really try to use the Producer in a completly non-
> > > blocking fashion (with block.on.buffer.full=false, maybe for a
> reactive-
> > > streams adapter?) you would still have to call partitionsFor what may
> > > occasionally be blocking.
> > >
> > > Dont get me wrong, I am happy with this solution but I think the old
> API
> > > was a bit clearer about what you really give into the producer and what
> it
> > > does with it (a partition number (that may be discovered as invalid
> later
> > > and throws exceptions) vs a partitioner that let you distribute your
> > > payloads over the available number of partitions). In the new api it
> just
> > > feels a bit more - well - java'ish :).
> > >
> > > Kind regards
> > > Daniel
> > >
> > >
> >
>
> Hi Jay.
>
> Thank you for this clarification, I agree it's perfectly fine to enforce
> the metadata resolution when you start a producer once.
>
> If there would be support for some kind of user defined partitioner I'd
> have the following thougts:
>
> - Let the user still be able to optionally choose a partition for a
> ProducerRecord.
> - Let the user optionally provide a _CustomPartitioner_ as KafkaProducer
> ctor
> param or ctor props.
> - Keep the current default partitioners behavior that will just prefer the
> result of the CustomPartitioner over its default strategies (hashing the
> encoded key before round-robin), but AFTER trying the
> ProducerRecord.partition.
>
> The type signature of a CustomPartitioner could look like this:
>
> ```
> public interface CustomPartitioner<K,V> extends Configurable {
>
> /**
> * Compute the partition for the given record.
> *
> * @param topic The topic name
> * @param key The key to partition on (or null if no key)
> * @param value The value to partition on
> * @param cluster The current cluster metadata
> * @returns a partition or {@code null} if this partitioner cannot make
> a useful decision. This will lead to a fallback to the default partitioning
> behaviour.
> */
> public Integer partition(String topic, K key, V value, Cluster
> cluster);
>
> }
> ```
>
> This would:
> - Not introduce any breaking changes to the API
> - Allow users to partition based on their unserialized "business" keys or
> Values. This is consistent with the type-parameterized user provided
> serializers.
>
> It might still make the concept of partitioning more complex and thus
> harder to grasp.
>
> An implementation could look like this:
> https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447a
> fe60d389f6c (just a raw sketch though).
>
> What do you think?
>
> Kind regards
> Daniel
>
>
>
>
Re: New Producer - Is the configurable partitioner gone?
Posted by Daniel Wegener <da...@holisticon.de>.
Jay Kreps <ja...@...> writes:
>
> Hey Daniel,
>
> partitionsFor() will block the very first time it sees a new topic that
it
> doesn't have metadata for yet. If you want to ensure you don't block even
> that one time, call it prior to your regular usage so it initializes
then.
>
> The rationale for adding a partition in ProducerRecord was that there are
> actually cases where you want to associate a key with the record but not
> use it for partitioning. The rationale for not retaining the pluggable
> partitioner was that you could always just use the partition (many people
> dislike the plugin apis and asked for it). Personally, like you, I
> preferred the plugin apis.
>
> We aren't cut off from exposing the existing partitioner usage as a
public
> api that you can override if there is anyone who wants that. I think one
> nice thing about it would be the ability to ship with an alternative
> partitioning strategy that you could enable purely in config. For example
> the old producer had a partitioning strategy that attempted to minimize
the
> number of TCP connections for cases where there was no key. 98% of people
> loathed that, but for 2% of people it was useful and this would be a way
to
> still include that behavior for people migrating to the new producer.
>
> -Jay
>
> On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> daniel.wegener@...> wrote:
>
> > Gwen Shapira <gshapira <at> ...> writes:
> >
> > >
> > > Hi Daniel,
> > >
> > > I think you can still use the same logic you had in the custom
> > partitioner
> > > in the old producer. You just move it to the client that creates the
> > > records.
> > > The reason you don't cache the result of partitionsFor is that the
> > producer
> > > should handle the caching for you, so its not necessarily a long or
> > > blocking call.
> > >
> > > I see it as a pretty small change to the API. But I'm not sure what
drove
> > > the change either.
> > >
> > > Gwen
> > >
> > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > Daniel.Wegener <at> ...> wrote:
> > >
> > > > Hello Kafka-users!
> > > >
> > > > I am facing a migration from a kind of ( a bit self plumbed) kafka
> > 0.8.1
> > > > producer to the new kafka-clients API. I just recognized, that the
new
> > > > KafkaProducer initializes its own Partitioner that cannot be
changed
> > (final
> > > > field, no ctor-param, no
> > > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
Is
> > this
> > > > an intentional change?
> > > > If i understand the new API correctly, one should either define a
key
> > for
> > > > a message and let the default Partitioner care that it will be
> > distributed
> > > > over all available partitions or to set an explicit partition
number
> > per
> > > > message that will be written to.
> > > >
> > > > The old API api allowed to create ProducerRecors with a key and/or
a
> > key
> > > > used only for partitioning (but one that is not sent down the wire)
and
> > > > then to provide a custom Partitioner that later could distribute
this
> > > > partitioning key over all available partitions when the message is
> > actually
> > > > sent.
> > > >
> > > > The difference in the new procuder API is that we need to know the
> > exact
> > > > number of available partitions before we even create a
ProducerRecord.
> > If
> > > > we dont ensure the correct number of partitions and try to send a
> > message
> > > > to a partition that does not exist, the whole message will blow up
> > later
> > > > when the producer tries to send it.
> > > >
> > > > I dont expect the partition count to change that often but the API-
doc
> > > > states that a partitionsFor(String topic) result _should not_ be
> > cached.
> > > > But I do not really want to check for changed partition counts
before
> > every
> > > > creation of a ProducerRecord. The old pluggable partitioner was,
for
> > us,
> > > > especially useful for partition-stickyness by business keys (and
thus
> > > > stateful processing stages across multiple topics). This ensured
that a
> > > > message that was processed on stage1:partition2 will eventually be
> > > > processed on stageN:partition2. Not very clever in terms of
scalability
> > per
> > > > stage, but it makes reasoning about the message flow alot easier.
> > > >
> > > > So for a single ProducerRecord, for my understanding it might be
nicer
> > to
> > > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or
Java8
> > > > lambda equivalent) instead of the nullable Partition attribute and
> > evaluate
> > > > this function in the producer.
> > > >
> > > > Kind regards
> > > >
> > > >
> > > > -------------------------------------------------------------------
----
> > -------------
> > > > Daniel Wegener
> > > > Holisticon AG
> > > >
> > > >
> > >
> >
> > Cheers for you quick reply Gwen!
> > Good to know that partitionsFor is (most time) fast and non-blocking.
Im
> > just wondering if this leaves a (maybe rather artificial?) use case
> > uncovered: If you really try to use the Producer in a completly non-
> > blocking fashion (with block.on.buffer.full=false, maybe for a
reactive-
> > streams adapter?) you would still have to call partitionsFor what may
> > occasionally be blocking.
> >
> > Dont get me wrong, I am happy with this solution but I think the old
API
> > was a bit clearer about what you really give into the producer and what
it
> > does with it (a partition number (that may be discovered as invalid
later
> > and throws exceptions) vs a partitioner that let you distribute your
> > payloads over the available number of partitions). In the new api it
just
> > feels a bit more - well - java'ish :).
> >
> > Kind regards
> > Daniel
> >
> >
>
Hi Jay.
Thank you for this clarification, I agree it's perfectly fine to enforce
the metadata resolution when you start a producer once.
If there would be support for some kind of user defined partitioner I'd
have the following thougts:
- Let the user still be able to optionally choose a partition for a
ProducerRecord.
- Let the user optionally provide a _CustomPartitioner_ as KafkaProducer
ctor
param or ctor props.
- Keep the current default partitioners behavior that will just prefer the
result of the CustomPartitioner over its default strategies (hashing the
encoded key before round-robin), but AFTER trying the
ProducerRecord.partition.
The type signature of a CustomPartitioner could look like this:
```
public interface CustomPartitioner<K,V> extends Configurable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param value The value to partition on
* @param cluster The current cluster metadata
* @returns a partition or {@code null} if this partitioner cannot make
a useful decision. This will lead to a fallback to the default partitioning
behaviour.
*/
public Integer partition(String topic, K key, V value, Cluster
cluster);
}
```
This would:
- Not introduce any breaking changes to the API
- Allow users to partition based on their unserialized "business" keys or
Values. This is consistent with the type-parameterized user provided
serializers.
It might still make the concept of partitioning more complex and thus
harder to grasp.
An implementation could look like this:
https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447a
fe60d389f6c (just a raw sketch though).
What do you think?
Kind regards
Daniel
Re: New Producer - Is the configurable partitioner gone?
Posted by Jay Kreps <ja...@gmail.com>.
Hey Daniel,
partitionsFor() will block the very first time it sees a new topic that it
doesn't have metadata for yet. If you want to ensure you don't block even
that one time, call it prior to your regular usage so it initializes then.
The rationale for adding a partition in ProducerRecord was that there are
actually cases where you want to associate a key with the record but not
use it for partitioning. The rationale for not retaining the pluggable
partitioner was that you could always just use the partition (many people
dislike the plugin apis and asked for it). Personally, like you, I
preferred the plugin apis.
We aren't cut off from exposing the existing partitioner usage as a public
api that you can override if there is anyone who wants that. I think one
nice thing about it would be the ability to ship with an alternative
partitioning strategy that you could enable purely in config. For example
the old producer had a partitioning strategy that attempted to minimize the
number of TCP connections for cases where there was no key. 98% of people
loathed that, but for 2% of people it was useful and this would be a way to
still include that behavior for people migrating to the new producer.
-Jay
On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
daniel.wegener@holisticon.de> wrote:
> Gwen Shapira <gs...@...> writes:
>
> >
> > Hi Daniel,
> >
> > I think you can still use the same logic you had in the custom
> partitioner
> > in the old producer. You just move it to the client that creates the
> > records.
> > The reason you don't cache the result of partitionsFor is that the
> producer
> > should handle the caching for you, so its not necessarily a long or
> > blocking call.
> >
> > I see it as a pretty small change to the API. But I'm not sure what drove
> > the change either.
> >
> > Gwen
> >
> > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > Daniel.Wegener@...> wrote:
> >
> > > Hello Kafka-users!
> > >
> > > I am facing a migration from a kind of ( a bit self plumbed) kafka
> 0.8.1
> > > producer to the new kafka-clients API. I just recognized, that the new
> > > KafkaProducer initializes its own Partitioner that cannot be changed
> (final
> > > field, no ctor-param, no
> > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is
> this
> > > an intentional change?
> > > If i understand the new API correctly, one should either define a key
> for
> > > a message and let the default Partitioner care that it will be
> distributed
> > > over all available partitions or to set an explicit partition number
> per
> > > message that will be written to.
> > >
> > > The old API api allowed to create ProducerRecors with a key and/or a
> key
> > > used only for partitioning (but one that is not sent down the wire) and
> > > then to provide a custom Partitioner that later could distribute this
> > > partitioning key over all available partitions when the message is
> actually
> > > sent.
> > >
> > > The difference in the new procuder API is that we need to know the
> exact
> > > number of available partitions before we even create a ProducerRecord.
> If
> > > we dont ensure the correct number of partitions and try to send a
> message
> > > to a partition that does not exist, the whole message will blow up
> later
> > > when the producer tries to send it.
> > >
> > > I dont expect the partition count to change that often but the API-doc
> > > states that a partitionsFor(String topic) result _should not_ be
> cached.
> > > But I do not really want to check for changed partition counts before
> every
> > > creation of a ProducerRecord. The old pluggable partitioner was, for
> us,
> > > especially useful for partition-stickyness by business keys (and thus
> > > stateful processing stages across multiple topics). This ensured that a
> > > message that was processed on stage1:partition2 will eventually be
> > > processed on stageN:partition2. Not very clever in terms of scalability
> per
> > > stage, but it makes reasoning about the message flow alot easier.
> > >
> > > So for a single ProducerRecord, for my understanding it might be nicer
> to
> > > have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or Java8
> > > lambda equivalent) instead of the nullable Partition attribute and
> evaluate
> > > this function in the producer.
> > >
> > > Kind regards
> > >
> > >
> > > -----------------------------------------------------------------------
> -------------
> > > Daniel Wegener
> > > Holisticon AG
> > >
> > >
> >
>
> Cheers for you quick reply Gwen!
> Good to know that partitionsFor is (most time) fast and non-blocking. Im
> just wondering if this leaves a (maybe rather artificial?) use case
> uncovered: If you really try to use the Producer in a completly non-
> blocking fashion (with block.on.buffer.full=false, maybe for a reactive-
> streams adapter?) you would still have to call partitionsFor what may
> occasionally be blocking.
>
> Dont get me wrong, I am happy with this solution but I think the old API
> was a bit clearer about what you really give into the producer and what it
> does with it (a partition number (that may be discovered as invalid later
> and throws exceptions) vs a partitioner that let you distribute your
> payloads over the available number of partitions). In the new api it just
> feels a bit more - well - java'ish :).
>
> Kind regards
> Daniel
>
>
Re: New Producer - Is the configurable partitioner gone?
Posted by Daniel Wegener <da...@holisticon.de>.
Gwen Shapira <gs...@...> writes:
>
> Hi Daniel,
>
> I think you can still use the same logic you had in the custom
partitioner
> in the old producer. You just move it to the client that creates the
> records.
> The reason you don't cache the result of partitionsFor is that the
producer
> should handle the caching for you, so its not necessarily a long or
> blocking call.
>
> I see it as a pretty small change to the API. But I'm not sure what drove
> the change either.
>
> Gwen
>
> On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> Daniel.Wegener@...> wrote:
>
> > Hello Kafka-users!
> >
> > I am facing a migration from a kind of ( a bit self plumbed) kafka
0.8.1
> > producer to the new kafka-clients API. I just recognized, that the new
> > KafkaProducer initializes its own Partitioner that cannot be changed
(final
> > field, no ctor-param, no
> > Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is
this
> > an intentional change?
> > If i understand the new API correctly, one should either define a key
for
> > a message and let the default Partitioner care that it will be
distributed
> > over all available partitions or to set an explicit partition number
per
> > message that will be written to.
> >
> > The old API api allowed to create ProducerRecors with a key and/or a
key
> > used only for partitioning (but one that is not sent down the wire) and
> > then to provide a custom Partitioner that later could distribute this
> > partitioning key over all available partitions when the message is
actually
> > sent.
> >
> > The difference in the new procuder API is that we need to know the
exact
> > number of available partitions before we even create a ProducerRecord.
If
> > we dont ensure the correct number of partitions and try to send a
message
> > to a partition that does not exist, the whole message will blow up
later
> > when the producer tries to send it.
> >
> > I dont expect the partition count to change that often but the API-doc
> > states that a partitionsFor(String topic) result _should not_ be
cached.
> > But I do not really want to check for changed partition counts before
every
> > creation of a ProducerRecord. The old pluggable partitioner was, for
us,
> > especially useful for partition-stickyness by business keys (and thus
> > stateful processing stages across multiple topics). This ensured that a
> > message that was processed on stage1:partition2 will eventually be
> > processed on stageN:partition2. Not very clever in terms of scalability
per
> > stage, but it makes reasoning about the message flow alot easier.
> >
> > So for a single ProducerRecord, for my understanding it might be nicer
to
> > have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or Java8
> > lambda equivalent) instead of the nullable Partition attribute and
evaluate
> > this function in the producer.
> >
> > Kind regards
> >
> >
> > -----------------------------------------------------------------------
-------------
> > Daniel Wegener
> > Holisticon AG
> >
> >
>
Cheers for you quick reply Gwen!
Good to know that partitionsFor is (most time) fast and non-blocking. Im
just wondering if this leaves a (maybe rather artificial?) use case
uncovered: If you really try to use the Producer in a completly non-
blocking fashion (with block.on.buffer.full=false, maybe for a reactive-
streams adapter?) you would still have to call partitionsFor what may
occasionally be blocking.
Dont get me wrong, I am happy with this solution but I think the old API
was a bit clearer about what you really give into the producer and what it
does with it (a partition number (that may be discovered as invalid later
and throws exceptions) vs a partitioner that let you distribute your
payloads over the available number of partitions). In the new api it just
feels a bit more - well - java'ish :).
Kind regards
Daniel
Re: New Producer - Is the configurable partitioner gone?
Posted by Gwen Shapira <gs...@cloudera.com>.
Hi Daniel,
I think you can still use the same logic you had in the custom partitioner
in the old producer. You just move it to the client that creates the
records.
The reason you don't cache the result of partitionsFor is that the producer
should handle the caching for you, so its not necessarily a long or
blocking call.
I see it as a pretty small change to the API. But I'm not sure what drove
the change either.
Gwen
On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
Daniel.Wegener@holisticon.de> wrote:
> Hello Kafka-users!
>
> I am facing a migration from a kind of ( a bit self plumbed) kafka 0.8.1
> producer to the new kafka-clients API. I just recognized, that the new
> KafkaProducer initializes its own Partitioner that cannot be changed (final
> field, no ctor-param, no
> Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is this
> an intentional change?
> If i understand the new API correctly, one should either define a key for
> a message and let the default Partitioner care that it will be distributed
> over all available partitions or to set an explicit partition number per
> message that will be written to.
>
> The old API api allowed to create ProducerRecors with a key and/or a key
> used only for partitioning (but one that is not sent down the wire) and
> then to provide a custom Partitioner that later could distribute this
> partitioning key over all available partitions when the message is actually
> sent.
>
> The difference in the new procuder API is that we need to know the exact
> number of available partitions before we even create a ProducerRecord. If
> we dont ensure the correct number of partitions and try to send a message
> to a partition that does not exist, the whole message will blow up later
> when the producer tries to send it.
>
> I dont expect the partition count to change that often but the API-doc
> states that a partitionsFor(String topic) result _should not_ be cached.
> But I do not really want to check for changed partition counts before every
> creation of a ProducerRecord. The old pluggable partitioner was, for us,
> especially useful for partition-stickyness by business keys (and thus
> stateful processing stages across multiple topics). This ensured that a
> message that was processed on stage1:partition2 will eventually be
> processed on stageN:partition2. Not very clever in terms of scalability per
> stage, but it makes reasoning about the message flow alot easier.
>
> So for a single ProducerRecord, for my understanding it might be nicer to
> have a nullable partitionFunction:(ProducerRecord,Int)=>Int (or Java8
> lambda equivalent) instead of the nullable Partition attribute and evaluate
> this function in the producer.
>
> Kind regards
>
>
> ------------------------------------------------------------------------------------
> Daniel Wegener
> Holisticon AG
>
>