You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Mickael Maison <mi...@gmail.com> on 2020/08/20 15:46:22 UTC

[DISCUSS] KIP-660: Pluggable ReplicaAssignor

Hi,

I've created KIP-660 to make the replica assignment logic pluggable.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor

Please take a look and let me know if you have any feedback.

Thanks

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Colin McCabe <cm...@apache.org>.
On Thu, Dec 3, 2020, at 07:41, Edoardo Comar wrote:
> Hi Colin
> thanks for your comments.
> I think your objections to creating an interface for replica placement
> could be used against similar server-side plug-ins (Authorizer,
> QuotaCallback).

Hi Edoardo,

The Authorizer API is not really comparable, I think.  Most organizations have some kind of identity management solution in place already (ActiveDirectory, LDAP, Kerberos, whatever) and it makes sense for Kafka to integrate with that existing system.  In contrast, nobody has an existing replica placement system sitting around and wants to integrate Kafka with that.

best,
Colin


> They too are on sensitive code paths, can cause problems if badly
> written/poorly tested and may cause a pain on the evolution of Kafka if
> backward compatibility is promised.
> 
> On backward compatibility, I'm all for marking the interfaces as
> implement-at-your-risk :-) so that a minority of implementers won't keep
> the ecosystem on a handbrake.
> 
> On 'users', I think that the implementers of these interfaces are teams
> running Kafka clusters, not 'users' publishing and consuming messages or
> streams.
> Therefore I'd expect them to use these plug-in point implementations
> with due care, or else they risk extinction by survival of the fittest ...
> :-)
> 
> cheers
> Edoardo
> 
> On Wed, 2 Dec 2020 at 18:46, Colin McCabe <cm...@apache.org> wrote:
> 
> > Hi Mickael,
> >
> > To be honest, I think it would be better not to make replica placement
> > pluggable.
> >
> > When I worked on the Hadoop Distributed Filesystem, we supported pluggable
> > replica placement policies, and it never worked out well.  Users would
> > write plugin code that ran in a very sensitive context, and it would crash
> > or perform poorly, causing problems which they often asked us to
> > troubleshoot.  The plugin interface became very complex because people kept
> > adding stuff, and nothing could be removed because it would break API
> > compatibility.
> >
> > Our advice to users was always not to do this, but to do placement on the
> > client side if they really wanted to control placement.  In general I think
> > we made a mistake by creating that API at all, and we should have just
> > provided more configuration knobs for placement instead.  (We did add a few
> > such knobs over time, and they were much better at solving people's
> > problems than the placement API...)
> >
> > The reality is that in order to do a good job with replica placement, your
> > code needs to be very well integrated with the rest of the system.  As you
> > said, you need to know about what other partitions and brokers exist.  This
> > information can be quite large, if you have a large cluster, and it is
> > constantly changing.  More than that, if you want to do better than the
> > current built-in policy, you need additional information like metrics about
> > how much load each broker is under, how much disk space it has, reads vs.
> > writes, etc. etc.  If you want to treat some nodes specially, you need a
> > place to store that metadata.  I think people who want to customize this
> > would be better off forking.
> >
> > If we are absolutely convinced we want to do this, we should at least mark
> > any internal classes we're exposing here as Evolving, rather than Stable,
> > so that we can change them at will.  Basically not give a compatibility
> > guarantee for this.
> >
> > best,
> > Colin
> >
> >
> > On Thu, Nov 26, 2020, at 08:47, Mickael Maison wrote:
> > > Thanks Gwen for following up.
> > >
> > > With this extra bit of context, David's comments make more sense.
> > >
> > > If we move the replica placement plugin to the controller, I think
> > > most of the API can stay the same. However, as David mentioned,
> > > Cluster may be problematic.
> > > In a replica placement plugin, you'd typically want to retrieve the
> > > list of brokers and the list of partitions (including leader and
> > > replicas) so it should not be too hard to come up with a new interface
> > > instead of using Cluster.
> > >
> > > But until KIP-631 is done, new types used for metadata in the
> > > controller are not known. So I wonder if we need to wait for that KIP
> > > before we can make further progress here or if we should be fine
> > > having a pretty generic Metadata interface.
> > >
> > > Maybe Colin/Ismael can comment and advise us here?
> > >
> > > On Tue, Nov 24, 2020 at 8:47 PM Gwen Shapira <gw...@confluent.io> wrote:
> > > >
> > > > Michael,
> > > >
> > > > I talked a bit to Colin and Ismael offline and got some clarity about
> > > > KIP-500. Basically, replica placement requires an entire metadata map
> > > > - all the placements of all replicas. Since one of the goals of
> > > > KIP-500 is to go to 1M or even 10M partitions on the cluster, this
> > > > will be a rather large map. Since brokers normally don't need to know
> > > > all the placements (they just need to know about the subset of
> > > > replicas that they lead or follow), the idea is to keep this map on
> > > > the controller only. Which means that the replica placement plugin
> > > > will ideally be on the controller too. This also has a nice side
> > > > benefit - since we will be able to run the controller quorum on a
> > > > separate set of machines, we'll be able to replace the replica
> > > > placement plugin by updating 3-5 controller nodes, not the entire
> > > > cluster.
> > > >
> > > > Gwen
> > > >
> > > > On Thu, Nov 19, 2020 at 4:54 AM Mickael Maison <
> > mickael.maison@gmail.com> wrote:
> > > > >
> > > > > Hi David,
> > > > >
> > > > > I think using updateClusterMetadata() like in ClientQuotaCallback is
> > a
> > > > > great idea. I've updated the KIP accordingly.
> > > > >
> > > > > As mentioned, computing assignments for the full batch of
> > > > > topics/partitions was considered but it made the logic in
> > AdminManager
> > > > > really complicated. For the initial use cases this KIP is targetting,
> > > > > it felt simpler and acceptable to compute assignments one topic at a
> > > > > time.
> > > > >
> > > > > Cluster is already used in other APIs, like ClientQuotaCallback so I
> > > > > think it makes sense to reuse it here.
> > > > >
> > > > > I'm not fully up to date with the latest advances on KIP-500, but
> > like
> > > > > Gwen, I'm not sure we'd want to move that logic into the Controller.
> > > > > This KIP is keeping the metadata creation in AdminManager and just
> > > > > making the logic pluggable.
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Mon, Nov 16, 2020 at 3:56 PM Gwen Shapira <gw...@confluent.io>
> > wrote:
> > > > > >
> > > > > > Why would the replica placement logic run in the controller rather
> > than in
> > > > > > the AdminManager?
> > > > > >
> > > > > > My understanding, and correct me if I got it wrong, is that we are
> > aiming
> > > > > > at better separation of concerns. The controller job will be
> > managing
> > > > > > consensus and consistency of metadata, but creating the metadata
> > itself
> > > > > > will be in the AdminManager.
> > > > > >
> > > > > > On Mon, Nov 16, 2020, 5:31 AM David Jacot <dj...@confluent.io>
> > wrote:
> > > > > >
> > > > > > > Hi Mickael,
> > > > > > >
> > > > > > > Thanks for the KIP. It is an interesting one.
> > > > > > >
> > > > > > > I imagine that custom assignors may use a rather complex model
> > of the
> > > > > > > cluster in order
> > > > > > > to be able to allocate partitions in smarter ways. For instance,
> > one may
> > > > > > > use the distribution
> > > > > > > of leaders in the cluster to allocate the new leaders. With the
> > current
> > > > > > > interface, that
> > > > > > > means computing the distribution based on the Cluster received
> > for every
> > > > > > > assignment
> > > > > > > request. That could become pretty inefficient in clusters with a
> > large
> > > > > > > number of nodes
> > > > > > > and/or partitions. That could become even worse if the model is
> > more
> > > > > > > complicated.
> > > > > > >
> > > > > > > I wonder if you have thought about this or experienced this with
> > your
> > > > > > > prototype?
> > > > > > >
> > > > > > > Have you considered going with an approach à la
> > ClientQuotaCallback where
> > > > > > > the plugin
> > > > > > > is updated when the Cluster has changed? That would allow to
> > keep an
> > > > > > > internal model
> > > > > > > ready. Another way would be to use batching as suggested as it
> > would allow
> > > > > > > to amortize
> > > > > > > the cost of building a model for the current request/user.
> > > > > > >
> > > > > > > I also wonder if using Cluster is a good idea here. With
> > KIP-500, I can
> > > > > > > imagine that this
> > > > > > > plugin will run in the controller directly instead of running in
> > the
> > > > > > > AdminManager as today.
> > > > > > > In this case, we could obviously continue to build that Cluster
> > object but
> > > > > > > we may have
> > > > > > > better ways. Therefore, I wonder if having an interface to
> > represent the
> > > > > > > cluster may be
> > > > > > > better from an evolution perspective. Have you considered this?
> > > > > > >
> > > > > > > Best,
> > > > > > > David
> > > > > > >
> > > > > > > On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <
> > mickael.maison@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > If I don't see additional feedback in the next few days, I'll
> > start a
> > > > > > > vote.
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > > On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <
> > mickael.maison@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I've updated the KIP to reflect the latest discussions.
> > > > > > > > >
> > > > > > > > > Tom,
> > > > > > > > > 2) Updated
> > > > > > > > > 4) I decided against switching to a "batch interface" and
> > added the
> > > > > > > > > reasons in the Rejected Alternatives section
> > > > > > > > >
> > > > > > > > > Please take a look and let me know if you have any feedback.
> > > > > > > > > Thanks
> > > > > > > > >
> > > > > > > > > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <
> > > > > > > mickael.maison@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Efe,
> > > > > > > > > >
> > > > > > > > > > Thanks for the feedback.
> > > > > > > > > > We also need to assign replicas when adding partitions to
> > an existing
> > > > > > > > > > topic. This is why I choose to use a list of partition
> > ids. Otherwise
> > > > > > > > > > we'd need the number of partitions and the starting
> > partition id.
> > > > > > > > > >
> > > > > > > > > > Let me know if you have more questions
> > > > > > > > > >
> > > > > > > > > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer
> > > > > > > <ag...@linkedin.com.invalid>
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Mickael,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > A call to an external system, e.g. Cruise Control, in the
> > > > > > > > implementation of the provided interface can indeed help with
> > the initial
> > > > > > > > assignment of partitions.
> > > > > > > > > > >
> > > > > > > > > > > I am curious why the proposed
> > > > > > > > `ReplicaAssignor#assignReplicasToBrokers` receives a list of
> > partition
> > > > > > > ids
> > > > > > > > as opposed to the number of partitions to create the topic
> > with?
> > > > > > > > > > >
> > > > > > > > > > > Would you clarify if this API is expected to be used (1)
> > only for
> > > > > > > > new topics or (2) also for existing topics?
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Efe
> > > > > > > > > > > ________________________________
> > > > > > > > > > > From: Mickael Maison <mi...@gmail.com>
> > > > > > > > > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > > > > > > > > To: dev <de...@kafka.apache.org>
> > > > > > > > > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > > > > > > > > >
> > > > > > > > > > > Thanks Tom for the feedback!
> > > > > > > > > > >
> > > > > > > > > > > 1. If the data returned by the ReplicaAssignor
> > implementation does
> > > > > > > > not
> > > > > > > > > > > match that was requested, we'll also throw a
> > > > > > > ReplicaAssignorException
> > > > > > > > > > >
> > > > > > > > > > > 2. Good point, I'll update the KIP
> > > > > > > > > > >
> > > > > > > > > > > 3. The KIP mentions an error code associated with
> > > > > > > > > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > > > > > > > > >
> > > > > > > > > > > 4. (I'm naming your last question 4.) I spent some time
> > looking at
> > > > > > > > it.
> > > > > > > > > > > Initially I wanted to follow the model from the topic
> > policies. But
> > > > > > > > as
> > > > > > > > > > > you said, computing assignments for the whole batch may
> > be more
> > > > > > > > > > > desirable and also avoids incrementally updating the
> > cluster state.
> > > > > > > > > > > The logic in AdminManager is very much centered around
> > doing 1
> > > > > > > topic
> > > > > > > > > > > at a time but as far as I can tell we should be able to
> > update it
> > > > > > > to
> > > > > > > > > > > compute assignments for the whole batch.
> > > > > > > > > > >
> > > > > > > > > > > I'll play a bit more with 4. and I'll update the KIP in
> > the next
> > > > > > > few
> > > > > > > > days
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <
> > tbentley@redhat.com>
> > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Mickael,
> > > > > > > > > > > >
> > > > > > > > > > > > A few thoughts about the ReplicaAssignor contract:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. What happens if a ReplicaAssignor impl returns a
> > Map where
> > > > > > > some
> > > > > > > > > > > > assignments don't meet the given replication factor?
> > > > > > > > > > > > 2. Fixing the signature of assignReplicasToBrokers()
> > as you have
> > > > > > > > would make
> > > > > > > > > > > > it hard to pass extra information in the future (e.g.
> > maybe
> > > > > > > > someone comes
> > > > > > > > > > > > up with a use case where passing the clientId would be
> > needed)
> > > > > > > > because it
> > > > > > > > > > > > would require the interface be changed. If you
> > factored all the
> > > > > > > > parameters
> > > > > > > > > > > > into some new type then the signature could be
> > > > > > > > > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and
> > adding any
> > > > > > > > new
> > > > > > > > > > > > properties to RequiredReplicaAssignment wouldn't break
> > the
> > > > > > > > contract.
> > > > > > > > > > > > 3. When an assignor throws RepliacAssignorException
> > what error
> > > > > > > > code will be
> > > > > > > > > > > > returned to the client?
> > > > > > > > > > > >
> > > > > > > > > > > > Also, this sentence got me thinking:
> > > > > > > > > > > >
> > > > > > > > > > > > > If multiple topics are present in the request,
> > AdminManager
> > > > > > > will
> > > > > > > > update
> > > > > > > > > > > > the Cluster object so the ReplicaAssignor class has
> > access to the
> > > > > > > > up to
> > > > > > > > > > > > date cluster metadata.
> > > > > > > > > > > >
> > > > > > > > > > > > Previously I've looked at how we can improve Kafka's
> > pluggable
> > > > > > > > policy
> > > > > > > > > > > > support to pass the more of the cluster state to policy
> > > > > > > > implementations. A
> > > > > > > > > > > > similar problem exists there, but the more cluster
> > state you pass
> > > > > > > > the
> > > > > > > > > > > > harder it is to incrementally change it as you iterate
> > through
> > > > > > > the
> > > > > > > > topics
> > > > > > > > > > > > to be created/modified. This likely isn't a problem
> > here and now,
> > > > > > > > but it
> > > > > > > > > > > > could limit any future changes to the pluggable
> > assignors. Did
> > > > > > > you
> > > > > > > > consider
> > > > > > > > > > > > the alternative of the assignor just being passed a
> > Set of
> > > > > > > > assignments?
> > > > > > > > > > > > That means you can just pass the cluster state as it
> > exists at
> > > > > > > the
> > > > > > > > time. It
> > > > > > > > > > > > also gives the implementation more information to work
> > with to
> > > > > > > > find more
> > > > > > > > > > > > optimal assignments. For example, it could perform a
> > bin packing
> > > > > > > > type
> > > > > > > > > > > > assignment which found a better optimum for the whole
> > collection
> > > > > > > > of topics
> > > > > > > > > > > > than one which was only told about all the topics in
> > the request
> > > > > > > > > > > > sequentially.
> > > > > > > > > > > >
> > > > > > > > > > > > Otherwise this looks like a valuable feature to me.
> > > > > > > > > > > >
> > > > > > > > > > > > Kind regards,
> > > > > > > > > > > >
> > > > > > > > > > > > Tom
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> > > > > > > > bob.barrett@confluent.io>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks Mickael, I think adding the new Exception
> > resolves my
> > > > > > > > concerns.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> > > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ReplicaAssignor implementations can throw an
> > exception to
> > > > > > > > indicate an
> > > > > > > > > > > > > > assignment can't be computed. This is already what
> > the
> > > > > > > current
> > > > > > > > round
> > > > > > > > > > > > > > robin assignor does. Unfortunately at the moment,
> > there are
> > > > > > > no
> > > > > > > > generic
> > > > > > > > > > > > > > error codes if it fails, it's either
> > INVALID_PARTITIONS,
> > > > > > > > > > > > > > INVALID_REPLICATION_FACTOR or worse
> > UNKNOWN_SERVER_ERROR.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > So I think it would be nice to introduce a new
> > > > > > > Exception/Error
> > > > > > > > code to
> > > > > > > > > > > > > > cover any failures in the assignor and avoid
> > > > > > > > UNKNOWN_SERVER_ERROR.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I've updated the KIP accordingly, let me know if
> > you have
> > > > > > > more
> > > > > > > > questions.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> > > > > > > > ryannedolan@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks Mickael, the KIP makes sense to me, esp
> > for cases
> > > > > > > > where an
> > > > > > > > > > > > > > external
> > > > > > > > > > > > > > > system (like cruise control or an operator)
> > knows more
> > > > > > > about
> > > > > > > > the target
> > > > > > > > > > > > > > > cluster state than the broker does.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > > > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I've created KIP-660 to make the replica
> > assignment logic
> > > > > > > > pluggable.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > >
> > > > > > >
> > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Please take a look and let me know if you have
> > any
> > > > > > > > feedback.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > >
> > > > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Gwen Shapira
> > > > Engineering Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter | blog
> > >
> >
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Edoardo Comar <ed...@gmail.com>.
Hi Colin
thanks for your comments.
I think your objections to creating an interface for replica placement
could be used against similar server-side plug-ins (Authorizer,
QuotaCallback).
They too are on sensitive code paths, can cause problems if badly
written/poorly tested and may cause a pain on the evolution of Kafka if
backward compatibility is promised.

On backward compatibility, I'm all for marking the interfaces as
implement-at-your-risk :-) so that a minority of implementers won't keep
the ecosystem on a handbrake.

On 'users', I think that the implementers of these interfaces are teams
running Kafka clusters, not 'users' publishing and consuming messages or
streams.
Therefore I'd expect them to use these plug-in point implementations
with due care, or else they risk extinction by survival of the fittest ...
:-)

cheers
Edoardo

On Wed, 2 Dec 2020 at 18:46, Colin McCabe <cm...@apache.org> wrote:

> Hi Mickael,
>
> To be honest, I think it would be better not to make replica placement
> pluggable.
>
> When I worked on the Hadoop Distributed Filesystem, we supported pluggable
> replica placement policies, and it never worked out well.  Users would
> write plugin code that ran in a very sensitive context, and it would crash
> or perform poorly, causing problems which they often asked us to
> troubleshoot.  The plugin interface became very complex because people kept
> adding stuff, and nothing could be removed because it would break API
> compatibility.
>
> Our advice to users was always not to do this, but to do placement on the
> client side if they really wanted to control placement.  In general I think
> we made a mistake by creating that API at all, and we should have just
> provided more configuration knobs for placement instead.  (We did add a few
> such knobs over time, and they were much better at solving people's
> problems than the placement API...)
>
> The reality is that in order to do a good job with replica placement, your
> code needs to be very well integrated with the rest of the system.  As you
> said, you need to know about what other partitions and brokers exist.  This
> information can be quite large, if you have a large cluster, and it is
> constantly changing.  More than that, if you want to do better than the
> current built-in policy, you need additional information like metrics about
> how much load each broker is under, how much disk space it has, reads vs.
> writes, etc. etc.  If you want to treat some nodes specially, you need a
> place to store that metadata.  I think people who want to customize this
> would be better off forking.
>
> If we are absolutely convinced we want to do this, we should at least mark
> any internal classes we're exposing here as Evolving, rather than Stable,
> so that we can change them at will.  Basically not give a compatibility
> guarantee for this.
>
> best,
> Colin
>
>
> On Thu, Nov 26, 2020, at 08:47, Mickael Maison wrote:
> > Thanks Gwen for following up.
> >
> > With this extra bit of context, David's comments make more sense.
> >
> > If we move the replica placement plugin to the controller, I think
> > most of the API can stay the same. However, as David mentioned,
> > Cluster may be problematic.
> > In a replica placement plugin, you'd typically want to retrieve the
> > list of brokers and the list of partitions (including leader and
> > replicas) so it should not be too hard to come up with a new interface
> > instead of using Cluster.
> >
> > But until KIP-631 is done, new types used for metadata in the
> > controller are not known. So I wonder if we need to wait for that KIP
> > before we can make further progress here or if we should be fine
> > having a pretty generic Metadata interface.
> >
> > Maybe Colin/Ismael can comment and advise us here?
> >
> > On Tue, Nov 24, 2020 at 8:47 PM Gwen Shapira <gw...@confluent.io> wrote:
> > >
> > > Michael,
> > >
> > > I talked a bit to Colin and Ismael offline and got some clarity about
> > > KIP-500. Basically, replica placement requires an entire metadata map
> > > - all the placements of all replicas. Since one of the goals of
> > > KIP-500 is to go to 1M or even 10M partitions on the cluster, this
> > > will be a rather large map. Since brokers normally don't need to know
> > > all the placements (they just need to know about the subset of
> > > replicas that they lead or follow), the idea is to keep this map on
> > > the controller only. Which means that the replica placement plugin
> > > will ideally be on the controller too. This also has a nice side
> > > benefit - since we will be able to run the controller quorum on a
> > > separate set of machines, we'll be able to replace the replica
> > > placement plugin by updating 3-5 controller nodes, not the entire
> > > cluster.
> > >
> > > Gwen
> > >
> > > On Thu, Nov 19, 2020 at 4:54 AM Mickael Maison <
> mickael.maison@gmail.com> wrote:
> > > >
> > > > Hi David,
> > > >
> > > > I think using updateClusterMetadata() like in ClientQuotaCallback is
> a
> > > > great idea. I've updated the KIP accordingly.
> > > >
> > > > As mentioned, computing assignments for the full batch of
> > > > topics/partitions was considered but it made the logic in
> AdminManager
> > > > really complicated. For the initial use cases this KIP is targetting,
> > > > it felt simpler and acceptable to compute assignments one topic at a
> > > > time.
> > > >
> > > > Cluster is already used in other APIs, like ClientQuotaCallback so I
> > > > think it makes sense to reuse it here.
> > > >
> > > > I'm not fully up to date with the latest advances on KIP-500, but
> like
> > > > Gwen, I'm not sure we'd want to move that logic into the Controller.
> > > > This KIP is keeping the metadata creation in AdminManager and just
> > > > making the logic pluggable.
> > > >
> > > > Thanks
> > > >
> > > > On Mon, Nov 16, 2020 at 3:56 PM Gwen Shapira <gw...@confluent.io>
> wrote:
> > > > >
> > > > > Why would the replica placement logic run in the controller rather
> than in
> > > > > the AdminManager?
> > > > >
> > > > > My understanding, and correct me if I got it wrong, is that we are
> aiming
> > > > > at better separation of concerns. The controller job will be
> managing
> > > > > consensus and consistency of metadata, but creating the metadata
> itself
> > > > > will be in the AdminManager.
> > > > >
> > > > > On Mon, Nov 16, 2020, 5:31 AM David Jacot <dj...@confluent.io>
> wrote:
> > > > >
> > > > > > Hi Mickael,
> > > > > >
> > > > > > Thanks for the KIP. It is an interesting one.
> > > > > >
> > > > > > I imagine that custom assignors may use a rather complex model
> of the
> > > > > > cluster in order
> > > > > > to be able to allocate partitions in smarter ways. For instance,
> one may
> > > > > > use the distribution
> > > > > > of leaders in the cluster to allocate the new leaders. With the
> current
> > > > > > interface, that
> > > > > > means computing the distribution based on the Cluster received
> for every
> > > > > > assignment
> > > > > > request. That could become pretty inefficient in clusters with a
> large
> > > > > > number of nodes
> > > > > > and/or partitions. That could become even worse if the model is
> more
> > > > > > complicated.
> > > > > >
> > > > > > I wonder if you have thought about this or experienced this with
> your
> > > > > > prototype?
> > > > > >
> > > > > > Have you considered going with an approach à la
> ClientQuotaCallback where
> > > > > > the plugin
> > > > > > is updated when the Cluster has changed? That would allow to
> keep an
> > > > > > internal model
> > > > > > ready. Another way would be to use batching as suggested as it
> would allow
> > > > > > to amortize
> > > > > > the cost of building a model for the current request/user.
> > > > > >
> > > > > > I also wonder if using Cluster is a good idea here. With
> KIP-500, I can
> > > > > > imagine that this
> > > > > > plugin will run in the controller directly instead of running in
> the
> > > > > > AdminManager as today.
> > > > > > In this case, we could obviously continue to build that Cluster
> object but
> > > > > > we may have
> > > > > > better ways. Therefore, I wonder if having an interface to
> represent the
> > > > > > cluster may be
> > > > > > better from an evolution perspective. Have you considered this?
> > > > > >
> > > > > > Best,
> > > > > > David
> > > > > >
> > > > > > On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <
> mickael.maison@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > If I don't see additional feedback in the next few days, I'll
> start a
> > > > > > vote.
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <
> mickael.maison@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I've updated the KIP to reflect the latest discussions.
> > > > > > > >
> > > > > > > > Tom,
> > > > > > > > 2) Updated
> > > > > > > > 4) I decided against switching to a "batch interface" and
> added the
> > > > > > > > reasons in the Rejected Alternatives section
> > > > > > > >
> > > > > > > > Please take a look and let me know if you have any feedback.
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <
> > > > > > mickael.maison@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Efe,
> > > > > > > > >
> > > > > > > > > Thanks for the feedback.
> > > > > > > > > We also need to assign replicas when adding partitions to
> an existing
> > > > > > > > > topic. This is why I choose to use a list of partition
> ids. Otherwise
> > > > > > > > > we'd need the number of partitions and the starting
> partition id.
> > > > > > > > >
> > > > > > > > > Let me know if you have more questions
> > > > > > > > >
> > > > > > > > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer
> > > > > > <ag...@linkedin.com.invalid>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Mickael,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > A call to an external system, e.g. Cruise Control, in the
> > > > > > > implementation of the provided interface can indeed help with
> the initial
> > > > > > > assignment of partitions.
> > > > > > > > > >
> > > > > > > > > > I am curious why the proposed
> > > > > > > `ReplicaAssignor#assignReplicasToBrokers` receives a list of
> partition
> > > > > > ids
> > > > > > > as opposed to the number of partitions to create the topic
> with?
> > > > > > > > > >
> > > > > > > > > > Would you clarify if this API is expected to be used (1)
> only for
> > > > > > > new topics or (2) also for existing topics?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Efe
> > > > > > > > > > ________________________________
> > > > > > > > > > From: Mickael Maison <mi...@gmail.com>
> > > > > > > > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > > > > > > > To: dev <de...@kafka.apache.org>
> > > > > > > > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > > > > > > > >
> > > > > > > > > > Thanks Tom for the feedback!
> > > > > > > > > >
> > > > > > > > > > 1. If the data returned by the ReplicaAssignor
> implementation does
> > > > > > > not
> > > > > > > > > > match that was requested, we'll also throw a
> > > > > > ReplicaAssignorException
> > > > > > > > > >
> > > > > > > > > > 2. Good point, I'll update the KIP
> > > > > > > > > >
> > > > > > > > > > 3. The KIP mentions an error code associated with
> > > > > > > > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > > > > > > > >
> > > > > > > > > > 4. (I'm naming your last question 4.) I spent some time
> looking at
> > > > > > > it.
> > > > > > > > > > Initially I wanted to follow the model from the topic
> policies. But
> > > > > > > as
> > > > > > > > > > you said, computing assignments for the whole batch may
> be more
> > > > > > > > > > desirable and also avoids incrementally updating the
> cluster state.
> > > > > > > > > > The logic in AdminManager is very much centered around
> doing 1
> > > > > > topic
> > > > > > > > > > at a time but as far as I can tell we should be able to
> update it
> > > > > > to
> > > > > > > > > > compute assignments for the whole batch.
> > > > > > > > > >
> > > > > > > > > > I'll play a bit more with 4. and I'll update the KIP in
> the next
> > > > > > few
> > > > > > > days
> > > > > > > > > >
> > > > > > > > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <
> tbentley@redhat.com>
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Mickael,
> > > > > > > > > > >
> > > > > > > > > > > A few thoughts about the ReplicaAssignor contract:
> > > > > > > > > > >
> > > > > > > > > > > 1. What happens if a ReplicaAssignor impl returns a
> Map where
> > > > > > some
> > > > > > > > > > > assignments don't meet the given replication factor?
> > > > > > > > > > > 2. Fixing the signature of assignReplicasToBrokers()
> as you have
> > > > > > > would make
> > > > > > > > > > > it hard to pass extra information in the future (e.g.
> maybe
> > > > > > > someone comes
> > > > > > > > > > > up with a use case where passing the clientId would be
> needed)
> > > > > > > because it
> > > > > > > > > > > would require the interface be changed. If you
> factored all the
> > > > > > > parameters
> > > > > > > > > > > into some new type then the signature could be
> > > > > > > > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and
> adding any
> > > > > > > new
> > > > > > > > > > > properties to RequiredReplicaAssignment wouldn't break
> the
> > > > > > > contract.
> > > > > > > > > > > 3. When an assignor throws RepliacAssignorException
> what error
> > > > > > > code will be
> > > > > > > > > > > returned to the client?
> > > > > > > > > > >
> > > > > > > > > > > Also, this sentence got me thinking:
> > > > > > > > > > >
> > > > > > > > > > > > If multiple topics are present in the request,
> AdminManager
> > > > > > will
> > > > > > > update
> > > > > > > > > > > the Cluster object so the ReplicaAssignor class has
> access to the
> > > > > > > up to
> > > > > > > > > > > date cluster metadata.
> > > > > > > > > > >
> > > > > > > > > > > Previously I've looked at how we can improve Kafka's
> pluggable
> > > > > > > policy
> > > > > > > > > > > support to pass the more of the cluster state to policy
> > > > > > > implementations. A
> > > > > > > > > > > similar problem exists there, but the more cluster
> state you pass
> > > > > > > the
> > > > > > > > > > > harder it is to incrementally change it as you iterate
> through
> > > > > > the
> > > > > > > topics
> > > > > > > > > > > to be created/modified. This likely isn't a problem
> here and now,
> > > > > > > but it
> > > > > > > > > > > could limit any future changes to the pluggable
> assignors. Did
> > > > > > you
> > > > > > > consider
> > > > > > > > > > > the alternative of the assignor just being passed a
> Set of
> > > > > > > assignments?
> > > > > > > > > > > That means you can just pass the cluster state as it
> exists at
> > > > > > the
> > > > > > > time. It
> > > > > > > > > > > also gives the implementation more information to work
> with to
> > > > > > > find more
> > > > > > > > > > > optimal assignments. For example, it could perform a
> bin packing
> > > > > > > type
> > > > > > > > > > > assignment which found a better optimum for the whole
> collection
> > > > > > > of topics
> > > > > > > > > > > than one which was only told about all the topics in
> the request
> > > > > > > > > > > sequentially.
> > > > > > > > > > >
> > > > > > > > > > > Otherwise this looks like a valuable feature to me.
> > > > > > > > > > >
> > > > > > > > > > > Kind regards,
> > > > > > > > > > >
> > > > > > > > > > > Tom
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> > > > > > > bob.barrett@confluent.io>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks Mickael, I think adding the new Exception
> resolves my
> > > > > > > concerns.
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > > > > > > > >
> > > > > > > > > > > > > ReplicaAssignor implementations can throw an
> exception to
> > > > > > > indicate an
> > > > > > > > > > > > > assignment can't be computed. This is already what
> the
> > > > > > current
> > > > > > > round
> > > > > > > > > > > > > robin assignor does. Unfortunately at the moment,
> there are
> > > > > > no
> > > > > > > generic
> > > > > > > > > > > > > error codes if it fails, it's either
> INVALID_PARTITIONS,
> > > > > > > > > > > > > INVALID_REPLICATION_FACTOR or worse
> UNKNOWN_SERVER_ERROR.
> > > > > > > > > > > > >
> > > > > > > > > > > > > So I think it would be nice to introduce a new
> > > > > > Exception/Error
> > > > > > > code to
> > > > > > > > > > > > > cover any failures in the assignor and avoid
> > > > > > > UNKNOWN_SERVER_ERROR.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I've updated the KIP accordingly, let me know if
> you have
> > > > > > more
> > > > > > > questions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> > > > > > > ryannedolan@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks Mickael, the KIP makes sense to me, esp
> for cases
> > > > > > > where an
> > > > > > > > > > > > > external
> > > > > > > > > > > > > > system (like cruise control or an operator)
> knows more
> > > > > > about
> > > > > > > the target
> > > > > > > > > > > > > > cluster state than the broker does.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I've created KIP-660 to make the replica
> assignment logic
> > > > > > > pluggable.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > >
> > > > > >
> https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Please take a look and let me know if you have
> any
> > > > > > > feedback.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > >
> > > > > >
> > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Engineering Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> >
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Colin McCabe <cm...@apache.org>.
Hi Mickael,

To be honest, I think it would be better not to make replica placement pluggable.

When I worked on the Hadoop Distributed Filesystem, we supported pluggable replica placement policies, and it never worked out well.  Users would write plugin code that ran in a very sensitive context, and it would crash or perform poorly, causing problems which they often asked us to troubleshoot.  The plugin interface became very complex because people kept adding stuff, and nothing could be removed because it would break API compatibility.

Our advice to users was always not to do this, but to do placement on the client side if they really wanted to control placement.  In general I think we made a mistake by creating that API at all, and we should have just provided more configuration knobs for placement instead.  (We did add a few such knobs over time, and they were much better at solving people's problems than the placement API...)

The reality is that in order to do a good job with replica placement, your code needs to be very well integrated with the rest of the system.  As you said, you need to know about what other partitions and brokers exist.  This information can be quite large, if you have a large cluster, and it is constantly changing.  More than that, if you want to do better than the current built-in policy, you need additional information like metrics about how much load each broker is under, how much disk space it has, reads vs. writes, etc. etc.  If you want to treat some nodes specially, you need a place to store that metadata.  I think people who want to customize this would be better off forking.

If we are absolutely convinced we want to do this, we should at least mark any internal classes we're exposing here as Evolving, rather than Stable, so that we can change them at will.  Basically not give a compatibility guarantee for this.

best,
Colin


On Thu, Nov 26, 2020, at 08:47, Mickael Maison wrote:
> Thanks Gwen for following up.
> 
> With this extra bit of context, David's comments make more sense.
> 
> If we move the replica placement plugin to the controller, I think
> most of the API can stay the same. However, as David mentioned,
> Cluster may be problematic.
> In a replica placement plugin, you'd typically want to retrieve the
> list of brokers and the list of partitions (including leader and
> replicas) so it should not be too hard to come up with a new interface
> instead of using Cluster.
> 
> But until KIP-631 is done, new types used for metadata in the
> controller are not known. So I wonder if we need to wait for that KIP
> before we can make further progress here or if we should be fine
> having a pretty generic Metadata interface.
> 
> Maybe Colin/Ismael can comment and advise us here?
> 
> On Tue, Nov 24, 2020 at 8:47 PM Gwen Shapira <gw...@confluent.io> wrote:
> >
> > Michael,
> >
> > I talked a bit to Colin and Ismael offline and got some clarity about
> > KIP-500. Basically, replica placement requires an entire metadata map
> > - all the placements of all replicas. Since one of the goals of
> > KIP-500 is to go to 1M or even 10M partitions on the cluster, this
> > will be a rather large map. Since brokers normally don't need to know
> > all the placements (they just need to know about the subset of
> > replicas that they lead or follow), the idea is to keep this map on
> > the controller only. Which means that the replica placement plugin
> > will ideally be on the controller too. This also has a nice side
> > benefit - since we will be able to run the controller quorum on a
> > separate set of machines, we'll be able to replace the replica
> > placement plugin by updating 3-5 controller nodes, not the entire
> > cluster.
> >
> > Gwen
> >
> > On Thu, Nov 19, 2020 at 4:54 AM Mickael Maison <mi...@gmail.com> wrote:
> > >
> > > Hi David,
> > >
> > > I think using updateClusterMetadata() like in ClientQuotaCallback is a
> > > great idea. I've updated the KIP accordingly.
> > >
> > > As mentioned, computing assignments for the full batch of
> > > topics/partitions was considered but it made the logic in AdminManager
> > > really complicated. For the initial use cases this KIP is targetting,
> > > it felt simpler and acceptable to compute assignments one topic at a
> > > time.
> > >
> > > Cluster is already used in other APIs, like ClientQuotaCallback so I
> > > think it makes sense to reuse it here.
> > >
> > > I'm not fully up to date with the latest advances on KIP-500, but like
> > > Gwen, I'm not sure we'd want to move that logic into the Controller.
> > > This KIP is keeping the metadata creation in AdminManager and just
> > > making the logic pluggable.
> > >
> > > Thanks
> > >
> > > On Mon, Nov 16, 2020 at 3:56 PM Gwen Shapira <gw...@confluent.io> wrote:
> > > >
> > > > Why would the replica placement logic run in the controller rather than in
> > > > the AdminManager?
> > > >
> > > > My understanding, and correct me if I got it wrong, is that we are aiming
> > > > at better separation of concerns. The controller job will be managing
> > > > consensus and consistency of metadata, but creating the metadata itself
> > > > will be in the AdminManager.
> > > >
> > > > On Mon, Nov 16, 2020, 5:31 AM David Jacot <dj...@confluent.io> wrote:
> > > >
> > > > > Hi Mickael,
> > > > >
> > > > > Thanks for the KIP. It is an interesting one.
> > > > >
> > > > > I imagine that custom assignors may use a rather complex model of the
> > > > > cluster in order
> > > > > to be able to allocate partitions in smarter ways. For instance, one may
> > > > > use the distribution
> > > > > of leaders in the cluster to allocate the new leaders. With the current
> > > > > interface, that
> > > > > means computing the distribution based on the Cluster received for every
> > > > > assignment
> > > > > request. That could become pretty inefficient in clusters with a large
> > > > > number of nodes
> > > > > and/or partitions. That could become even worse if the model is more
> > > > > complicated.
> > > > >
> > > > > I wonder if you have thought about this or experienced this with your
> > > > > prototype?
> > > > >
> > > > > Have you considered going with an approach à la ClientQuotaCallback where
> > > > > the plugin
> > > > > is updated when the Cluster has changed? That would allow to keep an
> > > > > internal model
> > > > > ready. Another way would be to use batching as suggested as it would allow
> > > > > to amortize
> > > > > the cost of building a model for the current request/user.
> > > > >
> > > > > I also wonder if using Cluster is a good idea here. With KIP-500, I can
> > > > > imagine that this
> > > > > plugin will run in the controller directly instead of running in the
> > > > > AdminManager as today.
> > > > > In this case, we could obviously continue to build that Cluster object but
> > > > > we may have
> > > > > better ways. Therefore, I wonder if having an interface to represent the
> > > > > cluster may be
> > > > > better from an evolution perspective. Have you considered this?
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <mi...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > If I don't see additional feedback in the next few days, I'll start a
> > > > > vote.
> > > > > > Thanks
> > > > > >
> > > > > > On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <mi...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I've updated the KIP to reflect the latest discussions.
> > > > > > >
> > > > > > > Tom,
> > > > > > > 2) Updated
> > > > > > > 4) I decided against switching to a "batch interface" and added the
> > > > > > > reasons in the Rejected Alternatives section
> > > > > > >
> > > > > > > Please take a look and let me know if you have any feedback.
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <
> > > > > mickael.maison@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Efe,
> > > > > > > >
> > > > > > > > Thanks for the feedback.
> > > > > > > > We also need to assign replicas when adding partitions to an existing
> > > > > > > > topic. This is why I choose to use a list of partition ids. Otherwise
> > > > > > > > we'd need the number of partitions and the starting partition id.
> > > > > > > >
> > > > > > > > Let me know if you have more questions
> > > > > > > >
> > > > > > > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer
> > > > > <ag...@linkedin.com.invalid>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Mickael,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP!
> > > > > > > > > A call to an external system, e.g. Cruise Control, in the
> > > > > > implementation of the provided interface can indeed help with the initial
> > > > > > assignment of partitions.
> > > > > > > > >
> > > > > > > > > I am curious why the proposed
> > > > > > `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition
> > > > > ids
> > > > > > as opposed to the number of partitions to create the topic with?
> > > > > > > > >
> > > > > > > > > Would you clarify if this API is expected to be used (1) only for
> > > > > > new topics or (2) also for existing topics?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Efe
> > > > > > > > > ________________________________
> > > > > > > > > From: Mickael Maison <mi...@gmail.com>
> > > > > > > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > > > > > > To: dev <de...@kafka.apache.org>
> > > > > > > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > > > > > > >
> > > > > > > > > Thanks Tom for the feedback!
> > > > > > > > >
> > > > > > > > > 1. If the data returned by the ReplicaAssignor implementation does
> > > > > > not
> > > > > > > > > match that was requested, we'll also throw a
> > > > > ReplicaAssignorException
> > > > > > > > >
> > > > > > > > > 2. Good point, I'll update the KIP
> > > > > > > > >
> > > > > > > > > 3. The KIP mentions an error code associated with
> > > > > > > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > > > > > > >
> > > > > > > > > 4. (I'm naming your last question 4.) I spent some time looking at
> > > > > > it.
> > > > > > > > > Initially I wanted to follow the model from the topic policies. But
> > > > > > as
> > > > > > > > > you said, computing assignments for the whole batch may be more
> > > > > > > > > desirable and also avoids incrementally updating the cluster state.
> > > > > > > > > The logic in AdminManager is very much centered around doing 1
> > > > > topic
> > > > > > > > > at a time but as far as I can tell we should be able to update it
> > > > > to
> > > > > > > > > compute assignments for the whole batch.
> > > > > > > > >
> > > > > > > > > I'll play a bit more with 4. and I'll update the KIP in the next
> > > > > few
> > > > > > days
> > > > > > > > >
> > > > > > > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Mickael,
> > > > > > > > > >
> > > > > > > > > > A few thoughts about the ReplicaAssignor contract:
> > > > > > > > > >
> > > > > > > > > > 1. What happens if a ReplicaAssignor impl returns a Map where
> > > > > some
> > > > > > > > > > assignments don't meet the given replication factor?
> > > > > > > > > > 2. Fixing the signature of assignReplicasToBrokers() as you have
> > > > > > would make
> > > > > > > > > > it hard to pass extra information in the future (e.g. maybe
> > > > > > someone comes
> > > > > > > > > > up with a use case where passing the clientId would be needed)
> > > > > > because it
> > > > > > > > > > would require the interface be changed. If you factored all the
> > > > > > parameters
> > > > > > > > > > into some new type then the signature could be
> > > > > > > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any
> > > > > > new
> > > > > > > > > > properties to RequiredReplicaAssignment wouldn't break the
> > > > > > contract.
> > > > > > > > > > 3. When an assignor throws RepliacAssignorException what error
> > > > > > code will be
> > > > > > > > > > returned to the client?
> > > > > > > > > >
> > > > > > > > > > Also, this sentence got me thinking:
> > > > > > > > > >
> > > > > > > > > > > If multiple topics are present in the request, AdminManager
> > > > > will
> > > > > > update
> > > > > > > > > > the Cluster object so the ReplicaAssignor class has access to the
> > > > > > up to
> > > > > > > > > > date cluster metadata.
> > > > > > > > > >
> > > > > > > > > > Previously I've looked at how we can improve Kafka's pluggable
> > > > > > policy
> > > > > > > > > > support to pass the more of the cluster state to policy
> > > > > > implementations. A
> > > > > > > > > > similar problem exists there, but the more cluster state you pass
> > > > > > the
> > > > > > > > > > harder it is to incrementally change it as you iterate through
> > > > > the
> > > > > > topics
> > > > > > > > > > to be created/modified. This likely isn't a problem here and now,
> > > > > > but it
> > > > > > > > > > could limit any future changes to the pluggable assignors. Did
> > > > > you
> > > > > > consider
> > > > > > > > > > the alternative of the assignor just being passed a Set of
> > > > > > assignments?
> > > > > > > > > > That means you can just pass the cluster state as it exists at
> > > > > the
> > > > > > time. It
> > > > > > > > > > also gives the implementation more information to work with to
> > > > > > find more
> > > > > > > > > > optimal assignments. For example, it could perform a bin packing
> > > > > > type
> > > > > > > > > > assignment which found a better optimum for the whole collection
> > > > > > of topics
> > > > > > > > > > than one which was only told about all the topics in the request
> > > > > > > > > > sequentially.
> > > > > > > > > >
> > > > > > > > > > Otherwise this looks like a valuable feature to me.
> > > > > > > > > >
> > > > > > > > > > Kind regards,
> > > > > > > > > >
> > > > > > > > > > Tom
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> > > > > > bob.barrett@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Mickael, I think adding the new Exception resolves my
> > > > > > concerns.
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > ReplicaAssignor implementations can throw an exception to
> > > > > > indicate an
> > > > > > > > > > > > assignment can't be computed. This is already what the
> > > > > current
> > > > > > round
> > > > > > > > > > > > robin assignor does. Unfortunately at the moment, there are
> > > > > no
> > > > > > generic
> > > > > > > > > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > > > > > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > > > > > > > > >
> > > > > > > > > > > > So I think it would be nice to introduce a new
> > > > > Exception/Error
> > > > > > code to
> > > > > > > > > > > > cover any failures in the assignor and avoid
> > > > > > UNKNOWN_SERVER_ERROR.
> > > > > > > > > > > >
> > > > > > > > > > > > I've updated the KIP accordingly, let me know if you have
> > > > > more
> > > > > > questions.
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> > > > > > ryannedolan@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases
> > > > > > where an
> > > > > > > > > > > > external
> > > > > > > > > > > > > system (like cruise control or an operator) knows more
> > > > > about
> > > > > > the target
> > > > > > > > > > > > > cluster state than the broker does.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I've created KIP-660 to make the replica assignment logic
> > > > > > pluggable.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > >
> > > > > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Please take a look and let me know if you have any
> > > > > > feedback.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > >
> > > > >
> >
> >
> >
> > --
> > Gwen Shapira
> > Engineering Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Mickael Maison <mi...@gmail.com>.
Thanks Gwen for following up.

With this extra bit of context, David's comments make more sense.

If we move the replica placement plugin to the controller, I think
most of the API can stay the same. However, as David mentioned,
Cluster may be problematic.
In a replica placement plugin, you'd typically want to retrieve the
list of brokers and the list of partitions (including leader and
replicas) so it should not be too hard to come up with a new interface
instead of using Cluster.

But until KIP-631 is done, new types used for metadata in the
controller are not known. So I wonder if we need to wait for that KIP
before we can make further progress here or if we should be fine
having a pretty generic Metadata interface.

Maybe Colin/Ismael can comment and advise us here?

On Tue, Nov 24, 2020 at 8:47 PM Gwen Shapira <gw...@confluent.io> wrote:
>
> Michael,
>
> I talked a bit to Colin and Ismael offline and got some clarity about
> KIP-500. Basically, replica placement requires an entire metadata map
> - all the placements of all replicas. Since one of the goals of
> KIP-500 is to go to 1M or even 10M partitions on the cluster, this
> will be a rather large map. Since brokers normally don't need to know
> all the placements (they just need to know about the subset of
> replicas that they lead or follow), the idea is to keep this map on
> the controller only. Which means that the replica placement plugin
> will ideally be on the controller too. This also has a nice side
> benefit - since we will be able to run the controller quorum on a
> separate set of machines, we'll be able to replace the replica
> placement plugin by updating 3-5 controller nodes, not the entire
> cluster.
>
> Gwen
>
> On Thu, Nov 19, 2020 at 4:54 AM Mickael Maison <mi...@gmail.com> wrote:
> >
> > Hi David,
> >
> > I think using updateClusterMetadata() like in ClientQuotaCallback is a
> > great idea. I've updated the KIP accordingly.
> >
> > As mentioned, computing assignments for the full batch of
> > topics/partitions was considered but it made the logic in AdminManager
> > really complicated. For the initial use cases this KIP is targetting,
> > it felt simpler and acceptable to compute assignments one topic at a
> > time.
> >
> > Cluster is already used in other APIs, like ClientQuotaCallback so I
> > think it makes sense to reuse it here.
> >
> > I'm not fully up to date with the latest advances on KIP-500, but like
> > Gwen, I'm not sure we'd want to move that logic into the Controller.
> > This KIP is keeping the metadata creation in AdminManager and just
> > making the logic pluggable.
> >
> > Thanks
> >
> > On Mon, Nov 16, 2020 at 3:56 PM Gwen Shapira <gw...@confluent.io> wrote:
> > >
> > > Why would the replica placement logic run in the controller rather than in
> > > the AdminManager?
> > >
> > > My understanding, and correct me if I got it wrong, is that we are aiming
> > > at better separation of concerns. The controller job will be managing
> > > consensus and consistency of metadata, but creating the metadata itself
> > > will be in the AdminManager.
> > >
> > > On Mon, Nov 16, 2020, 5:31 AM David Jacot <dj...@confluent.io> wrote:
> > >
> > > > Hi Mickael,
> > > >
> > > > Thanks for the KIP. It is an interesting one.
> > > >
> > > > I imagine that custom assignors may use a rather complex model of the
> > > > cluster in order
> > > > to be able to allocate partitions in smarter ways. For instance, one may
> > > > use the distribution
> > > > of leaders in the cluster to allocate the new leaders. With the current
> > > > interface, that
> > > > means computing the distribution based on the Cluster received for every
> > > > assignment
> > > > request. That could become pretty inefficient in clusters with a large
> > > > number of nodes
> > > > and/or partitions. That could become even worse if the model is more
> > > > complicated.
> > > >
> > > > I wonder if you have thought about this or experienced this with your
> > > > prototype?
> > > >
> > > > Have you considered going with an approach à la ClientQuotaCallback where
> > > > the plugin
> > > > is updated when the Cluster has changed? That would allow to keep an
> > > > internal model
> > > > ready. Another way would be to use batching as suggested as it would allow
> > > > to amortize
> > > > the cost of building a model for the current request/user.
> > > >
> > > > I also wonder if using Cluster is a good idea here. With KIP-500, I can
> > > > imagine that this
> > > > plugin will run in the controller directly instead of running in the
> > > > AdminManager as today.
> > > > In this case, we could obviously continue to build that Cluster object but
> > > > we may have
> > > > better ways. Therefore, I wonder if having an interface to represent the
> > > > cluster may be
> > > > better from an evolution perspective. Have you considered this?
> > > >
> > > > Best,
> > > > David
> > > >
> > > > On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <mi...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > If I don't see additional feedback in the next few days, I'll start a
> > > > vote.
> > > > > Thanks
> > > > >
> > > > > On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <mi...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I've updated the KIP to reflect the latest discussions.
> > > > > >
> > > > > > Tom,
> > > > > > 2) Updated
> > > > > > 4) I decided against switching to a "batch interface" and added the
> > > > > > reasons in the Rejected Alternatives section
> > > > > >
> > > > > > Please take a look and let me know if you have any feedback.
> > > > > > Thanks
> > > > > >
> > > > > > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <
> > > > mickael.maison@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > Hi Efe,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > > We also need to assign replicas when adding partitions to an existing
> > > > > > > topic. This is why I choose to use a list of partition ids. Otherwise
> > > > > > > we'd need the number of partitions and the starting partition id.
> > > > > > >
> > > > > > > Let me know if you have more questions
> > > > > > >
> > > > > > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer
> > > > <ag...@linkedin.com.invalid>
> > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Mickael,
> > > > > > > >
> > > > > > > > Thanks for the KIP!
> > > > > > > > A call to an external system, e.g. Cruise Control, in the
> > > > > implementation of the provided interface can indeed help with the initial
> > > > > assignment of partitions.
> > > > > > > >
> > > > > > > > I am curious why the proposed
> > > > > `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition
> > > > ids
> > > > > as opposed to the number of partitions to create the topic with?
> > > > > > > >
> > > > > > > > Would you clarify if this API is expected to be used (1) only for
> > > > > new topics or (2) also for existing topics?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Efe
> > > > > > > > ________________________________
> > > > > > > > From: Mickael Maison <mi...@gmail.com>
> > > > > > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > > > > > To: dev <de...@kafka.apache.org>
> > > > > > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > > > > > >
> > > > > > > > Thanks Tom for the feedback!
> > > > > > > >
> > > > > > > > 1. If the data returned by the ReplicaAssignor implementation does
> > > > > not
> > > > > > > > match that was requested, we'll also throw a
> > > > ReplicaAssignorException
> > > > > > > >
> > > > > > > > 2. Good point, I'll update the KIP
> > > > > > > >
> > > > > > > > 3. The KIP mentions an error code associated with
> > > > > > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > > > > > >
> > > > > > > > 4. (I'm naming your last question 4.) I spent some time looking at
> > > > > it.
> > > > > > > > Initially I wanted to follow the model from the topic policies. But
> > > > > as
> > > > > > > > you said, computing assignments for the whole batch may be more
> > > > > > > > desirable and also avoids incrementally updating the cluster state.
> > > > > > > > The logic in AdminManager is very much centered around doing 1
> > > > topic
> > > > > > > > at a time but as far as I can tell we should be able to update it
> > > > to
> > > > > > > > compute assignments for the whole batch.
> > > > > > > >
> > > > > > > > I'll play a bit more with 4. and I'll update the KIP in the next
> > > > few
> > > > > days
> > > > > > > >
> > > > > > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Mickael,
> > > > > > > > >
> > > > > > > > > A few thoughts about the ReplicaAssignor contract:
> > > > > > > > >
> > > > > > > > > 1. What happens if a ReplicaAssignor impl returns a Map where
> > > > some
> > > > > > > > > assignments don't meet the given replication factor?
> > > > > > > > > 2. Fixing the signature of assignReplicasToBrokers() as you have
> > > > > would make
> > > > > > > > > it hard to pass extra information in the future (e.g. maybe
> > > > > someone comes
> > > > > > > > > up with a use case where passing the clientId would be needed)
> > > > > because it
> > > > > > > > > would require the interface be changed. If you factored all the
> > > > > parameters
> > > > > > > > > into some new type then the signature could be
> > > > > > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any
> > > > > new
> > > > > > > > > properties to RequiredReplicaAssignment wouldn't break the
> > > > > contract.
> > > > > > > > > 3. When an assignor throws RepliacAssignorException what error
> > > > > code will be
> > > > > > > > > returned to the client?
> > > > > > > > >
> > > > > > > > > Also, this sentence got me thinking:
> > > > > > > > >
> > > > > > > > > > If multiple topics are present in the request, AdminManager
> > > > will
> > > > > update
> > > > > > > > > the Cluster object so the ReplicaAssignor class has access to the
> > > > > up to
> > > > > > > > > date cluster metadata.
> > > > > > > > >
> > > > > > > > > Previously I've looked at how we can improve Kafka's pluggable
> > > > > policy
> > > > > > > > > support to pass the more of the cluster state to policy
> > > > > implementations. A
> > > > > > > > > similar problem exists there, but the more cluster state you pass
> > > > > the
> > > > > > > > > harder it is to incrementally change it as you iterate through
> > > > the
> > > > > topics
> > > > > > > > > to be created/modified. This likely isn't a problem here and now,
> > > > > but it
> > > > > > > > > could limit any future changes to the pluggable assignors. Did
> > > > you
> > > > > consider
> > > > > > > > > the alternative of the assignor just being passed a Set of
> > > > > assignments?
> > > > > > > > > That means you can just pass the cluster state as it exists at
> > > > the
> > > > > time. It
> > > > > > > > > also gives the implementation more information to work with to
> > > > > find more
> > > > > > > > > optimal assignments. For example, it could perform a bin packing
> > > > > type
> > > > > > > > > assignment which found a better optimum for the whole collection
> > > > > of topics
> > > > > > > > > than one which was only told about all the topics in the request
> > > > > > > > > sequentially.
> > > > > > > > >
> > > > > > > > > Otherwise this looks like a valuable feature to me.
> > > > > > > > >
> > > > > > > > > Kind regards,
> > > > > > > > >
> > > > > > > > > Tom
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> > > > > bob.barrett@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Mickael, I think adding the new Exception resolves my
> > > > > concerns.
> > > > > > > > > >
> > > > > > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> > > > > mickael.maison@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > > > > > >
> > > > > > > > > > > ReplicaAssignor implementations can throw an exception to
> > > > > indicate an
> > > > > > > > > > > assignment can't be computed. This is already what the
> > > > current
> > > > > round
> > > > > > > > > > > robin assignor does. Unfortunately at the moment, there are
> > > > no
> > > > > generic
> > > > > > > > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > > > > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > > > > > > > >
> > > > > > > > > > > So I think it would be nice to introduce a new
> > > > Exception/Error
> > > > > code to
> > > > > > > > > > > cover any failures in the assignor and avoid
> > > > > UNKNOWN_SERVER_ERROR.
> > > > > > > > > > >
> > > > > > > > > > > I've updated the KIP accordingly, let me know if you have
> > > > more
> > > > > questions.
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> > > > > ryannedolan@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases
> > > > > where an
> > > > > > > > > > > external
> > > > > > > > > > > > system (like cruise control or an operator) knows more
> > > > about
> > > > > the target
> > > > > > > > > > > > cluster state than the broker does.
> > > > > > > > > > > >
> > > > > > > > > > > > Ryanne
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I've created KIP-660 to make the replica assignment logic
> > > > > pluggable.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > >
> > > > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > > > > > >
> > > > > > > > > > > > > Please take a look and let me know if you have any
> > > > > feedback.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > >
> > > >
>
>
>
> --
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Gwen Shapira <gw...@confluent.io>.
Michael,

I talked a bit to Colin and Ismael offline and got some clarity about
KIP-500. Basically, replica placement requires an entire metadata map
- all the placements of all replicas. Since one of the goals of
KIP-500 is to go to 1M or even 10M partitions on the cluster, this
will be a rather large map. Since brokers normally don't need to know
all the placements (they just need to know about the subset of
replicas that they lead or follow), the idea is to keep this map on
the controller only. Which means that the replica placement plugin
will ideally be on the controller too. This also has a nice side
benefit - since we will be able to run the controller quorum on a
separate set of machines, we'll be able to replace the replica
placement plugin by updating 3-5 controller nodes, not the entire
cluster.

Gwen

On Thu, Nov 19, 2020 at 4:54 AM Mickael Maison <mi...@gmail.com> wrote:
>
> Hi David,
>
> I think using updateClusterMetadata() like in ClientQuotaCallback is a
> great idea. I've updated the KIP accordingly.
>
> As mentioned, computing assignments for the full batch of
> topics/partitions was considered but it made the logic in AdminManager
> really complicated. For the initial use cases this KIP is targetting,
> it felt simpler and acceptable to compute assignments one topic at a
> time.
>
> Cluster is already used in other APIs, like ClientQuotaCallback so I
> think it makes sense to reuse it here.
>
> I'm not fully up to date with the latest advances on KIP-500, but like
> Gwen, I'm not sure we'd want to move that logic into the Controller.
> This KIP is keeping the metadata creation in AdminManager and just
> making the logic pluggable.
>
> Thanks
>
> On Mon, Nov 16, 2020 at 3:56 PM Gwen Shapira <gw...@confluent.io> wrote:
> >
> > Why would the replica placement logic run in the controller rather than in
> > the AdminManager?
> >
> > My understanding, and correct me if I got it wrong, is that we are aiming
> > at better separation of concerns. The controller job will be managing
> > consensus and consistency of metadata, but creating the metadata itself
> > will be in the AdminManager.
> >
> > On Mon, Nov 16, 2020, 5:31 AM David Jacot <dj...@confluent.io> wrote:
> >
> > > Hi Mickael,
> > >
> > > Thanks for the KIP. It is an interesting one.
> > >
> > > I imagine that custom assignors may use a rather complex model of the
> > > cluster in order
> > > to be able to allocate partitions in smarter ways. For instance, one may
> > > use the distribution
> > > of leaders in the cluster to allocate the new leaders. With the current
> > > interface, that
> > > means computing the distribution based on the Cluster received for every
> > > assignment
> > > request. That could become pretty inefficient in clusters with a large
> > > number of nodes
> > > and/or partitions. That could become even worse if the model is more
> > > complicated.
> > >
> > > I wonder if you have thought about this or experienced this with your
> > > prototype?
> > >
> > > Have you considered going with an approach à la ClientQuotaCallback where
> > > the plugin
> > > is updated when the Cluster has changed? That would allow to keep an
> > > internal model
> > > ready. Another way would be to use batching as suggested as it would allow
> > > to amortize
> > > the cost of building a model for the current request/user.
> > >
> > > I also wonder if using Cluster is a good idea here. With KIP-500, I can
> > > imagine that this
> > > plugin will run in the controller directly instead of running in the
> > > AdminManager as today.
> > > In this case, we could obviously continue to build that Cluster object but
> > > we may have
> > > better ways. Therefore, I wonder if having an interface to represent the
> > > cluster may be
> > > better from an evolution perspective. Have you considered this?
> > >
> > > Best,
> > > David
> > >
> > > On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <mi...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > If I don't see additional feedback in the next few days, I'll start a
> > > vote.
> > > > Thanks
> > > >
> > > > On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <mi...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I've updated the KIP to reflect the latest discussions.
> > > > >
> > > > > Tom,
> > > > > 2) Updated
> > > > > 4) I decided against switching to a "batch interface" and added the
> > > > > reasons in the Rejected Alternatives section
> > > > >
> > > > > Please take a look and let me know if you have any feedback.
> > > > > Thanks
> > > > >
> > > > > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <
> > > mickael.maison@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > Hi Efe,
> > > > > >
> > > > > > Thanks for the feedback.
> > > > > > We also need to assign replicas when adding partitions to an existing
> > > > > > topic. This is why I choose to use a list of partition ids. Otherwise
> > > > > > we'd need the number of partitions and the starting partition id.
> > > > > >
> > > > > > Let me know if you have more questions
> > > > > >
> > > > > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer
> > > <ag...@linkedin.com.invalid>
> > > > wrote:
> > > > > > >
> > > > > > > Hi Mickael,
> > > > > > >
> > > > > > > Thanks for the KIP!
> > > > > > > A call to an external system, e.g. Cruise Control, in the
> > > > implementation of the provided interface can indeed help with the initial
> > > > assignment of partitions.
> > > > > > >
> > > > > > > I am curious why the proposed
> > > > `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition
> > > ids
> > > > as opposed to the number of partitions to create the topic with?
> > > > > > >
> > > > > > > Would you clarify if this API is expected to be used (1) only for
> > > > new topics or (2) also for existing topics?
> > > > > > >
> > > > > > > Best,
> > > > > > > Efe
> > > > > > > ________________________________
> > > > > > > From: Mickael Maison <mi...@gmail.com>
> > > > > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > > > > To: dev <de...@kafka.apache.org>
> > > > > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > > > > >
> > > > > > > Thanks Tom for the feedback!
> > > > > > >
> > > > > > > 1. If the data returned by the ReplicaAssignor implementation does
> > > > not
> > > > > > > match that was requested, we'll also throw a
> > > ReplicaAssignorException
> > > > > > >
> > > > > > > 2. Good point, I'll update the KIP
> > > > > > >
> > > > > > > 3. The KIP mentions an error code associated with
> > > > > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > > > > >
> > > > > > > 4. (I'm naming your last question 4.) I spent some time looking at
> > > > it.
> > > > > > > Initially I wanted to follow the model from the topic policies. But
> > > > as
> > > > > > > you said, computing assignments for the whole batch may be more
> > > > > > > desirable and also avoids incrementally updating the cluster state.
> > > > > > > The logic in AdminManager is very much centered around doing 1
> > > topic
> > > > > > > at a time but as far as I can tell we should be able to update it
> > > to
> > > > > > > compute assignments for the whole batch.
> > > > > > >
> > > > > > > I'll play a bit more with 4. and I'll update the KIP in the next
> > > few
> > > > days
> > > > > > >
> > > > > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > Hi Mickael,
> > > > > > > >
> > > > > > > > A few thoughts about the ReplicaAssignor contract:
> > > > > > > >
> > > > > > > > 1. What happens if a ReplicaAssignor impl returns a Map where
> > > some
> > > > > > > > assignments don't meet the given replication factor?
> > > > > > > > 2. Fixing the signature of assignReplicasToBrokers() as you have
> > > > would make
> > > > > > > > it hard to pass extra information in the future (e.g. maybe
> > > > someone comes
> > > > > > > > up with a use case where passing the clientId would be needed)
> > > > because it
> > > > > > > > would require the interface be changed. If you factored all the
> > > > parameters
> > > > > > > > into some new type then the signature could be
> > > > > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any
> > > > new
> > > > > > > > properties to RequiredReplicaAssignment wouldn't break the
> > > > contract.
> > > > > > > > 3. When an assignor throws RepliacAssignorException what error
> > > > code will be
> > > > > > > > returned to the client?
> > > > > > > >
> > > > > > > > Also, this sentence got me thinking:
> > > > > > > >
> > > > > > > > > If multiple topics are present in the request, AdminManager
> > > will
> > > > update
> > > > > > > > the Cluster object so the ReplicaAssignor class has access to the
> > > > up to
> > > > > > > > date cluster metadata.
> > > > > > > >
> > > > > > > > Previously I've looked at how we can improve Kafka's pluggable
> > > > policy
> > > > > > > > support to pass the more of the cluster state to policy
> > > > implementations. A
> > > > > > > > similar problem exists there, but the more cluster state you pass
> > > > the
> > > > > > > > harder it is to incrementally change it as you iterate through
> > > the
> > > > topics
> > > > > > > > to be created/modified. This likely isn't a problem here and now,
> > > > but it
> > > > > > > > could limit any future changes to the pluggable assignors. Did
> > > you
> > > > consider
> > > > > > > > the alternative of the assignor just being passed a Set of
> > > > assignments?
> > > > > > > > That means you can just pass the cluster state as it exists at
> > > the
> > > > time. It
> > > > > > > > also gives the implementation more information to work with to
> > > > find more
> > > > > > > > optimal assignments. For example, it could perform a bin packing
> > > > type
> > > > > > > > assignment which found a better optimum for the whole collection
> > > > of topics
> > > > > > > > than one which was only told about all the topics in the request
> > > > > > > > sequentially.
> > > > > > > >
> > > > > > > > Otherwise this looks like a valuable feature to me.
> > > > > > > >
> > > > > > > > Kind regards,
> > > > > > > >
> > > > > > > > Tom
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> > > > bob.barrett@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Mickael, I think adding the new Exception resolves my
> > > > concerns.
> > > > > > > > >
> > > > > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> > > > mickael.maison@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > > > > >
> > > > > > > > > > ReplicaAssignor implementations can throw an exception to
> > > > indicate an
> > > > > > > > > > assignment can't be computed. This is already what the
> > > current
> > > > round
> > > > > > > > > > robin assignor does. Unfortunately at the moment, there are
> > > no
> > > > generic
> > > > > > > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > > > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > > > > > > >
> > > > > > > > > > So I think it would be nice to introduce a new
> > > Exception/Error
> > > > code to
> > > > > > > > > > cover any failures in the assignor and avoid
> > > > UNKNOWN_SERVER_ERROR.
> > > > > > > > > >
> > > > > > > > > > I've updated the KIP accordingly, let me know if you have
> > > more
> > > > questions.
> > > > > > > > > >
> > > > > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> > > > ryannedolan@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases
> > > > where an
> > > > > > > > > > external
> > > > > > > > > > > system (like cruise control or an operator) knows more
> > > about
> > > > the target
> > > > > > > > > > > cluster state than the broker does.
> > > > > > > > > > >
> > > > > > > > > > > Ryanne
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > I've created KIP-660 to make the replica assignment logic
> > > > pluggable.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > >
> > > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > > > > >
> > > > > > > > > > > > Please take a look and let me know if you have any
> > > > feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > >
> > >



-- 
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Mickael Maison <mi...@gmail.com>.
Hi David,

I think using updateClusterMetadata() like in ClientQuotaCallback is a
great idea. I've updated the KIP accordingly.

As mentioned, computing assignments for the full batch of
topics/partitions was considered but it made the logic in AdminManager
really complicated. For the initial use cases this KIP is targetting,
it felt simpler and acceptable to compute assignments one topic at a
time.

Cluster is already used in other APIs, like ClientQuotaCallback so I
think it makes sense to reuse it here.

I'm not fully up to date with the latest advances on KIP-500, but like
Gwen, I'm not sure we'd want to move that logic into the Controller.
This KIP is keeping the metadata creation in AdminManager and just
making the logic pluggable.

Thanks

On Mon, Nov 16, 2020 at 3:56 PM Gwen Shapira <gw...@confluent.io> wrote:
>
> Why would the replica placement logic run in the controller rather than in
> the AdminManager?
>
> My understanding, and correct me if I got it wrong, is that we are aiming
> at better separation of concerns. The controller job will be managing
> consensus and consistency of metadata, but creating the metadata itself
> will be in the AdminManager.
>
> On Mon, Nov 16, 2020, 5:31 AM David Jacot <dj...@confluent.io> wrote:
>
> > Hi Mickael,
> >
> > Thanks for the KIP. It is an interesting one.
> >
> > I imagine that custom assignors may use a rather complex model of the
> > cluster in order
> > to be able to allocate partitions in smarter ways. For instance, one may
> > use the distribution
> > of leaders in the cluster to allocate the new leaders. With the current
> > interface, that
> > means computing the distribution based on the Cluster received for every
> > assignment
> > request. That could become pretty inefficient in clusters with a large
> > number of nodes
> > and/or partitions. That could become even worse if the model is more
> > complicated.
> >
> > I wonder if you have thought about this or experienced this with your
> > prototype?
> >
> > Have you considered going with an approach à la ClientQuotaCallback where
> > the plugin
> > is updated when the Cluster has changed? That would allow to keep an
> > internal model
> > ready. Another way would be to use batching as suggested as it would allow
> > to amortize
> > the cost of building a model for the current request/user.
> >
> > I also wonder if using Cluster is a good idea here. With KIP-500, I can
> > imagine that this
> > plugin will run in the controller directly instead of running in the
> > AdminManager as today.
> > In this case, we could obviously continue to build that Cluster object but
> > we may have
> > better ways. Therefore, I wonder if having an interface to represent the
> > cluster may be
> > better from an evolution perspective. Have you considered this?
> >
> > Best,
> > David
> >
> > On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <mi...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > If I don't see additional feedback in the next few days, I'll start a
> > vote.
> > > Thanks
> > >
> > > On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <mi...@gmail.com>
> > > wrote:
> > > >
> > > > Hi all,
> > > >
> > > > I've updated the KIP to reflect the latest discussions.
> > > >
> > > > Tom,
> > > > 2) Updated
> > > > 4) I decided against switching to a "batch interface" and added the
> > > > reasons in the Rejected Alternatives section
> > > >
> > > > Please take a look and let me know if you have any feedback.
> > > > Thanks
> > > >
> > > > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <
> > mickael.maison@gmail.com>
> > > wrote:
> > > > >
> > > > > Hi Efe,
> > > > >
> > > > > Thanks for the feedback.
> > > > > We also need to assign replicas when adding partitions to an existing
> > > > > topic. This is why I choose to use a list of partition ids. Otherwise
> > > > > we'd need the number of partitions and the starting partition id.
> > > > >
> > > > > Let me know if you have more questions
> > > > >
> > > > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer
> > <ag...@linkedin.com.invalid>
> > > wrote:
> > > > > >
> > > > > > Hi Mickael,
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > > A call to an external system, e.g. Cruise Control, in the
> > > implementation of the provided interface can indeed help with the initial
> > > assignment of partitions.
> > > > > >
> > > > > > I am curious why the proposed
> > > `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition
> > ids
> > > as opposed to the number of partitions to create the topic with?
> > > > > >
> > > > > > Would you clarify if this API is expected to be used (1) only for
> > > new topics or (2) also for existing topics?
> > > > > >
> > > > > > Best,
> > > > > > Efe
> > > > > > ________________________________
> > > > > > From: Mickael Maison <mi...@gmail.com>
> > > > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > > > To: dev <de...@kafka.apache.org>
> > > > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > > > >
> > > > > > Thanks Tom for the feedback!
> > > > > >
> > > > > > 1. If the data returned by the ReplicaAssignor implementation does
> > > not
> > > > > > match that was requested, we'll also throw a
> > ReplicaAssignorException
> > > > > >
> > > > > > 2. Good point, I'll update the KIP
> > > > > >
> > > > > > 3. The KIP mentions an error code associated with
> > > > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > > > >
> > > > > > 4. (I'm naming your last question 4.) I spent some time looking at
> > > it.
> > > > > > Initially I wanted to follow the model from the topic policies. But
> > > as
> > > > > > you said, computing assignments for the whole batch may be more
> > > > > > desirable and also avoids incrementally updating the cluster state.
> > > > > > The logic in AdminManager is very much centered around doing 1
> > topic
> > > > > > at a time but as far as I can tell we should be able to update it
> > to
> > > > > > compute assignments for the whole batch.
> > > > > >
> > > > > > I'll play a bit more with 4. and I'll update the KIP in the next
> > few
> > > days
> > > > > >
> > > > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com>
> > > wrote:
> > > > > > >
> > > > > > > Hi Mickael,
> > > > > > >
> > > > > > > A few thoughts about the ReplicaAssignor contract:
> > > > > > >
> > > > > > > 1. What happens if a ReplicaAssignor impl returns a Map where
> > some
> > > > > > > assignments don't meet the given replication factor?
> > > > > > > 2. Fixing the signature of assignReplicasToBrokers() as you have
> > > would make
> > > > > > > it hard to pass extra information in the future (e.g. maybe
> > > someone comes
> > > > > > > up with a use case where passing the clientId would be needed)
> > > because it
> > > > > > > would require the interface be changed. If you factored all the
> > > parameters
> > > > > > > into some new type then the signature could be
> > > > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any
> > > new
> > > > > > > properties to RequiredReplicaAssignment wouldn't break the
> > > contract.
> > > > > > > 3. When an assignor throws RepliacAssignorException what error
> > > code will be
> > > > > > > returned to the client?
> > > > > > >
> > > > > > > Also, this sentence got me thinking:
> > > > > > >
> > > > > > > > If multiple topics are present in the request, AdminManager
> > will
> > > update
> > > > > > > the Cluster object so the ReplicaAssignor class has access to the
> > > up to
> > > > > > > date cluster metadata.
> > > > > > >
> > > > > > > Previously I've looked at how we can improve Kafka's pluggable
> > > policy
> > > > > > > support to pass the more of the cluster state to policy
> > > implementations. A
> > > > > > > similar problem exists there, but the more cluster state you pass
> > > the
> > > > > > > harder it is to incrementally change it as you iterate through
> > the
> > > topics
> > > > > > > to be created/modified. This likely isn't a problem here and now,
> > > but it
> > > > > > > could limit any future changes to the pluggable assignors. Did
> > you
> > > consider
> > > > > > > the alternative of the assignor just being passed a Set of
> > > assignments?
> > > > > > > That means you can just pass the cluster state as it exists at
> > the
> > > time. It
> > > > > > > also gives the implementation more information to work with to
> > > find more
> > > > > > > optimal assignments. For example, it could perform a bin packing
> > > type
> > > > > > > assignment which found a better optimum for the whole collection
> > > of topics
> > > > > > > than one which was only told about all the topics in the request
> > > > > > > sequentially.
> > > > > > >
> > > > > > > Otherwise this looks like a valuable feature to me.
> > > > > > >
> > > > > > > Kind regards,
> > > > > > >
> > > > > > > Tom
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> > > bob.barrett@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Mickael, I think adding the new Exception resolves my
> > > concerns.
> > > > > > > >
> > > > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> > > mickael.maison@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > > > >
> > > > > > > > > ReplicaAssignor implementations can throw an exception to
> > > indicate an
> > > > > > > > > assignment can't be computed. This is already what the
> > current
> > > round
> > > > > > > > > robin assignor does. Unfortunately at the moment, there are
> > no
> > > generic
> > > > > > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > > > > > >
> > > > > > > > > So I think it would be nice to introduce a new
> > Exception/Error
> > > code to
> > > > > > > > > cover any failures in the assignor and avoid
> > > UNKNOWN_SERVER_ERROR.
> > > > > > > > >
> > > > > > > > > I've updated the KIP accordingly, let me know if you have
> > more
> > > questions.
> > > > > > > > >
> > > > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> > > ryannedolan@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases
> > > where an
> > > > > > > > > external
> > > > > > > > > > system (like cruise control or an operator) knows more
> > about
> > > the target
> > > > > > > > > > cluster state than the broker does.
> > > > > > > > > >
> > > > > > > > > > Ryanne
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > > > mickael.maison@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I've created KIP-660 to make the replica assignment logic
> > > pluggable.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > >
> > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > > > >
> > > > > > > > > > > Please take a look and let me know if you have any
> > > feedback.
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > >
> >

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Gwen Shapira <gw...@confluent.io>.
Why would the replica placement logic run in the controller rather than in
the AdminManager?

My understanding, and correct me if I got it wrong, is that we are aiming
at better separation of concerns. The controller job will be managing
consensus and consistency of metadata, but creating the metadata itself
will be in the AdminManager.

On Mon, Nov 16, 2020, 5:31 AM David Jacot <dj...@confluent.io> wrote:

> Hi Mickael,
>
> Thanks for the KIP. It is an interesting one.
>
> I imagine that custom assignors may use a rather complex model of the
> cluster in order
> to be able to allocate partitions in smarter ways. For instance, one may
> use the distribution
> of leaders in the cluster to allocate the new leaders. With the current
> interface, that
> means computing the distribution based on the Cluster received for every
> assignment
> request. That could become pretty inefficient in clusters with a large
> number of nodes
> and/or partitions. That could become even worse if the model is more
> complicated.
>
> I wonder if you have thought about this or experienced this with your
> prototype?
>
> Have you considered going with an approach à la ClientQuotaCallback where
> the plugin
> is updated when the Cluster has changed? That would allow to keep an
> internal model
> ready. Another way would be to use batching as suggested as it would allow
> to amortize
> the cost of building a model for the current request/user.
>
> I also wonder if using Cluster is a good idea here. With KIP-500, I can
> imagine that this
> plugin will run in the controller directly instead of running in the
> AdminManager as today.
> In this case, we could obviously continue to build that Cluster object but
> we may have
> better ways. Therefore, I wonder if having an interface to represent the
> cluster may be
> better from an evolution perspective. Have you considered this?
>
> Best,
> David
>
> On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <mi...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > If I don't see additional feedback in the next few days, I'll start a
> vote.
> > Thanks
> >
> > On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <mi...@gmail.com>
> > wrote:
> > >
> > > Hi all,
> > >
> > > I've updated the KIP to reflect the latest discussions.
> > >
> > > Tom,
> > > 2) Updated
> > > 4) I decided against switching to a "batch interface" and added the
> > > reasons in the Rejected Alternatives section
> > >
> > > Please take a look and let me know if you have any feedback.
> > > Thanks
> > >
> > > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <
> mickael.maison@gmail.com>
> > wrote:
> > > >
> > > > Hi Efe,
> > > >
> > > > Thanks for the feedback.
> > > > We also need to assign replicas when adding partitions to an existing
> > > > topic. This is why I choose to use a list of partition ids. Otherwise
> > > > we'd need the number of partitions and the starting partition id.
> > > >
> > > > Let me know if you have more questions
> > > >
> > > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer
> <ag...@linkedin.com.invalid>
> > wrote:
> > > > >
> > > > > Hi Mickael,
> > > > >
> > > > > Thanks for the KIP!
> > > > > A call to an external system, e.g. Cruise Control, in the
> > implementation of the provided interface can indeed help with the initial
> > assignment of partitions.
> > > > >
> > > > > I am curious why the proposed
> > `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition
> ids
> > as opposed to the number of partitions to create the topic with?
> > > > >
> > > > > Would you clarify if this API is expected to be used (1) only for
> > new topics or (2) also for existing topics?
> > > > >
> > > > > Best,
> > > > > Efe
> > > > > ________________________________
> > > > > From: Mickael Maison <mi...@gmail.com>
> > > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > > To: dev <de...@kafka.apache.org>
> > > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > > >
> > > > > Thanks Tom for the feedback!
> > > > >
> > > > > 1. If the data returned by the ReplicaAssignor implementation does
> > not
> > > > > match that was requested, we'll also throw a
> ReplicaAssignorException
> > > > >
> > > > > 2. Good point, I'll update the KIP
> > > > >
> > > > > 3. The KIP mentions an error code associated with
> > > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > > >
> > > > > 4. (I'm naming your last question 4.) I spent some time looking at
> > it.
> > > > > Initially I wanted to follow the model from the topic policies. But
> > as
> > > > > you said, computing assignments for the whole batch may be more
> > > > > desirable and also avoids incrementally updating the cluster state.
> > > > > The logic in AdminManager is very much centered around doing 1
> topic
> > > > > at a time but as far as I can tell we should be able to update it
> to
> > > > > compute assignments for the whole batch.
> > > > >
> > > > > I'll play a bit more with 4. and I'll update the KIP in the next
> few
> > days
> > > > >
> > > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com>
> > wrote:
> > > > > >
> > > > > > Hi Mickael,
> > > > > >
> > > > > > A few thoughts about the ReplicaAssignor contract:
> > > > > >
> > > > > > 1. What happens if a ReplicaAssignor impl returns a Map where
> some
> > > > > > assignments don't meet the given replication factor?
> > > > > > 2. Fixing the signature of assignReplicasToBrokers() as you have
> > would make
> > > > > > it hard to pass extra information in the future (e.g. maybe
> > someone comes
> > > > > > up with a use case where passing the clientId would be needed)
> > because it
> > > > > > would require the interface be changed. If you factored all the
> > parameters
> > > > > > into some new type then the signature could be
> > > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any
> > new
> > > > > > properties to RequiredReplicaAssignment wouldn't break the
> > contract.
> > > > > > 3. When an assignor throws RepliacAssignorException what error
> > code will be
> > > > > > returned to the client?
> > > > > >
> > > > > > Also, this sentence got me thinking:
> > > > > >
> > > > > > > If multiple topics are present in the request, AdminManager
> will
> > update
> > > > > > the Cluster object so the ReplicaAssignor class has access to the
> > up to
> > > > > > date cluster metadata.
> > > > > >
> > > > > > Previously I've looked at how we can improve Kafka's pluggable
> > policy
> > > > > > support to pass the more of the cluster state to policy
> > implementations. A
> > > > > > similar problem exists there, but the more cluster state you pass
> > the
> > > > > > harder it is to incrementally change it as you iterate through
> the
> > topics
> > > > > > to be created/modified. This likely isn't a problem here and now,
> > but it
> > > > > > could limit any future changes to the pluggable assignors. Did
> you
> > consider
> > > > > > the alternative of the assignor just being passed a Set of
> > assignments?
> > > > > > That means you can just pass the cluster state as it exists at
> the
> > time. It
> > > > > > also gives the implementation more information to work with to
> > find more
> > > > > > optimal assignments. For example, it could perform a bin packing
> > type
> > > > > > assignment which found a better optimum for the whole collection
> > of topics
> > > > > > than one which was only told about all the topics in the request
> > > > > > sequentially.
> > > > > >
> > > > > > Otherwise this looks like a valuable feature to me.
> > > > > >
> > > > > > Kind regards,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> > bob.barrett@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Mickael, I think adding the new Exception resolves my
> > concerns.
> > > > > > >
> > > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> > mickael.maison@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > > >
> > > > > > > > ReplicaAssignor implementations can throw an exception to
> > indicate an
> > > > > > > > assignment can't be computed. This is already what the
> current
> > round
> > > > > > > > robin assignor does. Unfortunately at the moment, there are
> no
> > generic
> > > > > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > > > > >
> > > > > > > > So I think it would be nice to introduce a new
> Exception/Error
> > code to
> > > > > > > > cover any failures in the assignor and avoid
> > UNKNOWN_SERVER_ERROR.
> > > > > > > >
> > > > > > > > I've updated the KIP accordingly, let me know if you have
> more
> > questions.
> > > > > > > >
> > > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> > ryannedolan@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases
> > where an
> > > > > > > > external
> > > > > > > > > system (like cruise control or an operator) knows more
> about
> > the target
> > > > > > > > > cluster state than the broker does.
> > > > > > > > >
> > > > > > > > > Ryanne
> > > > > > > > >
> > > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > > mickael.maison@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > I've created KIP-660 to make the replica assignment logic
> > pluggable.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > >
> >
> https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > > >
> > > > > > > > > > Please take a look and let me know if you have any
> > feedback.
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > >
> > > > > > > >
> > > > > > >
> >
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by David Jacot <dj...@confluent.io>.
Hi Mickael,

Thanks for the KIP. It is an interesting one.

I imagine that custom assignors may use a rather complex model of the
cluster in order
to be able to allocate partitions in smarter ways. For instance, one may
use the distribution
of leaders in the cluster to allocate the new leaders. With the current
interface, that
means computing the distribution based on the Cluster received for every
assignment
request. That could become pretty inefficient in clusters with a large
number of nodes
and/or partitions. That could become even worse if the model is more
complicated.

I wonder if you have thought about this or experienced this with your
prototype?

Have you considered going with an approach à la ClientQuotaCallback where
the plugin
is updated when the Cluster has changed? That would allow to keep an
internal model
ready. Another way would be to use batching as suggested as it would allow
to amortize
the cost of building a model for the current request/user.

I also wonder if using Cluster is a good idea here. With KIP-500, I can
imagine that this
plugin will run in the controller directly instead of running in the
AdminManager as today.
In this case, we could obviously continue to build that Cluster object but
we may have
better ways. Therefore, I wonder if having an interface to represent the
cluster may be
better from an evolution perspective. Have you considered this?

Best,
David

On Mon, Nov 16, 2020 at 12:10 PM Mickael Maison <mi...@gmail.com>
wrote:

> Hi all,
>
> If I don't see additional feedback in the next few days, I'll start a vote.
> Thanks
>
> On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <mi...@gmail.com>
> wrote:
> >
> > Hi all,
> >
> > I've updated the KIP to reflect the latest discussions.
> >
> > Tom,
> > 2) Updated
> > 4) I decided against switching to a "batch interface" and added the
> > reasons in the Rejected Alternatives section
> >
> > Please take a look and let me know if you have any feedback.
> > Thanks
> >
> > On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <mi...@gmail.com>
> wrote:
> > >
> > > Hi Efe,
> > >
> > > Thanks for the feedback.
> > > We also need to assign replicas when adding partitions to an existing
> > > topic. This is why I choose to use a list of partition ids. Otherwise
> > > we'd need the number of partitions and the starting partition id.
> > >
> > > Let me know if you have more questions
> > >
> > > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer <ag...@linkedin.com.invalid>
> wrote:
> > > >
> > > > Hi Mickael,
> > > >
> > > > Thanks for the KIP!
> > > > A call to an external system, e.g. Cruise Control, in the
> implementation of the provided interface can indeed help with the initial
> assignment of partitions.
> > > >
> > > > I am curious why the proposed
> `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition ids
> as opposed to the number of partitions to create the topic with?
> > > >
> > > > Would you clarify if this API is expected to be used (1) only for
> new topics or (2) also for existing topics?
> > > >
> > > > Best,
> > > > Efe
> > > > ________________________________
> > > > From: Mickael Maison <mi...@gmail.com>
> > > > Sent: Thursday, October 1, 2020 9:43 AM
> > > > To: dev <de...@kafka.apache.org>
> > > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > > >
> > > > Thanks Tom for the feedback!
> > > >
> > > > 1. If the data returned by the ReplicaAssignor implementation does
> not
> > > > match that was requested, we'll also throw a ReplicaAssignorException
> > > >
> > > > 2. Good point, I'll update the KIP
> > > >
> > > > 3. The KIP mentions an error code associated with
> > > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > > >
> > > > 4. (I'm naming your last question 4.) I spent some time looking at
> it.
> > > > Initially I wanted to follow the model from the topic policies. But
> as
> > > > you said, computing assignments for the whole batch may be more
> > > > desirable and also avoids incrementally updating the cluster state.
> > > > The logic in AdminManager is very much centered around doing 1 topic
> > > > at a time but as far as I can tell we should be able to update it to
> > > > compute assignments for the whole batch.
> > > >
> > > > I'll play a bit more with 4. and I'll update the KIP in the next few
> days
> > > >
> > > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com>
> wrote:
> > > > >
> > > > > Hi Mickael,
> > > > >
> > > > > A few thoughts about the ReplicaAssignor contract:
> > > > >
> > > > > 1. What happens if a ReplicaAssignor impl returns a Map where some
> > > > > assignments don't meet the given replication factor?
> > > > > 2. Fixing the signature of assignReplicasToBrokers() as you have
> would make
> > > > > it hard to pass extra information in the future (e.g. maybe
> someone comes
> > > > > up with a use case where passing the clientId would be needed)
> because it
> > > > > would require the interface be changed. If you factored all the
> parameters
> > > > > into some new type then the signature could be
> > > > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any
> new
> > > > > properties to RequiredReplicaAssignment wouldn't break the
> contract.
> > > > > 3. When an assignor throws RepliacAssignorException what error
> code will be
> > > > > returned to the client?
> > > > >
> > > > > Also, this sentence got me thinking:
> > > > >
> > > > > > If multiple topics are present in the request, AdminManager will
> update
> > > > > the Cluster object so the ReplicaAssignor class has access to the
> up to
> > > > > date cluster metadata.
> > > > >
> > > > > Previously I've looked at how we can improve Kafka's pluggable
> policy
> > > > > support to pass the more of the cluster state to policy
> implementations. A
> > > > > similar problem exists there, but the more cluster state you pass
> the
> > > > > harder it is to incrementally change it as you iterate through the
> topics
> > > > > to be created/modified. This likely isn't a problem here and now,
> but it
> > > > > could limit any future changes to the pluggable assignors. Did you
> consider
> > > > > the alternative of the assignor just being passed a Set of
> assignments?
> > > > > That means you can just pass the cluster state as it exists at the
> time. It
> > > > > also gives the implementation more information to work with to
> find more
> > > > > optimal assignments. For example, it could perform a bin packing
> type
> > > > > assignment which found a better optimum for the whole collection
> of topics
> > > > > than one which was only told about all the topics in the request
> > > > > sequentially.
> > > > >
> > > > > Otherwise this looks like a valuable feature to me.
> > > > >
> > > > > Kind regards,
> > > > >
> > > > > Tom
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <
> bob.barrett@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thanks Mickael, I think adding the new Exception resolves my
> concerns.
> > > > > >
> > > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <
> mickael.maison@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > > >
> > > > > > > ReplicaAssignor implementations can throw an exception to
> indicate an
> > > > > > > assignment can't be computed. This is already what the current
> round
> > > > > > > robin assignor does. Unfortunately at the moment, there are no
> generic
> > > > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > > > >
> > > > > > > So I think it would be nice to introduce a new Exception/Error
> code to
> > > > > > > cover any failures in the assignor and avoid
> UNKNOWN_SERVER_ERROR.
> > > > > > >
> > > > > > > I've updated the KIP accordingly, let me know if you have more
> questions.
> > > > > > >
> > > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <
> ryannedolan@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases
> where an
> > > > > > > external
> > > > > > > > system (like cruise control or an operator) knows more about
> the target
> > > > > > > > cluster state than the broker does.
> > > > > > > >
> > > > > > > > Ryanne
> > > > > > > >
> > > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > > mickael.maison@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I've created KIP-660 to make the replica assignment logic
> pluggable.
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > > >
> > > > > > > > > Please take a look and let me know if you have any
> feedback.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > >
> > > > > > >
> > > > > >
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Mickael Maison <mi...@gmail.com>.
Hi all,

If I don't see additional feedback in the next few days, I'll start a vote.
Thanks

On Thu, Nov 5, 2020 at 6:29 PM Mickael Maison <mi...@gmail.com> wrote:
>
> Hi all,
>
> I've updated the KIP to reflect the latest discussions.
>
> Tom,
> 2) Updated
> 4) I decided against switching to a "batch interface" and added the
> reasons in the Rejected Alternatives section
>
> Please take a look and let me know if you have any feedback.
> Thanks
>
> On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <mi...@gmail.com> wrote:
> >
> > Hi Efe,
> >
> > Thanks for the feedback.
> > We also need to assign replicas when adding partitions to an existing
> > topic. This is why I choose to use a list of partition ids. Otherwise
> > we'd need the number of partitions and the starting partition id.
> >
> > Let me know if you have more questions
> >
> > On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer <ag...@linkedin.com.invalid> wrote:
> > >
> > > Hi Mickael,
> > >
> > > Thanks for the KIP!
> > > A call to an external system, e.g. Cruise Control, in the implementation of the provided interface can indeed help with the initial assignment of partitions.
> > >
> > > I am curious why the proposed `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition ids as opposed to the number of partitions to create the topic with?
> > >
> > > Would you clarify if this API is expected to be used (1) only for new topics or (2) also for existing topics?
> > >
> > > Best,
> > > Efe
> > > ________________________________
> > > From: Mickael Maison <mi...@gmail.com>
> > > Sent: Thursday, October 1, 2020 9:43 AM
> > > To: dev <de...@kafka.apache.org>
> > > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> > >
> > > Thanks Tom for the feedback!
> > >
> > > 1. If the data returned by the ReplicaAssignor implementation does not
> > > match that was requested, we'll also throw a ReplicaAssignorException
> > >
> > > 2. Good point, I'll update the KIP
> > >
> > > 3. The KIP mentions an error code associated with
> > > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> > >
> > > 4. (I'm naming your last question 4.) I spent some time looking at it.
> > > Initially I wanted to follow the model from the topic policies. But as
> > > you said, computing assignments for the whole batch may be more
> > > desirable and also avoids incrementally updating the cluster state.
> > > The logic in AdminManager is very much centered around doing 1 topic
> > > at a time but as far as I can tell we should be able to update it to
> > > compute assignments for the whole batch.
> > >
> > > I'll play a bit more with 4. and I'll update the KIP in the next few days
> > >
> > > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com> wrote:
> > > >
> > > > Hi Mickael,
> > > >
> > > > A few thoughts about the ReplicaAssignor contract:
> > > >
> > > > 1. What happens if a ReplicaAssignor impl returns a Map where some
> > > > assignments don't meet the given replication factor?
> > > > 2. Fixing the signature of assignReplicasToBrokers() as you have would make
> > > > it hard to pass extra information in the future (e.g. maybe someone comes
> > > > up with a use case where passing the clientId would be needed) because it
> > > > would require the interface be changed. If you factored all the parameters
> > > > into some new type then the signature could be
> > > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any new
> > > > properties to RequiredReplicaAssignment wouldn't break the contract.
> > > > 3. When an assignor throws RepliacAssignorException what error code will be
> > > > returned to the client?
> > > >
> > > > Also, this sentence got me thinking:
> > > >
> > > > > If multiple topics are present in the request, AdminManager will update
> > > > the Cluster object so the ReplicaAssignor class has access to the up to
> > > > date cluster metadata.
> > > >
> > > > Previously I've looked at how we can improve Kafka's pluggable policy
> > > > support to pass the more of the cluster state to policy implementations. A
> > > > similar problem exists there, but the more cluster state you pass the
> > > > harder it is to incrementally change it as you iterate through the topics
> > > > to be created/modified. This likely isn't a problem here and now, but it
> > > > could limit any future changes to the pluggable assignors. Did you consider
> > > > the alternative of the assignor just being passed a Set of assignments?
> > > > That means you can just pass the cluster state as it exists at the time. It
> > > > also gives the implementation more information to work with to find more
> > > > optimal assignments. For example, it could perform a bin packing type
> > > > assignment which found a better optimum for the whole collection of topics
> > > > than one which was only told about all the topics in the request
> > > > sequentially.
> > > >
> > > > Otherwise this looks like a valuable feature to me.
> > > >
> > > > Kind regards,
> > > >
> > > > Tom
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <bo...@confluent.io>
> > > > wrote:
> > > >
> > > > > Thanks Mickael, I think adding the new Exception resolves my concerns.
> > > > >
> > > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <mi...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks Robert and Ryanne for the feedback.
> > > > > >
> > > > > > ReplicaAssignor implementations can throw an exception to indicate an
> > > > > > assignment can't be computed. This is already what the current round
> > > > > > robin assignor does. Unfortunately at the moment, there are no generic
> > > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > > >
> > > > > > So I think it would be nice to introduce a new Exception/Error code to
> > > > > > cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
> > > > > >
> > > > > > I've updated the KIP accordingly, let me know if you have more questions.
> > > > > >
> > > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> > > > > > external
> > > > > > > system (like cruise control or an operator) knows more about the target
> > > > > > > cluster state than the broker does.
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > > mickael.maison@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I've created KIP-660 to make the replica assignment logic pluggable.
> > > > > > > >
> > > > > > > >
> > > > > >
> > > > > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > > >
> > > > > > > > Please take a look and let me know if you have any feedback.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > >
> > > > > >
> > > > >

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Mickael Maison <mi...@gmail.com>.
Hi all,

I've updated the KIP to reflect the latest discussions.

Tom,
2) Updated
4) I decided against switching to a "batch interface" and added the
reasons in the Rejected Alternatives section

Please take a look and let me know if you have any feedback.
Thanks

On Tue, Oct 6, 2020 at 9:43 AM Mickael Maison <mi...@gmail.com> wrote:
>
> Hi Efe,
>
> Thanks for the feedback.
> We also need to assign replicas when adding partitions to an existing
> topic. This is why I choose to use a list of partition ids. Otherwise
> we'd need the number of partitions and the starting partition id.
>
> Let me know if you have more questions
>
> On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer <ag...@linkedin.com.invalid> wrote:
> >
> > Hi Mickael,
> >
> > Thanks for the KIP!
> > A call to an external system, e.g. Cruise Control, in the implementation of the provided interface can indeed help with the initial assignment of partitions.
> >
> > I am curious why the proposed `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition ids as opposed to the number of partitions to create the topic with?
> >
> > Would you clarify if this API is expected to be used (1) only for new topics or (2) also for existing topics?
> >
> > Best,
> > Efe
> > ________________________________
> > From: Mickael Maison <mi...@gmail.com>
> > Sent: Thursday, October 1, 2020 9:43 AM
> > To: dev <de...@kafka.apache.org>
> > Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
> >
> > Thanks Tom for the feedback!
> >
> > 1. If the data returned by the ReplicaAssignor implementation does not
> > match that was requested, we'll also throw a ReplicaAssignorException
> >
> > 2. Good point, I'll update the KIP
> >
> > 3. The KIP mentions an error code associated with
> > ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
> >
> > 4. (I'm naming your last question 4.) I spent some time looking at it.
> > Initially I wanted to follow the model from the topic policies. But as
> > you said, computing assignments for the whole batch may be more
> > desirable and also avoids incrementally updating the cluster state.
> > The logic in AdminManager is very much centered around doing 1 topic
> > at a time but as far as I can tell we should be able to update it to
> > compute assignments for the whole batch.
> >
> > I'll play a bit more with 4. and I'll update the KIP in the next few days
> >
> > On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com> wrote:
> > >
> > > Hi Mickael,
> > >
> > > A few thoughts about the ReplicaAssignor contract:
> > >
> > > 1. What happens if a ReplicaAssignor impl returns a Map where some
> > > assignments don't meet the given replication factor?
> > > 2. Fixing the signature of assignReplicasToBrokers() as you have would make
> > > it hard to pass extra information in the future (e.g. maybe someone comes
> > > up with a use case where passing the clientId would be needed) because it
> > > would require the interface be changed. If you factored all the parameters
> > > into some new type then the signature could be
> > > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any new
> > > properties to RequiredReplicaAssignment wouldn't break the contract.
> > > 3. When an assignor throws RepliacAssignorException what error code will be
> > > returned to the client?
> > >
> > > Also, this sentence got me thinking:
> > >
> > > > If multiple topics are present in the request, AdminManager will update
> > > the Cluster object so the ReplicaAssignor class has access to the up to
> > > date cluster metadata.
> > >
> > > Previously I've looked at how we can improve Kafka's pluggable policy
> > > support to pass the more of the cluster state to policy implementations. A
> > > similar problem exists there, but the more cluster state you pass the
> > > harder it is to incrementally change it as you iterate through the topics
> > > to be created/modified. This likely isn't a problem here and now, but it
> > > could limit any future changes to the pluggable assignors. Did you consider
> > > the alternative of the assignor just being passed a Set of assignments?
> > > That means you can just pass the cluster state as it exists at the time. It
> > > also gives the implementation more information to work with to find more
> > > optimal assignments. For example, it could perform a bin packing type
> > > assignment which found a better optimum for the whole collection of topics
> > > than one which was only told about all the topics in the request
> > > sequentially.
> > >
> > > Otherwise this looks like a valuable feature to me.
> > >
> > > Kind regards,
> > >
> > > Tom
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <bo...@confluent.io>
> > > wrote:
> > >
> > > > Thanks Mickael, I think adding the new Exception resolves my concerns.
> > > >
> > > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <mi...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks Robert and Ryanne for the feedback.
> > > > >
> > > > > ReplicaAssignor implementations can throw an exception to indicate an
> > > > > assignment can't be computed. This is already what the current round
> > > > > robin assignor does. Unfortunately at the moment, there are no generic
> > > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > > >
> > > > > So I think it would be nice to introduce a new Exception/Error code to
> > > > > cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
> > > > >
> > > > > I've updated the KIP accordingly, let me know if you have more questions.
> > > > >
> > > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> > > > > external
> > > > > > system (like cruise control or an operator) knows more about the target
> > > > > > cluster state than the broker does.
> > > > > >
> > > > > > Ryanne
> > > > > >
> > > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > > mickael.maison@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I've created KIP-660 to make the replica assignment logic pluggable.
> > > > > > >
> > > > > > >
> > > > >
> > > > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > > >
> > > > > > > Please take a look and let me know if you have any feedback.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > >
> > > >

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Mickael Maison <mi...@gmail.com>.
Hi Efe,

Thanks for the feedback.
We also need to assign replicas when adding partitions to an existing
topic. This is why I choose to use a list of partition ids. Otherwise
we'd need the number of partitions and the starting partition id.

Let me know if you have more questions

On Tue, Oct 6, 2020 at 2:16 AM Efe Gencer <ag...@linkedin.com.invalid> wrote:
>
> Hi Mickael,
>
> Thanks for the KIP!
> A call to an external system, e.g. Cruise Control, in the implementation of the provided interface can indeed help with the initial assignment of partitions.
>
> I am curious why the proposed `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition ids as opposed to the number of partitions to create the topic with?
>
> Would you clarify if this API is expected to be used (1) only for new topics or (2) also for existing topics?
>
> Best,
> Efe
> ________________________________
> From: Mickael Maison <mi...@gmail.com>
> Sent: Thursday, October 1, 2020 9:43 AM
> To: dev <de...@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor
>
> Thanks Tom for the feedback!
>
> 1. If the data returned by the ReplicaAssignor implementation does not
> match that was requested, we'll also throw a ReplicaAssignorException
>
> 2. Good point, I'll update the KIP
>
> 3. The KIP mentions an error code associated with
> ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED
>
> 4. (I'm naming your last question 4.) I spent some time looking at it.
> Initially I wanted to follow the model from the topic policies. But as
> you said, computing assignments for the whole batch may be more
> desirable and also avoids incrementally updating the cluster state.
> The logic in AdminManager is very much centered around doing 1 topic
> at a time but as far as I can tell we should be able to update it to
> compute assignments for the whole batch.
>
> I'll play a bit more with 4. and I'll update the KIP in the next few days
>
> On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com> wrote:
> >
> > Hi Mickael,
> >
> > A few thoughts about the ReplicaAssignor contract:
> >
> > 1. What happens if a ReplicaAssignor impl returns a Map where some
> > assignments don't meet the given replication factor?
> > 2. Fixing the signature of assignReplicasToBrokers() as you have would make
> > it hard to pass extra information in the future (e.g. maybe someone comes
> > up with a use case where passing the clientId would be needed) because it
> > would require the interface be changed. If you factored all the parameters
> > into some new type then the signature could be
> > assignReplicasToBrokers(RequiredReplicaAssignment) and adding any new
> > properties to RequiredReplicaAssignment wouldn't break the contract.
> > 3. When an assignor throws RepliacAssignorException what error code will be
> > returned to the client?
> >
> > Also, this sentence got me thinking:
> >
> > > If multiple topics are present in the request, AdminManager will update
> > the Cluster object so the ReplicaAssignor class has access to the up to
> > date cluster metadata.
> >
> > Previously I've looked at how we can improve Kafka's pluggable policy
> > support to pass the more of the cluster state to policy implementations. A
> > similar problem exists there, but the more cluster state you pass the
> > harder it is to incrementally change it as you iterate through the topics
> > to be created/modified. This likely isn't a problem here and now, but it
> > could limit any future changes to the pluggable assignors. Did you consider
> > the alternative of the assignor just being passed a Set of assignments?
> > That means you can just pass the cluster state as it exists at the time. It
> > also gives the implementation more information to work with to find more
> > optimal assignments. For example, it could perform a bin packing type
> > assignment which found a better optimum for the whole collection of topics
> > than one which was only told about all the topics in the request
> > sequentially.
> >
> > Otherwise this looks like a valuable feature to me.
> >
> > Kind regards,
> >
> > Tom
> >
> >
> >
> >
> >
> > On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <bo...@confluent.io>
> > wrote:
> >
> > > Thanks Mickael, I think adding the new Exception resolves my concerns.
> > >
> > > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <mi...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Robert and Ryanne for the feedback.
> > > >
> > > > ReplicaAssignor implementations can throw an exception to indicate an
> > > > assignment can't be computed. This is already what the current round
> > > > robin assignor does. Unfortunately at the moment, there are no generic
> > > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > > >
> > > > So I think it would be nice to introduce a new Exception/Error code to
> > > > cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
> > > >
> > > > I've updated the KIP accordingly, let me know if you have more questions.
> > > >
> > > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> > > > external
> > > > > system (like cruise control or an operator) knows more about the target
> > > > > cluster state than the broker does.
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > > mickael.maison@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I've created KIP-660 to make the replica assignment logic pluggable.
> > > > > >
> > > > > >
> > > >
> > > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > > >
> > > > > > Please take a look and let me know if you have any feedback.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > >
> > >

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Efe Gencer <ag...@linkedin.com.INVALID>.
Hi Mickael,

Thanks for the KIP!
A call to an external system, e.g. Cruise Control, in the implementation of the provided interface can indeed help with the initial assignment of partitions.

I am curious why the proposed `ReplicaAssignor#assignReplicasToBrokers` receives a list of partition ids as opposed to the number of partitions to create the topic with?

Would you clarify if this API is expected to be used (1) only for new topics or (2) also for existing topics?

Best,
Efe
________________________________
From: Mickael Maison <mi...@gmail.com>
Sent: Thursday, October 1, 2020 9:43 AM
To: dev <de...@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Thanks Tom for the feedback!

1. If the data returned by the ReplicaAssignor implementation does not
match that was requested, we'll also throw a ReplicaAssignorException

2. Good point, I'll update the KIP

3. The KIP mentions an error code associated with
ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED

4. (I'm naming your last question 4.) I spent some time looking at it.
Initially I wanted to follow the model from the topic policies. But as
you said, computing assignments for the whole batch may be more
desirable and also avoids incrementally updating the cluster state.
The logic in AdminManager is very much centered around doing 1 topic
at a time but as far as I can tell we should be able to update it to
compute assignments for the whole batch.

I'll play a bit more with 4. and I'll update the KIP in the next few days

On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com> wrote:
>
> Hi Mickael,
>
> A few thoughts about the ReplicaAssignor contract:
>
> 1. What happens if a ReplicaAssignor impl returns a Map where some
> assignments don't meet the given replication factor?
> 2. Fixing the signature of assignReplicasToBrokers() as you have would make
> it hard to pass extra information in the future (e.g. maybe someone comes
> up with a use case where passing the clientId would be needed) because it
> would require the interface be changed. If you factored all the parameters
> into some new type then the signature could be
> assignReplicasToBrokers(RequiredReplicaAssignment) and adding any new
> properties to RequiredReplicaAssignment wouldn't break the contract.
> 3. When an assignor throws RepliacAssignorException what error code will be
> returned to the client?
>
> Also, this sentence got me thinking:
>
> > If multiple topics are present in the request, AdminManager will update
> the Cluster object so the ReplicaAssignor class has access to the up to
> date cluster metadata.
>
> Previously I've looked at how we can improve Kafka's pluggable policy
> support to pass the more of the cluster state to policy implementations. A
> similar problem exists there, but the more cluster state you pass the
> harder it is to incrementally change it as you iterate through the topics
> to be created/modified. This likely isn't a problem here and now, but it
> could limit any future changes to the pluggable assignors. Did you consider
> the alternative of the assignor just being passed a Set of assignments?
> That means you can just pass the cluster state as it exists at the time. It
> also gives the implementation more information to work with to find more
> optimal assignments. For example, it could perform a bin packing type
> assignment which found a better optimum for the whole collection of topics
> than one which was only told about all the topics in the request
> sequentially.
>
> Otherwise this looks like a valuable feature to me.
>
> Kind regards,
>
> Tom
>
>
>
>
>
> On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <bo...@confluent.io>
> wrote:
>
> > Thanks Mickael, I think adding the new Exception resolves my concerns.
> >
> > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <mi...@gmail.com>
> > wrote:
> >
> > > Thanks Robert and Ryanne for the feedback.
> > >
> > > ReplicaAssignor implementations can throw an exception to indicate an
> > > assignment can't be computed. This is already what the current round
> > > robin assignor does. Unfortunately at the moment, there are no generic
> > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > >
> > > So I think it would be nice to introduce a new Exception/Error code to
> > > cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
> > >
> > > I've updated the KIP accordingly, let me know if you have more questions.
> > >
> > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com>
> > > wrote:
> > > >
> > > > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> > > external
> > > > system (like cruise control or an operator) knows more about the target
> > > > cluster state than the broker does.
> > > >
> > > > Ryanne
> > > >
> > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > mickael.maison@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I've created KIP-660 to make the replica assignment logic pluggable.
> > > > >
> > > > >
> > >
> > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-660%253A%2BPluggable%2BReplicaAssignor&amp;data=02%7C01%7Cagencer%40linkedin.com%7Ca156bf97031b4100b62d08d866293434%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637371674445085863&amp;sdata=Cz1u3y1M%2BH5dFIx%2BHkQwugN%2FqTH1ugjXaaBhbToCkDM%3D&amp;reserved=0
> > > > >
> > > > > Please take a look and let me know if you have any feedback.
> > > > >
> > > > > Thanks
> > > > >
> > >
> >

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Mickael Maison <mi...@gmail.com>.
Thanks Tom for the feedback!

1. If the data returned by the ReplicaAssignor implementation does not
match that was requested, we'll also throw a ReplicaAssignorException

2. Good point, I'll update the KIP

3. The KIP mentions an error code associated with
ReplicaAssignorException: REPLICA_ASSIGNOR_FAILED

4. (I'm naming your last question 4.) I spent some time looking at it.
Initially I wanted to follow the model from the topic policies. But as
you said, computing assignments for the whole batch may be more
desirable and also avoids incrementally updating the cluster state.
The logic in AdminManager is very much centered around doing 1 topic
at a time but as far as I can tell we should be able to update it to
compute assignments for the whole batch.

I'll play a bit more with 4. and I'll update the KIP in the next few days

On Mon, Sep 21, 2020 at 10:29 AM Tom Bentley <tb...@redhat.com> wrote:
>
> Hi Mickael,
>
> A few thoughts about the ReplicaAssignor contract:
>
> 1. What happens if a ReplicaAssignor impl returns a Map where some
> assignments don't meet the given replication factor?
> 2. Fixing the signature of assignReplicasToBrokers() as you have would make
> it hard to pass extra information in the future (e.g. maybe someone comes
> up with a use case where passing the clientId would be needed) because it
> would require the interface be changed. If you factored all the parameters
> into some new type then the signature could be
> assignReplicasToBrokers(RequiredReplicaAssignment) and adding any new
> properties to RequiredReplicaAssignment wouldn't break the contract.
> 3. When an assignor throws RepliacAssignorException what error code will be
> returned to the client?
>
> Also, this sentence got me thinking:
>
> > If multiple topics are present in the request, AdminManager will update
> the Cluster object so the ReplicaAssignor class has access to the up to
> date cluster metadata.
>
> Previously I've looked at how we can improve Kafka's pluggable policy
> support to pass the more of the cluster state to policy implementations. A
> similar problem exists there, but the more cluster state you pass the
> harder it is to incrementally change it as you iterate through the topics
> to be created/modified. This likely isn't a problem here and now, but it
> could limit any future changes to the pluggable assignors. Did you consider
> the alternative of the assignor just being passed a Set of assignments?
> That means you can just pass the cluster state as it exists at the time. It
> also gives the implementation more information to work with to find more
> optimal assignments. For example, it could perform a bin packing type
> assignment which found a better optimum for the whole collection of topics
> than one which was only told about all the topics in the request
> sequentially.
>
> Otherwise this looks like a valuable feature to me.
>
> Kind regards,
>
> Tom
>
>
>
>
>
> On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <bo...@confluent.io>
> wrote:
>
> > Thanks Mickael, I think adding the new Exception resolves my concerns.
> >
> > On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <mi...@gmail.com>
> > wrote:
> >
> > > Thanks Robert and Ryanne for the feedback.
> > >
> > > ReplicaAssignor implementations can throw an exception to indicate an
> > > assignment can't be computed. This is already what the current round
> > > robin assignor does. Unfortunately at the moment, there are no generic
> > > error codes if it fails, it's either INVALID_PARTITIONS,
> > > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> > >
> > > So I think it would be nice to introduce a new Exception/Error code to
> > > cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
> > >
> > > I've updated the KIP accordingly, let me know if you have more questions.
> > >
> > > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com>
> > > wrote:
> > > >
> > > > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> > > external
> > > > system (like cruise control or an operator) knows more about the target
> > > > cluster state than the broker does.
> > > >
> > > > Ryanne
> > > >
> > > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> > mickael.maison@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I've created KIP-660 to make the replica assignment logic pluggable.
> > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor
> > > > >
> > > > > Please take a look and let me know if you have any feedback.
> > > > >
> > > > > Thanks
> > > > >
> > >
> >

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Tom Bentley <tb...@redhat.com>.
Hi Mickael,

A few thoughts about the ReplicaAssignor contract:

1. What happens if a ReplicaAssignor impl returns a Map where some
assignments don't meet the given replication factor?
2. Fixing the signature of assignReplicasToBrokers() as you have would make
it hard to pass extra information in the future (e.g. maybe someone comes
up with a use case where passing the clientId would be needed) because it
would require the interface be changed. If you factored all the parameters
into some new type then the signature could be
assignReplicasToBrokers(RequiredReplicaAssignment) and adding any new
properties to RequiredReplicaAssignment wouldn't break the contract.
3. When an assignor throws RepliacAssignorException what error code will be
returned to the client?

Also, this sentence got me thinking:

> If multiple topics are present in the request, AdminManager will update
the Cluster object so the ReplicaAssignor class has access to the up to
date cluster metadata.

Previously I've looked at how we can improve Kafka's pluggable policy
support to pass the more of the cluster state to policy implementations. A
similar problem exists there, but the more cluster state you pass the
harder it is to incrementally change it as you iterate through the topics
to be created/modified. This likely isn't a problem here and now, but it
could limit any future changes to the pluggable assignors. Did you consider
the alternative of the assignor just being passed a Set of assignments?
That means you can just pass the cluster state as it exists at the time. It
also gives the implementation more information to work with to find more
optimal assignments. For example, it could perform a bin packing type
assignment which found a better optimum for the whole collection of topics
than one which was only told about all the topics in the request
sequentially.

Otherwise this looks like a valuable feature to me.

Kind regards,

Tom





On Fri, Sep 11, 2020 at 6:19 PM Robert Barrett <bo...@confluent.io>
wrote:

> Thanks Mickael, I think adding the new Exception resolves my concerns.
>
> On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <mi...@gmail.com>
> wrote:
>
> > Thanks Robert and Ryanne for the feedback.
> >
> > ReplicaAssignor implementations can throw an exception to indicate an
> > assignment can't be computed. This is already what the current round
> > robin assignor does. Unfortunately at the moment, there are no generic
> > error codes if it fails, it's either INVALID_PARTITIONS,
> > INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
> >
> > So I think it would be nice to introduce a new Exception/Error code to
> > cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
> >
> > I've updated the KIP accordingly, let me know if you have more questions.
> >
> > On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com>
> > wrote:
> > >
> > > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> > external
> > > system (like cruise control or an operator) knows more about the target
> > > cluster state than the broker does.
> > >
> > > Ryanne
> > >
> > > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <
> mickael.maison@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I've created KIP-660 to make the replica assignment logic pluggable.
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor
> > > >
> > > > Please take a look and let me know if you have any feedback.
> > > >
> > > > Thanks
> > > >
> >
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Robert Barrett <bo...@confluent.io>.
Thanks Mickael, I think adding the new Exception resolves my concerns.

On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison <mi...@gmail.com>
wrote:

> Thanks Robert and Ryanne for the feedback.
>
> ReplicaAssignor implementations can throw an exception to indicate an
> assignment can't be computed. This is already what the current round
> robin assignor does. Unfortunately at the moment, there are no generic
> error codes if it fails, it's either INVALID_PARTITIONS,
> INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.
>
> So I think it would be nice to introduce a new Exception/Error code to
> cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
>
> I've updated the KIP accordingly, let me know if you have more questions.
>
> On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com>
> wrote:
> >
> > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> external
> > system (like cruise control or an operator) knows more about the target
> > cluster state than the broker does.
> >
> > Ryanne
> >
> > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <mi...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I've created KIP-660 to make the replica assignment logic pluggable.
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor
> > >
> > > Please take a look and let me know if you have any feedback.
> > >
> > > Thanks
> > >
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Mickael Maison <mi...@gmail.com>.
Thanks Robert and Ryanne for the feedback.

ReplicaAssignor implementations can throw an exception to indicate an
assignment can't be computed. This is already what the current round
robin assignor does. Unfortunately at the moment, there are no generic
error codes if it fails, it's either INVALID_PARTITIONS,
INVALID_REPLICATION_FACTOR or worse UNKNOWN_SERVER_ERROR.

So I think it would be nice to introduce a new Exception/Error code to
cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.

I've updated the KIP accordingly, let me know if you have more questions.

On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan <ry...@gmail.com> wrote:
>
> Thanks Mickael, the KIP makes sense to me, esp for cases where an external
> system (like cruise control or an operator) knows more about the target
> cluster state than the broker does.
>
> Ryanne
>
> On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <mi...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I've created KIP-660 to make the replica assignment logic pluggable.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor
> >
> > Please take a look and let me know if you have any feedback.
> >
> > Thanks
> >

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Ryanne Dolan <ry...@gmail.com>.
Thanks Mickael, the KIP makes sense to me, esp for cases where an external
system (like cruise control or an operator) knows more about the target
cluster state than the broker does.

Ryanne

On Thu, Aug 20, 2020, 10:46 AM Mickael Maison <mi...@gmail.com>
wrote:

> Hi,
>
> I've created KIP-660 to make the replica assignment logic pluggable.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor
>
> Please take a look and let me know if you have any feedback.
>
> Thanks
>

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

Posted by Robert Barrett <bo...@confluent.io>.
Hi Mickael,

Thanks for the KIP!

One question I have is around failure cases. Are ReplicaAssignor
implementations expected to always compute an assignment, or is it possible
for them to have unsatisfiable conditions? One example I can think of is a
requirement that at least one partition be placed in a specific rack, but
no brokers in that rack are currently online. If it is possible for the
assignor to fail, we should specify what that looks like (exception type,
how it's handled, etc). If not, we should call out that the implementations
are expected to always compute an assignment.

Thanks,
Bob

On Thu, Aug 20, 2020 at 8:46 AM Mickael Maison <mi...@gmail.com>
wrote:

> Hi,
>
> I've created KIP-660 to make the replica assignment logic pluggable.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor
>
> Please take a look and let me know if you have any feedback.
>
> Thanks
>