You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Sophie Blee-Goldman <so...@confluent.io> on 2020/02/08 02:37:59 UTC

[DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Hi all,

In light of some recent and upcoming rebalancing and availability
improvements, it seems we have a need for explicitly triggering a consumer
group rebalance. Therefore I'd like to propose adding a new rejoinGroup()method
to the Consumer client (better method name suggestions are very welcome).

Please take a look at the KIP and let me know what you think!

KIP document:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer

JIRA: https://issues.apache.org/jira/browse/KAFKA-9525

Cheers,
Sophie

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Sophie, I think for KIP-441 the streams client would not need to
rely on the returned boolean flag so I was a bit confused before. Now I
understand it is for other use cases where users do not (or cannot) rely on
the returned assignment to decide if a rebalance is still requested so
checking on the flag is fine.

With that in mind the KIP LGTM.

Guozhang

On Wed, Feb 12, 2020 at 5:56 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Ok, I think I see what you're getting at.
>
> No, we obviously would only want to trigger one additional rebalance after
> the current one completes since the next rebalance would of course have
> the metadata updates we want. But if enforceRebalance returns false a
> second time, we don't know whether it was due to a new rebalance or just
> the original rebalance taking a while. Even if we could, it's possible a
> new
> update came along at some point and it would get quite messy to keep track
> of when we do/don't need to retry.
>
> Except, of course, by checking the resulting assignment to see if it
> reflects
> the latest metadata that we think it should. Which can simply be done in
> the assignor#onAssignment method, since as pointed out in the javadocs
> usage of this API implies a custom assignor implementation.
>
> If we have to check the assignment anyway, I suppose there's no reason to
> return anything. My one outstanding concern is that some apps might not
> actually be able to detect whether the appropriate/latest metadata was used
> based on the resulting assignment. Obviously the assign algorithm is known
> to all members through their own assignor, but the cluster-wide metadata is
> not. I'm not quite convinced that all possible assignments would be as
> straightforward for each member to validate as in, for example, KIP-441
> where
> we just look for the active task.
>
> Perhaps my imagination is running wild here but I'm inclined to still
> return
> whether a new rebalance was triggered or not. It can always be ignored
>
> On Wed, Feb 12, 2020 at 5:18 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > So just to clarify, with the updated API we would keep calling
> > enforceRebalance until it returns true for cases where we rely on it with
> > new subscription metadata?
> >
> > Guozhang
> >
> > On Wed, Feb 12, 2020 at 5:14 PM Sophie Blee-Goldman <sophie@confluent.io
> >
> > wrote:
> >
> > > Thanks Boyang -- makes sense to me. I've optimistically updated the KIP
> > > with this new signature and behavior.
> > >
> > >
> > > On Wed, Feb 12, 2020 at 4:27 PM Boyang Chen <
> reluctanthero104@gmail.com>
> > > wrote:
> > >
> > > > Hey Sophie,
> > > >
> > > > I'm satisfied with making enforceRebalance() not throwing any
> exception
> > > > other than illegal state. You could imagine this KIP is just making
> the
> > > > `rejoinNeededOrPending` external to user requests. Make it as
> > lightweight
> > > > as possible makes sense.
> > > >
> > > > Boyang
> > > >
> > > > On Wed, Feb 12, 2020 at 2:14 PM Sophie Blee-Goldman <
> > sophie@confluent.io
> > > >
> > > > wrote:
> > > >
> > > > > Hey Guozhang, thanks for the thorough reply!
> > > > >
> > > > > I definitely went back and forth on whether to make it a blocking
> > call,
> > > > > and ultimately went with blocking just to leave it open to
> potential
> > > > future
> > > > > use cases (in particular non-Streams apps). But on second (or
> third)
> > > > > thought I think I agree that no use case wouldn't be similarly
> > covered
> > > by
> > > > > just calling poll() immediately after enforceRebalance(). It seems
> > best
> > > > to
> > > > > leave all rebalancing action within the scope of poll alone and
> avoid
> > > > > introducing unnecessary complexity -- happy to revert this then.
> > > > >
> > > > > I think that ends up addressing most of your other concerns,
> although
> > > > > there's one I would push back on: this method should still
> explicitly
> > > > > call out whether a rebalance is already in progress and the call is
> > > thus
> > > > > a no-op. If throwing a RebalanceInProgressException seems too
> > > > > heavy maybe we can just return a boolean indicating whether a new
> > > > > rebalance was triggered or not.
> > > > >
> > > > > The snippet you included does work around this, by checking the
> > > > > condition again in the rebalance listener. But I would argue that
> a)
> > > many
> > > > > applications don't use a rebalance listener, and shouldn't be
> forced
> > to
> > > > > implement it to fully use this new API. More importantly, since you
> > can
> > > > > probably use the assignor's onAssignment method to achieve the same
> > > > > thing, b) it adds unnecessary complexity, and as we've seen in
> > Streams
> > > > > the interactions between the rebalance callbacks and main consumer
> > > > > can already get quite ugly.
> > > > >
> > > > > For simplicity's sake then, I'll propose to just return the bool
> over
> > > the
> > > > > exception and change the signature to
> > > > >
> > > > > /**
> > > > >  * @return Whether a new rebalance was triggered (false if a
> > rebalance
> > > > was
> > > > > already in progress)
> > > > >  * @throws java.lang.IllegalStateException if the consumer does not
> > use
> > > > > group subscription
> > > > >  */
> > > > > boolean enforceRebalance();
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > On Tue, Feb 11, 2020 at 5:29 PM Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello Sophie, thanks for brining up this KIP, and the great
> > write-up
> > > > > > summarizing the motivations of the proposal. Here are some
> > comments:
> > > > > >
> > > > > > Minor:
> > > > > >
> > > > > > 1. If we want to make it a blocking call (I have some thoughts
> > about
> > > > this
> > > > > > below :), to be consistent we need to consider having two
> > overloaded
> > > > > > function, one without the timeout which then relies on `
> > > > > > DEFAULT_API_TIMEOUT_MS_CONFIG`.
> > > > > >
> > > > > > 2. Also I'd suggest that, again for API consistency, we a) throw
> > > > > > TimeoutException if the operation cannot be completed within the
> > > > timeout
> > > > > > value, b) return false immediately if we cannot trigger a
> rebalance
> > > > > either
> > > > > > because coordinator is unknown.
> > > > > >
> > > > > > Meta:
> > > > > >
> > > > > > 3. I'm not sure if we have a concrete scenario that we want to
> wait
> > > > until
> > > > > > the rebalance is completed in KIP-441 / 268, rather than calling
> > > > > > "consumer.enforceRebalance(); consumer.poll()" consecutively and
> > try
> > > to
> > > > > > execute the rebalance in the poll call? If there's no valid
> > > motivations
> > > > > I'm
> > > > > > still a bit inclined to make it non-blocking (i.e. just setting a
> > bit
> > > > and
> > > > > > then execute the process in the later poll call) similar to our
> > > `seek`
> > > > > > functions. By doing this we can also make this function simpler
> as
> > it
> > > > > would
> > > > > > never throw RebalanceInProgress or Timeout or even
> KafkaExceptions.
> > > > > >
> > > > > > 4. Re: the case "when a rebalance is already in progress", this
> may
> > > be
> > > > > > related to 3) above. I think we can simplify this case as well
> but
> > > just
> > > > > not
> > > > > > triggering a new rebalance and let the the caller handle it: for
> > > > example
> > > > > in
> > > > > > KIP-441, in each iteration of the stream thread, we can if a
> > standby
> > > > task
> > > > > > is ready, and if yes we call `enforceRebalance`, if there's
> > already a
> > > > > > rebalance in progress (either with the new subscription metadata,
> > or
> > > > not)
> > > > > > this call would be a no-op, and then in the next iteration we
> would
> > > > just
> > > > > > call that function again, and eventually we would trigger the
> > > rebalance
> > > > > > with the new subscription metadata and previous calls would be
> > no-op
> > > > and
> > > > > > hence no cost anyways. I feel this would be simpler than letting
> > the
> > > > > caller
> > > > > > to capture RebalanceInProgressException:
> > > > > >
> > > > > >
> > > > > > mainProcessingLoop() {
> > > > > >     if (needsRebalance) {
> > > > > >         consumer.enforceRebalance();
> > > > > >     }
> > > > > >
> > > > > >     records = consumer.poll();
> > > > > >     ...
> > > > > >     // do some processing
> > > > > > }
> > > > > >
> > > > > > RebalanceListener {
> > > > > >
> > > > > >    onPartitionsAssigned(...) {
> > > > > >       if (rebalanceGoalAchieved()) {
> > > > > >         needsRebalance = false;
> > > > > >       }
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > >
> > > > > > WDYT?
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <
> > > > sophie@confluent.io
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Boyang,
> > > > > > >
> > > > > > > Originally I had it as a nonblocking call, but decided to
> change
> > it
> > > > to
> > > > > > > blocking
> > > > > > > with a timeout parameter. I'm not sure a future makes sense to
> > > return
> > > > > > here,
> > > > > > > because the rebalance either does or does not complete within
> the
> > > > > > timeout:
> > > > > > > if it does not, you will have to call poll again to complete it
> > (as
> > > > is
> > > > > > the
> > > > > > > case with
> > > > > > > any other rebalance). I'll call this out in the javadocs as
> well.
> > > > > > >
> > > > > > > I also added an example demonstrating how/when to use this new
> > API.
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > > On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <
> > > > > reluctanthero104@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Sophie,
> > > > > > > >
> > > > > > > > is the `enforceRebalance` a blocking call? Could we add a
> code
> > > > sample
> > > > > > to
> > > > > > > > the KIP on how this API should be used?
> > > > > > > >
> > > > > > > > Returning a future instead of a boolean might be easier as we
> > are
> > > > > > > allowing
> > > > > > > > consumer to make progress during rebalance after 429 IMHO.
> > > > > > > >
> > > > > > > > Boyang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > > > > > > > konstantine@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Thanks for the quick turnaround Sophie. My points have been
> > > > > > addressed.
> > > > > > > > > I think the intended use is quite clear now.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Konstantine
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > > > > > > > sophie@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Konstantine,
> > > > > > > > > > Thanks for the feedback! I've updated the sections with
> > your
> > > > > > > > > suggestions. I
> > > > > > > > > > agree
> > > > > > > > > > in particular that it's really important to make sure
> users
> > > > don't
> > > > > > > call
> > > > > > > > > this
> > > > > > > > > > unnecessarily,
> > > > > > > > > >  or for the wrong reasons: to that end I also extended
> the
> > > > > javadocs
> > > > > > > to
> > > > > > > > > > specify that this
> > > > > > > > > > API is for when changes to the subscription userdata
> occur.
> > > > > > Hopefully
> > > > > > > > > that
> > > > > > > > > > should make
> > > > > > > > > > its intended usage quite clear.
> > > > > > > > > >
> > > > > > > > > > Bill,
> > > > > > > > > > The rebalance triggered by this new API will be a
> "normal"
> > > > > > rebalance,
> > > > > > > > and
> > > > > > > > > > therefore
> > > > > > > > > > follow the existing listener semantics. For example a
> > > > cooperative
> > > > > > > > > rebalance
> > > > > > > > > > will always
> > > > > > > > > > call onPartitionsAssigned, even if no partitions are
> > actually
> > > > > > moved.
> > > > > > > > > > An eager rebalance will still revoke all partitions first
> > > > anyway.
> > > > > > > > > >
> > > > > > > > > > Thanks for the feedback!
> > > > > > > > > > Sophie
> > > > > > > > > >
> > > > > > > > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <
> > > bbejeck@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Sophie,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP, makes sense to me.
> > > > > > > > > > >
> > > > > > > > > > > One quick question, I'm not sure if it's relevant or
> not.
> > > > > > > > > > >
> > > > > > > > > > > If a user provides a `ConsumerRebalanceListener` and a
> > > > > rebalance
> > > > > > is
> > > > > > > > > > > triggered from an `enforceRebalance`  call,
> > > > > > > > > > > it seems possible the listener won't get called since
> > > > partition
> > > > > > > > > > assignments
> > > > > > > > > > > might not change.
> > > > > > > > > > > If that is the case, do we want to possibly consider
> > > adding a
> > > > > > > method
> > > > > > > > to
> > > > > > > > > > the
> > > > > > > > > > > `ConsumerRebalanceListener` for callbacks on
> > > > `enforceRebalance`
> > > > > > > > > actions?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Bill
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine
> Karantasis <
> > > > > > > > > > > konstantine@confluent.io> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Sophie.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP. I liked how focused the proposal
> > is.
> > > > > Also,
> > > > > > > its
> > > > > > > > > > > > motivation is clear after carefully reading the KIP
> and
> > > its
> > > > > > > > > references.
> > > > > > > > > > > >
> > > > > > > > > > > > Yet, I think it'd be a good idea to call out
> explicitly
> > > on
> > > > > the
> > > > > > > > > Rejected
> > > > > > > > > > > > Alternatives section that an automatic and periodic
> > > > > triggering
> > > > > > of
> > > > > > > > > > > > rebalances that would not require exposing this
> > > capability
> > > > > > > through
> > > > > > > > > the
> > > > > > > > > > > > Consumer interface does not cover your specific use
> > cases
> > > > and
> > > > > > > > > therefore
> > > > > > > > > > > is
> > > > > > > > > > > > not chosen as a desired approach. Maybe, even
> consider
> > > > > > mentioning
> > > > > > > > > again
> > > > > > > > > > > > here that this method is expected to be used to
> respond
> > > to
> > > > > > system
> > > > > > > > > > changes
> > > > > > > > > > > > external to the consumer and its membership logic and
> > is
> > > > not
> > > > > > > > proposed
> > > > > > > > > > as
> > > > > > > > > > > a
> > > > > > > > > > > > way to resolve temporary imbalances due to membership
> > > > changes
> > > > > > > that
> > > > > > > > > > should
> > > > > > > > > > > > inherently be resolved by the assignor logic itself
> > with
> > > > one
> > > > > or
> > > > > > > > more
> > > > > > > > > > > > consecutive rebalances.
> > > > > > > > > > > >
> > > > > > > > > > > > Also, in your javadoc I'd add some context similar to
> > > what
> > > > > > > someone
> > > > > > > > > can
> > > > > > > > > > > read
> > > > > > > > > > > > on the KIP. Specifically where you say: "for example
> if
> > > > some
> > > > > > > > > condition
> > > > > > > > > > > has
> > > > > > > > > > > > changed that has implications for the partition
> > > > assignment."
> > > > > > I'd
> > > > > > > > > rather
> > > > > > > > > > > add
> > > > > > > > > > > > something like "for example, if some condition
> external
> > > and
> > > > > > > > invisible
> > > > > > > > > > to
> > > > > > > > > > > > the Consumer and its group membership has changed in
> > ways
> > > > > that
> > > > > > > > would
> > > > > > > > > > > > justify a new partition assignment". That's just an
> > > > example,
> > > > > > feel
> > > > > > > > > free
> > > > > > > > > > to
> > > > > > > > > > > > reword, but I believe that saying explicitly that
> this
> > > > > > condition
> > > > > > > is
> > > > > > > > > not
> > > > > > > > > > > > visible to the consumer is useful to understand that
> > this
> > > > is
> > > > > > not
> > > > > > > > > > > necessary
> > > > > > > > > > > > under normal circumstances.
> > > > > > > > > > > >
> > > > > > > > > > > > In Compatibility, Deprecation, and Migration Plan
> > > section I
> > > > > > think
> > > > > > > > > it's
> > > > > > > > > > > > worth mentioning that this is a new feature that
> > affects
> > > > new
> > > > > > > > > > > > implementations of the Consumer interface and any
> such
> > > new
> > > > > > > > > > implementation
> > > > > > > > > > > > should override the new method. Implementations that
> > wish
> > > > to
> > > > > > > > upgrade
> > > > > > > > > > to a
> > > > > > > > > > > > newer version should be extended and recompiled,
> since
> > no
> > > > > > default
> > > > > > > > > > > > implementation will be provided.
> > > > > > > > > > > >
> > > > > > > > > > > > Naming is hard here, if someone wants to emphasize
> the
> > ad
> > > > hoc
> > > > > > and
> > > > > > > > > > > irregular
> > > > > > > > > > > > nature of this call. After some thought I'm fine with
> > > > > > > > > > 'enforceRebalance'
> > > > > > > > > > > > even if it could potentially be confused to a method
> > that
> > > > is
> > > > > > > > supposed
> > > > > > > > > > to
> > > > > > > > > > > be
> > > > > > > > > > > > called to remediate one or more previously
> unsuccessful
> > > > > > > rebalances
> > > > > > > > > > (which
> > > > > > > > > > > > is partly what StreamThread#enforceRebalance is used
> > > for).
> > > > > The
> > > > > > > > best I
> > > > > > > > > > > could
> > > > > > > > > > > > think of was 'onRequestRebalance' but that's not
> > perfect
> > > > > > either.
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Konstantine
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > > > > > > > sophie@confluent.io
> > > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks John. I took out the KafkaConsumer method
> and
> > > > moved
> > > > > > the
> > > > > > > > > > javadocs
> > > > > > > > > > > > > to the Consumer#enforceRebalance in the KIP -- hope
> > > > you're
> > > > > > > happy
> > > > > > > > :P
> > > > > > > > > > > > >
> > > > > > > > > > > > > Also, I wanted to point out one minor change to the
> > > > current
> > > > > > > > > proposal:
> > > > > > > > > > > > make
> > > > > > > > > > > > > this
> > > > > > > > > > > > > a blocking call, which accepts a timeout and
> returns
> > > > > whether
> > > > > > > the
> > > > > > > > > > > > rebalance
> > > > > > > > > > > > > completed within the timeout. It will still reduce
> > to a
> > > > > > > > nonblocking
> > > > > > > > > > > call
> > > > > > > > > > > > if
> > > > > > > > > > > > > a "zero"
> > > > > > > > > > > > > timeout is supplied. I've updated the KIP
> > accordingly.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Let me know if there are any further concerns, else
> > > I'll
> > > > > call
> > > > > > > > for a
> > > > > > > > > > > vote.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers!
> > > > > > > > > > > > > Sophie
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > > > > > > > vvcephei@apache.org
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks Sophie,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Sorry I didn't respond. I think your new method
> > name
> > > > > sounds
> > > > > > > > good.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regarding the interface vs implementation, I
> agree
> > > it's
> > > > > > > > > confusing.
> > > > > > > > > > > It's
> > > > > > > > > > > > > > always bothered me that the interface redirects
> you
> > > to
> > > > an
> > > > > > > > > > > > implementation
> > > > > > > > > > > > > > JavaDocs, but never enough for me to stop what
> I'm
> > > > doing
> > > > > to
> > > > > > > fix
> > > > > > > > > it.
> > > > > > > > > > > > > > It's not a big deal either way, I just thought it
> > was
> > > > > > strange
> > > > > > > > to
> > > > > > > > > > > > propose
> > > > > > > > > > > > > a
> > > > > > > > > > > > > > "public interface" change, but not in terms of
> the
> > > > actual
> > > > > > > > > interface
> > > > > > > > > > > > > class.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > It _is_ true that KafkaConsumer is also part of
> the
> > > > > public
> > > > > > > API,
> > > > > > > > > but
> > > > > > > > > > > > only
> > > > > > > > > > > > > > really
> > > > > > > > > > > > > > for the constructor. Any proposal to define a new
> > > > > "consumer
> > > > > > > > > client"
> > > > > > > > > > > API
> > > > > > > > > > > > > > should be on the Consumer interface (which you
> said
> > > you
> > > > > > plan
> > > > > > > to
> > > > > > > > > do
> > > > > > > > > > > > > anyway).
> > > > > > > > > > > > > > I guess I brought it up because proposing an
> > addition
> > > > to
> > > > > > > > Consumer
> > > > > > > > > > > > implies
> > > > > > > > > > > > > > it would be added to KafkaConsumer, but proposing
> > an
> > > > > > addition
> > > > > > > > to
> > > > > > > > > > > > > > KafkaConsumer does not necessarily imply it would
> > > also
> > > > be
> > > > > > > added
> > > > > > > > > to
> > > > > > > > > > > > > > Consumer. Does that make sense?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > -John
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie
> Blee-Goldman
> > > > > wrote:
> > > > > > > > > > > > > > > Since this doesn't seem too controversial, I'll
> > > > > probably
> > > > > > > call
> > > > > > > > > > for a
> > > > > > > > > > > > > vote
> > > > > > > > > > > > > > by
> > > > > > > > > > > > > > > end of day.
> > > > > > > > > > > > > > > If there any further
> comments/questions/concerns,
> > > > > please
> > > > > > > let
> > > > > > > > me
> > > > > > > > > > > know!
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Sophie
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie
> > > Blee-Goldman <
> > > > > > > > > > > > > sophie@confluent.io
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the feedback! That's a good point
> > > about
> > > > > > trying
> > > > > > > > to
> > > > > > > > > > > > prevent
> > > > > > > > > > > > > > users
> > > > > > > > > > > > > > > > from
> > > > > > > > > > > > > > > > thinking they should use this API during
> normal
> > > > > > > processing
> > > > > > > > > and
> > > > > > > > > > > > > > clarifying
> > > > > > > > > > > > > > > > when/why
> > > > > > > > > > > > > > > > you might need it -- regardless of the method
> > > name,
> > > > > we
> > > > > > > > should
> > > > > > > > > > > > > > explicitly
> > > > > > > > > > > > > > > > call this out
> > > > > > > > > > > > > > > > in the javadocs.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > As for the method name, on reflection I agree
> > > that
> > > > > > > > > > "rejoinGroup"
> > > > > > > > > > > > does
> > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > seem to be
> > > > > > > > > > > > > > > > appropriate. Of course that's what the
> consumer
> > > > will
> > > > > > > > actually
> > > > > > > > > > be
> > > > > > > > > > > > > doing,
> > > > > > > > > > > > > > > > but that's just an
> > > > > > > > > > > > > > > > implementation detail -- the name should
> > reflect
> > > > what
> > > > > > the
> > > > > > > > API
> > > > > > > > > > is
> > > > > > > > > > > > > doing,
> > > > > > > > > > > > > > > > not how it does it
> > > > > > > > > > > > > > > > (which can always change).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > How about "enforceRebalance"? This is stolen
> > from
> > > > the
> > > > > > > > > > > StreamThread
> > > > > > > > > > > > > > method
> > > > > > > > > > > > > > > > that does
> > > > > > > > > > > > > > > > exactly this (by unsubscribing) so it seems
> to
> > > fit.
> > > > > > I'll
> > > > > > > > > update
> > > > > > > > > > > the
> > > > > > > > > > > > > KIP
> > > > > > > > > > > > > > > > with this unless anyone
> > > > > > > > > > > > > > > > has another suggestion.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Regarding the Consumer vs KafkaConsumer
> > matter, I
> > > > > > > included
> > > > > > > > > the
> > > > > > > > > > > > > > > > KafkaConsumer method
> > > > > > > > > > > > > > > > because that's where all the javadocs
> redirect
> > to
> > > > in
> > > > > > the
> > > > > > > > > > Consumer
> > > > > > > > > > > > > > > > interface. Also, FWIW
> > > > > > > > > > > > > > > > I'm pretty sure KafkaConsumer is also part of
> > the
> > > > > > public
> > > > > > > > API
> > > > > > > > > --
> > > > > > > > > > > we
> > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > be adding a new
> > > > > > > > > > > > > > > > method to both.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > > > > > > > vvcephei@apache.org
> > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >> Hi all,
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I
> > had
> > > > > some
> > > > > > > > > > > alternatives
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > > >> mind, which
> > > > > > > > > > > > > > > >> I won't even bother to relate because I feel
> > > like
> > > > > the
> > > > > > > > > > motivation
> > > > > > > > > > > > > made
> > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > >> compelling
> > > > > > > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> One very minor point you might as well fix
> is
> > > that
> > > > > the
> > > > > > > API
> > > > > > > > > > > change
> > > > > > > > > > > > is
> > > > > > > > > > > > > > > >> targeted at
> > > > > > > > > > > > > > > >> KafkaConsumer (the implementation), but
> should
> > > be
> > > > > > > targeted
> > > > > > > > > at
> > > > > > > > > > > > > > > >> Consumer (the interface).
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> I agree with your discomfort about the name.
> > > > Adding
> > > > > a
> > > > > > > > > "rejoin"
> > > > > > > > > > > > > method
> > > > > > > > > > > > > > > >> seems strange
> > > > > > > > > > > > > > > >> since there's no "join" method. Instead the
> > way
> > > > you
> > > > > > join
> > > > > > > > the
> > > > > > > > > > > group
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > >> first time is just
> > > > > > > > > > > > > > > >> by calling "subscribe". But "resubscribe"
> > seems
> > > > too
> > > > > > > > indirect
> > > > > > > > > > > from
> > > > > > > > > > > > > what
> > > > > > > > > > > > > > > >> we're really trying
> > > > > > > > > > > > > > > >> to do, which is to trigger a rebalance by
> > > sending
> > > > a
> > > > > > new
> > > > > > > > > > > JoinGroup
> > > > > > > > > > > > > > request.
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> Another angle is that we don't want the
> method
> > > to
> > > > > > sound
> > > > > > > > like
> > > > > > > > > > > > > something
> > > > > > > > > > > > > > > >> you should
> > > > > > > > > > > > > > > >> be calling in normal circumstances, or
> people
> > > will
> > > > > be
> > > > > > > > > > "tricked"
> > > > > > > > > > > > into
> > > > > > > > > > > > > > > >> calling it unnecessarily.
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> So, I think "rejoinGroup" is fine, although
> a
> > > > person
> > > > > > > > _might_
> > > > > > > > > > be
> > > > > > > > > > > > > > forgiven
> > > > > > > > > > > > > > > >> for thinking they
> > > > > > > > > > > > > > > >> need to call it periodically or something.
> Did
> > > you
> > > > > > > > consider
> > > > > > > > > > > > > > > >> "triggerRebalance", which
> > > > > > > > > > > > > > > >> sounds pretty advanced-ish, and accurately
> > > > describes
> > > > > > > what
> > > > > > > > > > > happens
> > > > > > > > > > > > > when
> > > > > > > > > > > > > > > >> you call it?
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> All in all, the KIP sounds good to me, and
> I'm
> > > in
> > > > > > favor.
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > > >> -John
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald
> > > > wrote:
> > > > > > > > > > > > > > > >> > This situation was discussed at length
> > after a
> > > > > > recent
> > > > > > > > > talk I
> > > > > > > > > > > > gave.
> > > > > > > > > > > > > > This
> > > > > > > > > > > > > > > >> KIP
> > > > > > > > > > > > > > > >> > would be a great step towards increased
> > > > > availability
> > > > > > > and
> > > > > > > > > in
> > > > > > > > > > > > > > facilitating
> > > > > > > > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > anna
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie
> > > > Blee-Goldman <
> > > > > > > > > > > > > > sophie@confluent.io>
> > > > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > > Hi all,
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > In light of some recent and upcoming
> > > > rebalancing
> > > > > > and
> > > > > > > > > > > > > availability
> > > > > > > > > > > > > > > >> > > improvements, it seems we have a need
> for
> > > > > > explicitly
> > > > > > > > > > > > triggering
> > > > > > > > > > > > > a
> > > > > > > > > > > > > > > >> consumer
> > > > > > > > > > > > > > > >> > > group rebalance. Therefore I'd like to
> > > propose
> > > > > > > adding
> > > > > > > > a
> > > > > > > > > > new
> > > > > > > > > > > > > > > >> > > rejoinGroup()method
> > > > > > > > > > > > > > > >> > > to the Consumer client (better method
> name
> > > > > > > suggestions
> > > > > > > > > are
> > > > > > > > > > > > very
> > > > > > > > > > > > > > > >> welcome).
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > Please take a look at the KIP and let me
> > > know
> > > > > what
> > > > > > > you
> > > > > > > > > > > think!
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > KIP document:
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > JIRA:
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > Cheers,
> > > > > > > > > > > > > > > >> > > Sophie
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Ok, I think I see what you're getting at.

No, we obviously would only want to trigger one additional rebalance after
the current one completes since the next rebalance would of course have
the metadata updates we want. But if enforceRebalance returns false a
second time, we don't know whether it was due to a new rebalance or just
the original rebalance taking a while. Even if we could, it's possible a new
update came along at some point and it would get quite messy to keep track
of when we do/don't need to retry.

Except, of course, by checking the resulting assignment to see if it
reflects
the latest metadata that we think it should. Which can simply be done in
the assignor#onAssignment method, since as pointed out in the javadocs
usage of this API implies a custom assignor implementation.

If we have to check the assignment anyway, I suppose there's no reason to
return anything. My one outstanding concern is that some apps might not
actually be able to detect whether the appropriate/latest metadata was used
based on the resulting assignment. Obviously the assign algorithm is known
to all members through their own assignor, but the cluster-wide metadata is
not. I'm not quite convinced that all possible assignments would be as
straightforward for each member to validate as in, for example, KIP-441
where
we just look for the active task.

Perhaps my imagination is running wild here but I'm inclined to still return
whether a new rebalance was triggered or not. It can always be ignored

On Wed, Feb 12, 2020 at 5:18 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Sophie,
>
> So just to clarify, with the updated API we would keep calling
> enforceRebalance until it returns true for cases where we rely on it with
> new subscription metadata?
>
> Guozhang
>
> On Wed, Feb 12, 2020 at 5:14 PM Sophie Blee-Goldman <so...@confluent.io>
> wrote:
>
> > Thanks Boyang -- makes sense to me. I've optimistically updated the KIP
> > with this new signature and behavior.
> >
> >
> > On Wed, Feb 12, 2020 at 4:27 PM Boyang Chen <re...@gmail.com>
> > wrote:
> >
> > > Hey Sophie,
> > >
> > > I'm satisfied with making enforceRebalance() not throwing any exception
> > > other than illegal state. You could imagine this KIP is just making the
> > > `rejoinNeededOrPending` external to user requests. Make it as
> lightweight
> > > as possible makes sense.
> > >
> > > Boyang
> > >
> > > On Wed, Feb 12, 2020 at 2:14 PM Sophie Blee-Goldman <
> sophie@confluent.io
> > >
> > > wrote:
> > >
> > > > Hey Guozhang, thanks for the thorough reply!
> > > >
> > > > I definitely went back and forth on whether to make it a blocking
> call,
> > > > and ultimately went with blocking just to leave it open to potential
> > > future
> > > > use cases (in particular non-Streams apps). But on second (or third)
> > > > thought I think I agree that no use case wouldn't be similarly
> covered
> > by
> > > > just calling poll() immediately after enforceRebalance(). It seems
> best
> > > to
> > > > leave all rebalancing action within the scope of poll alone and avoid
> > > > introducing unnecessary complexity -- happy to revert this then.
> > > >
> > > > I think that ends up addressing most of your other concerns, although
> > > > there's one I would push back on: this method should still explicitly
> > > > call out whether a rebalance is already in progress and the call is
> > thus
> > > > a no-op. If throwing a RebalanceInProgressException seems too
> > > > heavy maybe we can just return a boolean indicating whether a new
> > > > rebalance was triggered or not.
> > > >
> > > > The snippet you included does work around this, by checking the
> > > > condition again in the rebalance listener. But I would argue that a)
> > many
> > > > applications don't use a rebalance listener, and shouldn't be forced
> to
> > > > implement it to fully use this new API. More importantly, since you
> can
> > > > probably use the assignor's onAssignment method to achieve the same
> > > > thing, b) it adds unnecessary complexity, and as we've seen in
> Streams
> > > > the interactions between the rebalance callbacks and main consumer
> > > > can already get quite ugly.
> > > >
> > > > For simplicity's sake then, I'll propose to just return the bool over
> > the
> > > > exception and change the signature to
> > > >
> > > > /**
> > > >  * @return Whether a new rebalance was triggered (false if a
> rebalance
> > > was
> > > > already in progress)
> > > >  * @throws java.lang.IllegalStateException if the consumer does not
> use
> > > > group subscription
> > > >  */
> > > > boolean enforceRebalance();
> > > >
> > > > Thoughts?
> > > >
> > > > On Tue, Feb 11, 2020 at 5:29 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Sophie, thanks for brining up this KIP, and the great
> write-up
> > > > > summarizing the motivations of the proposal. Here are some
> comments:
> > > > >
> > > > > Minor:
> > > > >
> > > > > 1. If we want to make it a blocking call (I have some thoughts
> about
> > > this
> > > > > below :), to be consistent we need to consider having two
> overloaded
> > > > > function, one without the timeout which then relies on `
> > > > > DEFAULT_API_TIMEOUT_MS_CONFIG`.
> > > > >
> > > > > 2. Also I'd suggest that, again for API consistency, we a) throw
> > > > > TimeoutException if the operation cannot be completed within the
> > > timeout
> > > > > value, b) return false immediately if we cannot trigger a rebalance
> > > > either
> > > > > because coordinator is unknown.
> > > > >
> > > > > Meta:
> > > > >
> > > > > 3. I'm not sure if we have a concrete scenario that we want to wait
> > > until
> > > > > the rebalance is completed in KIP-441 / 268, rather than calling
> > > > > "consumer.enforceRebalance(); consumer.poll()" consecutively and
> try
> > to
> > > > > execute the rebalance in the poll call? If there's no valid
> > motivations
> > > > I'm
> > > > > still a bit inclined to make it non-blocking (i.e. just setting a
> bit
> > > and
> > > > > then execute the process in the later poll call) similar to our
> > `seek`
> > > > > functions. By doing this we can also make this function simpler as
> it
> > > > would
> > > > > never throw RebalanceInProgress or Timeout or even KafkaExceptions.
> > > > >
> > > > > 4. Re: the case "when a rebalance is already in progress", this may
> > be
> > > > > related to 3) above. I think we can simplify this case as well but
> > just
> > > > not
> > > > > triggering a new rebalance and let the the caller handle it: for
> > > example
> > > > in
> > > > > KIP-441, in each iteration of the stream thread, we can if a
> standby
> > > task
> > > > > is ready, and if yes we call `enforceRebalance`, if there's
> already a
> > > > > rebalance in progress (either with the new subscription metadata,
> or
> > > not)
> > > > > this call would be a no-op, and then in the next iteration we would
> > > just
> > > > > call that function again, and eventually we would trigger the
> > rebalance
> > > > > with the new subscription metadata and previous calls would be
> no-op
> > > and
> > > > > hence no cost anyways. I feel this would be simpler than letting
> the
> > > > caller
> > > > > to capture RebalanceInProgressException:
> > > > >
> > > > >
> > > > > mainProcessingLoop() {
> > > > >     if (needsRebalance) {
> > > > >         consumer.enforceRebalance();
> > > > >     }
> > > > >
> > > > >     records = consumer.poll();
> > > > >     ...
> > > > >     // do some processing
> > > > > }
> > > > >
> > > > > RebalanceListener {
> > > > >
> > > > >    onPartitionsAssigned(...) {
> > > > >       if (rebalanceGoalAchieved()) {
> > > > >         needsRebalance = false;
> > > > >       }
> > > > >     }
> > > > > }
> > > > >
> > > > >
> > > > > WDYT?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <
> > > sophie@confluent.io
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hey Boyang,
> > > > > >
> > > > > > Originally I had it as a nonblocking call, but decided to change
> it
> > > to
> > > > > > blocking
> > > > > > with a timeout parameter. I'm not sure a future makes sense to
> > return
> > > > > here,
> > > > > > because the rebalance either does or does not complete within the
> > > > > timeout:
> > > > > > if it does not, you will have to call poll again to complete it
> (as
> > > is
> > > > > the
> > > > > > case with
> > > > > > any other rebalance). I'll call this out in the javadocs as well.
> > > > > >
> > > > > > I also added an example demonstrating how/when to use this new
> API.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <
> > > > reluctanthero104@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Sophie,
> > > > > > >
> > > > > > > is the `enforceRebalance` a blocking call? Could we add a code
> > > sample
> > > > > to
> > > > > > > the KIP on how this API should be used?
> > > > > > >
> > > > > > > Returning a future instead of a boolean might be easier as we
> are
> > > > > > allowing
> > > > > > > consumer to make progress during rebalance after 429 IMHO.
> > > > > > >
> > > > > > > Boyang
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > > > > > > konstantine@confluent.io> wrote:
> > > > > > >
> > > > > > > > Thanks for the quick turnaround Sophie. My points have been
> > > > > addressed.
> > > > > > > > I think the intended use is quite clear now.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Konstantine
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > > > > > > sophie@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Konstantine,
> > > > > > > > > Thanks for the feedback! I've updated the sections with
> your
> > > > > > > > suggestions. I
> > > > > > > > > agree
> > > > > > > > > in particular that it's really important to make sure users
> > > don't
> > > > > > call
> > > > > > > > this
> > > > > > > > > unnecessarily,
> > > > > > > > >  or for the wrong reasons: to that end I also extended the
> > > > javadocs
> > > > > > to
> > > > > > > > > specify that this
> > > > > > > > > API is for when changes to the subscription userdata occur.
> > > > > Hopefully
> > > > > > > > that
> > > > > > > > > should make
> > > > > > > > > its intended usage quite clear.
> > > > > > > > >
> > > > > > > > > Bill,
> > > > > > > > > The rebalance triggered by this new API will be a "normal"
> > > > > rebalance,
> > > > > > > and
> > > > > > > > > therefore
> > > > > > > > > follow the existing listener semantics. For example a
> > > cooperative
> > > > > > > > rebalance
> > > > > > > > > will always
> > > > > > > > > call onPartitionsAssigned, even if no partitions are
> actually
> > > > > moved.
> > > > > > > > > An eager rebalance will still revoke all partitions first
> > > anyway.
> > > > > > > > >
> > > > > > > > > Thanks for the feedback!
> > > > > > > > > Sophie
> > > > > > > > >
> > > > > > > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <
> > bbejeck@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Sophie,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP, makes sense to me.
> > > > > > > > > >
> > > > > > > > > > One quick question, I'm not sure if it's relevant or not.
> > > > > > > > > >
> > > > > > > > > > If a user provides a `ConsumerRebalanceListener` and a
> > > > rebalance
> > > > > is
> > > > > > > > > > triggered from an `enforceRebalance`  call,
> > > > > > > > > > it seems possible the listener won't get called since
> > > partition
> > > > > > > > > assignments
> > > > > > > > > > might not change.
> > > > > > > > > > If that is the case, do we want to possibly consider
> > adding a
> > > > > > method
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > `ConsumerRebalanceListener` for callbacks on
> > > `enforceRebalance`
> > > > > > > > actions?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Bill
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > > > > > > konstantine@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Sophie.
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP. I liked how focused the proposal
> is.
> > > > Also,
> > > > > > its
> > > > > > > > > > > motivation is clear after carefully reading the KIP and
> > its
> > > > > > > > references.
> > > > > > > > > > >
> > > > > > > > > > > Yet, I think it'd be a good idea to call out explicitly
> > on
> > > > the
> > > > > > > > Rejected
> > > > > > > > > > > Alternatives section that an automatic and periodic
> > > > triggering
> > > > > of
> > > > > > > > > > > rebalances that would not require exposing this
> > capability
> > > > > > through
> > > > > > > > the
> > > > > > > > > > > Consumer interface does not cover your specific use
> cases
> > > and
> > > > > > > > therefore
> > > > > > > > > > is
> > > > > > > > > > > not chosen as a desired approach. Maybe, even consider
> > > > > mentioning
> > > > > > > > again
> > > > > > > > > > > here that this method is expected to be used to respond
> > to
> > > > > system
> > > > > > > > > changes
> > > > > > > > > > > external to the consumer and its membership logic and
> is
> > > not
> > > > > > > proposed
> > > > > > > > > as
> > > > > > > > > > a
> > > > > > > > > > > way to resolve temporary imbalances due to membership
> > > changes
> > > > > > that
> > > > > > > > > should
> > > > > > > > > > > inherently be resolved by the assignor logic itself
> with
> > > one
> > > > or
> > > > > > > more
> > > > > > > > > > > consecutive rebalances.
> > > > > > > > > > >
> > > > > > > > > > > Also, in your javadoc I'd add some context similar to
> > what
> > > > > > someone
> > > > > > > > can
> > > > > > > > > > read
> > > > > > > > > > > on the KIP. Specifically where you say: "for example if
> > > some
> > > > > > > > condition
> > > > > > > > > > has
> > > > > > > > > > > changed that has implications for the partition
> > > assignment."
> > > > > I'd
> > > > > > > > rather
> > > > > > > > > > add
> > > > > > > > > > > something like "for example, if some condition external
> > and
> > > > > > > invisible
> > > > > > > > > to
> > > > > > > > > > > the Consumer and its group membership has changed in
> ways
> > > > that
> > > > > > > would
> > > > > > > > > > > justify a new partition assignment". That's just an
> > > example,
> > > > > feel
> > > > > > > > free
> > > > > > > > > to
> > > > > > > > > > > reword, but I believe that saying explicitly that this
> > > > > condition
> > > > > > is
> > > > > > > > not
> > > > > > > > > > > visible to the consumer is useful to understand that
> this
> > > is
> > > > > not
> > > > > > > > > > necessary
> > > > > > > > > > > under normal circumstances.
> > > > > > > > > > >
> > > > > > > > > > > In Compatibility, Deprecation, and Migration Plan
> > section I
> > > > > think
> > > > > > > > it's
> > > > > > > > > > > worth mentioning that this is a new feature that
> affects
> > > new
> > > > > > > > > > > implementations of the Consumer interface and any such
> > new
> > > > > > > > > implementation
> > > > > > > > > > > should override the new method. Implementations that
> wish
> > > to
> > > > > > > upgrade
> > > > > > > > > to a
> > > > > > > > > > > newer version should be extended and recompiled, since
> no
> > > > > default
> > > > > > > > > > > implementation will be provided.
> > > > > > > > > > >
> > > > > > > > > > > Naming is hard here, if someone wants to emphasize the
> ad
> > > hoc
> > > > > and
> > > > > > > > > > irregular
> > > > > > > > > > > nature of this call. After some thought I'm fine with
> > > > > > > > > 'enforceRebalance'
> > > > > > > > > > > even if it could potentially be confused to a method
> that
> > > is
> > > > > > > supposed
> > > > > > > > > to
> > > > > > > > > > be
> > > > > > > > > > > called to remediate one or more previously unsuccessful
> > > > > > rebalances
> > > > > > > > > (which
> > > > > > > > > > > is partly what StreamThread#enforceRebalance is used
> > for).
> > > > The
> > > > > > > best I
> > > > > > > > > > could
> > > > > > > > > > > think of was 'onRequestRebalance' but that's not
> perfect
> > > > > either.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Konstantine
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > > > > > > sophie@confluent.io
> > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks John. I took out the KafkaConsumer method and
> > > moved
> > > > > the
> > > > > > > > > javadocs
> > > > > > > > > > > > to the Consumer#enforceRebalance in the KIP -- hope
> > > you're
> > > > > > happy
> > > > > > > :P
> > > > > > > > > > > >
> > > > > > > > > > > > Also, I wanted to point out one minor change to the
> > > current
> > > > > > > > proposal:
> > > > > > > > > > > make
> > > > > > > > > > > > this
> > > > > > > > > > > > a blocking call, which accepts a timeout and returns
> > > > whether
> > > > > > the
> > > > > > > > > > > rebalance
> > > > > > > > > > > > completed within the timeout. It will still reduce
> to a
> > > > > > > nonblocking
> > > > > > > > > > call
> > > > > > > > > > > if
> > > > > > > > > > > > a "zero"
> > > > > > > > > > > > timeout is supplied. I've updated the KIP
> accordingly.
> > > > > > > > > > > >
> > > > > > > > > > > > Let me know if there are any further concerns, else
> > I'll
> > > > call
> > > > > > > for a
> > > > > > > > > > vote.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers!
> > > > > > > > > > > > Sophie
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > > > > > > vvcephei@apache.org
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks Sophie,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Sorry I didn't respond. I think your new method
> name
> > > > sounds
> > > > > > > good.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regarding the interface vs implementation, I agree
> > it's
> > > > > > > > confusing.
> > > > > > > > > > It's
> > > > > > > > > > > > > always bothered me that the interface redirects you
> > to
> > > an
> > > > > > > > > > > implementation
> > > > > > > > > > > > > JavaDocs, but never enough for me to stop what I'm
> > > doing
> > > > to
> > > > > > fix
> > > > > > > > it.
> > > > > > > > > > > > > It's not a big deal either way, I just thought it
> was
> > > > > strange
> > > > > > > to
> > > > > > > > > > > propose
> > > > > > > > > > > > a
> > > > > > > > > > > > > "public interface" change, but not in terms of the
> > > actual
> > > > > > > > interface
> > > > > > > > > > > > class.
> > > > > > > > > > > > >
> > > > > > > > > > > > > It _is_ true that KafkaConsumer is also part of the
> > > > public
> > > > > > API,
> > > > > > > > but
> > > > > > > > > > > only
> > > > > > > > > > > > > really
> > > > > > > > > > > > > for the constructor. Any proposal to define a new
> > > > "consumer
> > > > > > > > client"
> > > > > > > > > > API
> > > > > > > > > > > > > should be on the Consumer interface (which you said
> > you
> > > > > plan
> > > > > > to
> > > > > > > > do
> > > > > > > > > > > > anyway).
> > > > > > > > > > > > > I guess I brought it up because proposing an
> addition
> > > to
> > > > > > > Consumer
> > > > > > > > > > > implies
> > > > > > > > > > > > > it would be added to KafkaConsumer, but proposing
> an
> > > > > addition
> > > > > > > to
> > > > > > > > > > > > > KafkaConsumer does not necessarily imply it would
> > also
> > > be
> > > > > > added
> > > > > > > > to
> > > > > > > > > > > > > Consumer. Does that make sense?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman
> > > > wrote:
> > > > > > > > > > > > > > Since this doesn't seem too controversial, I'll
> > > > probably
> > > > > > call
> > > > > > > > > for a
> > > > > > > > > > > > vote
> > > > > > > > > > > > > by
> > > > > > > > > > > > > > end of day.
> > > > > > > > > > > > > > If there any further comments/questions/concerns,
> > > > please
> > > > > > let
> > > > > > > me
> > > > > > > > > > know!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Sophie
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie
> > Blee-Goldman <
> > > > > > > > > > > > sophie@confluent.io
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the feedback! That's a good point
> > about
> > > > > trying
> > > > > > > to
> > > > > > > > > > > prevent
> > > > > > > > > > > > > users
> > > > > > > > > > > > > > > from
> > > > > > > > > > > > > > > thinking they should use this API during normal
> > > > > > processing
> > > > > > > > and
> > > > > > > > > > > > > clarifying
> > > > > > > > > > > > > > > when/why
> > > > > > > > > > > > > > > you might need it -- regardless of the method
> > name,
> > > > we
> > > > > > > should
> > > > > > > > > > > > > explicitly
> > > > > > > > > > > > > > > call this out
> > > > > > > > > > > > > > > in the javadocs.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > As for the method name, on reflection I agree
> > that
> > > > > > > > > "rejoinGroup"
> > > > > > > > > > > does
> > > > > > > > > > > > > not
> > > > > > > > > > > > > > > seem to be
> > > > > > > > > > > > > > > appropriate. Of course that's what the consumer
> > > will
> > > > > > > actually
> > > > > > > > > be
> > > > > > > > > > > > doing,
> > > > > > > > > > > > > > > but that's just an
> > > > > > > > > > > > > > > implementation detail -- the name should
> reflect
> > > what
> > > > > the
> > > > > > > API
> > > > > > > > > is
> > > > > > > > > > > > doing,
> > > > > > > > > > > > > > > not how it does it
> > > > > > > > > > > > > > > (which can always change).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > How about "enforceRebalance"? This is stolen
> from
> > > the
> > > > > > > > > > StreamThread
> > > > > > > > > > > > > method
> > > > > > > > > > > > > > > that does
> > > > > > > > > > > > > > > exactly this (by unsubscribing) so it seems to
> > fit.
> > > > > I'll
> > > > > > > > update
> > > > > > > > > > the
> > > > > > > > > > > > KIP
> > > > > > > > > > > > > > > with this unless anyone
> > > > > > > > > > > > > > > has another suggestion.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Regarding the Consumer vs KafkaConsumer
> matter, I
> > > > > > included
> > > > > > > > the
> > > > > > > > > > > > > > > KafkaConsumer method
> > > > > > > > > > > > > > > because that's where all the javadocs redirect
> to
> > > in
> > > > > the
> > > > > > > > > Consumer
> > > > > > > > > > > > > > > interface. Also, FWIW
> > > > > > > > > > > > > > > I'm pretty sure KafkaConsumer is also part of
> the
> > > > > public
> > > > > > > API
> > > > > > > > --
> > > > > > > > > > we
> > > > > > > > > > > > > would
> > > > > > > > > > > > > > > be adding a new
> > > > > > > > > > > > > > > method to both.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > > > > > > vvcephei@apache.org
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Hi all,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I
> had
> > > > some
> > > > > > > > > > alternatives
> > > > > > > > > > > > in
> > > > > > > > > > > > > > >> mind, which
> > > > > > > > > > > > > > >> I won't even bother to relate because I feel
> > like
> > > > the
> > > > > > > > > motivation
> > > > > > > > > > > > made
> > > > > > > > > > > > > a
> > > > > > > > > > > > > > >> compelling
> > > > > > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> One very minor point you might as well fix is
> > that
> > > > the
> > > > > > API
> > > > > > > > > > change
> > > > > > > > > > > is
> > > > > > > > > > > > > > >> targeted at
> > > > > > > > > > > > > > >> KafkaConsumer (the implementation), but should
> > be
> > > > > > targeted
> > > > > > > > at
> > > > > > > > > > > > > > >> Consumer (the interface).
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> I agree with your discomfort about the name.
> > > Adding
> > > > a
> > > > > > > > "rejoin"
> > > > > > > > > > > > method
> > > > > > > > > > > > > > >> seems strange
> > > > > > > > > > > > > > >> since there's no "join" method. Instead the
> way
> > > you
> > > > > join
> > > > > > > the
> > > > > > > > > > group
> > > > > > > > > > > > the
> > > > > > > > > > > > > > >> first time is just
> > > > > > > > > > > > > > >> by calling "subscribe". But "resubscribe"
> seems
> > > too
> > > > > > > indirect
> > > > > > > > > > from
> > > > > > > > > > > > what
> > > > > > > > > > > > > > >> we're really trying
> > > > > > > > > > > > > > >> to do, which is to trigger a rebalance by
> > sending
> > > a
> > > > > new
> > > > > > > > > > JoinGroup
> > > > > > > > > > > > > request.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Another angle is that we don't want the method
> > to
> > > > > sound
> > > > > > > like
> > > > > > > > > > > > something
> > > > > > > > > > > > > > >> you should
> > > > > > > > > > > > > > >> be calling in normal circumstances, or people
> > will
> > > > be
> > > > > > > > > "tricked"
> > > > > > > > > > > into
> > > > > > > > > > > > > > >> calling it unnecessarily.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> So, I think "rejoinGroup" is fine, although a
> > > person
> > > > > > > _might_
> > > > > > > > > be
> > > > > > > > > > > > > forgiven
> > > > > > > > > > > > > > >> for thinking they
> > > > > > > > > > > > > > >> need to call it periodically or something. Did
> > you
> > > > > > > consider
> > > > > > > > > > > > > > >> "triggerRebalance", which
> > > > > > > > > > > > > > >> sounds pretty advanced-ish, and accurately
> > > describes
> > > > > > what
> > > > > > > > > > happens
> > > > > > > > > > > > when
> > > > > > > > > > > > > > >> you call it?
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> All in all, the KIP sounds good to me, and I'm
> > in
> > > > > favor.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > >> -John
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald
> > > wrote:
> > > > > > > > > > > > > > >> > This situation was discussed at length
> after a
> > > > > recent
> > > > > > > > talk I
> > > > > > > > > > > gave.
> > > > > > > > > > > > > This
> > > > > > > > > > > > > > >> KIP
> > > > > > > > > > > > > > >> > would be a great step towards increased
> > > > availability
> > > > > > and
> > > > > > > > in
> > > > > > > > > > > > > facilitating
> > > > > > > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > anna
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie
> > > Blee-Goldman <
> > > > > > > > > > > > > sophie@confluent.io>
> > > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > > Hi all,
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > In light of some recent and upcoming
> > > rebalancing
> > > > > and
> > > > > > > > > > > > availability
> > > > > > > > > > > > > > >> > > improvements, it seems we have a need for
> > > > > explicitly
> > > > > > > > > > > triggering
> > > > > > > > > > > > a
> > > > > > > > > > > > > > >> consumer
> > > > > > > > > > > > > > >> > > group rebalance. Therefore I'd like to
> > propose
> > > > > > adding
> > > > > > > a
> > > > > > > > > new
> > > > > > > > > > > > > > >> > > rejoinGroup()method
> > > > > > > > > > > > > > >> > > to the Consumer client (better method name
> > > > > > suggestions
> > > > > > > > are
> > > > > > > > > > > very
> > > > > > > > > > > > > > >> welcome).
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Please take a look at the KIP and let me
> > know
> > > > what
> > > > > > you
> > > > > > > > > > think!
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > KIP document:
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > JIRA:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Cheers,
> > > > > > > > > > > > > > >> > > Sophie
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Sophie,

So just to clarify, with the updated API we would keep calling
enforceRebalance until it returns true for cases where we rely on it with
new subscription metadata?

Guozhang

On Wed, Feb 12, 2020 at 5:14 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Thanks Boyang -- makes sense to me. I've optimistically updated the KIP
> with this new signature and behavior.
>
>
> On Wed, Feb 12, 2020 at 4:27 PM Boyang Chen <re...@gmail.com>
> wrote:
>
> > Hey Sophie,
> >
> > I'm satisfied with making enforceRebalance() not throwing any exception
> > other than illegal state. You could imagine this KIP is just making the
> > `rejoinNeededOrPending` external to user requests. Make it as lightweight
> > as possible makes sense.
> >
> > Boyang
> >
> > On Wed, Feb 12, 2020 at 2:14 PM Sophie Blee-Goldman <sophie@confluent.io
> >
> > wrote:
> >
> > > Hey Guozhang, thanks for the thorough reply!
> > >
> > > I definitely went back and forth on whether to make it a blocking call,
> > > and ultimately went with blocking just to leave it open to potential
> > future
> > > use cases (in particular non-Streams apps). But on second (or third)
> > > thought I think I agree that no use case wouldn't be similarly covered
> by
> > > just calling poll() immediately after enforceRebalance(). It seems best
> > to
> > > leave all rebalancing action within the scope of poll alone and avoid
> > > introducing unnecessary complexity -- happy to revert this then.
> > >
> > > I think that ends up addressing most of your other concerns, although
> > > there's one I would push back on: this method should still explicitly
> > > call out whether a rebalance is already in progress and the call is
> thus
> > > a no-op. If throwing a RebalanceInProgressException seems too
> > > heavy maybe we can just return a boolean indicating whether a new
> > > rebalance was triggered or not.
> > >
> > > The snippet you included does work around this, by checking the
> > > condition again in the rebalance listener. But I would argue that a)
> many
> > > applications don't use a rebalance listener, and shouldn't be forced to
> > > implement it to fully use this new API. More importantly, since you can
> > > probably use the assignor's onAssignment method to achieve the same
> > > thing, b) it adds unnecessary complexity, and as we've seen in Streams
> > > the interactions between the rebalance callbacks and main consumer
> > > can already get quite ugly.
> > >
> > > For simplicity's sake then, I'll propose to just return the bool over
> the
> > > exception and change the signature to
> > >
> > > /**
> > >  * @return Whether a new rebalance was triggered (false if a rebalance
> > was
> > > already in progress)
> > >  * @throws java.lang.IllegalStateException if the consumer does not use
> > > group subscription
> > >  */
> > > boolean enforceRebalance();
> > >
> > > Thoughts?
> > >
> > > On Tue, Feb 11, 2020 at 5:29 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hello Sophie, thanks for brining up this KIP, and the great write-up
> > > > summarizing the motivations of the proposal. Here are some comments:
> > > >
> > > > Minor:
> > > >
> > > > 1. If we want to make it a blocking call (I have some thoughts about
> > this
> > > > below :), to be consistent we need to consider having two overloaded
> > > > function, one without the timeout which then relies on `
> > > > DEFAULT_API_TIMEOUT_MS_CONFIG`.
> > > >
> > > > 2. Also I'd suggest that, again for API consistency, we a) throw
> > > > TimeoutException if the operation cannot be completed within the
> > timeout
> > > > value, b) return false immediately if we cannot trigger a rebalance
> > > either
> > > > because coordinator is unknown.
> > > >
> > > > Meta:
> > > >
> > > > 3. I'm not sure if we have a concrete scenario that we want to wait
> > until
> > > > the rebalance is completed in KIP-441 / 268, rather than calling
> > > > "consumer.enforceRebalance(); consumer.poll()" consecutively and try
> to
> > > > execute the rebalance in the poll call? If there's no valid
> motivations
> > > I'm
> > > > still a bit inclined to make it non-blocking (i.e. just setting a bit
> > and
> > > > then execute the process in the later poll call) similar to our
> `seek`
> > > > functions. By doing this we can also make this function simpler as it
> > > would
> > > > never throw RebalanceInProgress or Timeout or even KafkaExceptions.
> > > >
> > > > 4. Re: the case "when a rebalance is already in progress", this may
> be
> > > > related to 3) above. I think we can simplify this case as well but
> just
> > > not
> > > > triggering a new rebalance and let the the caller handle it: for
> > example
> > > in
> > > > KIP-441, in each iteration of the stream thread, we can if a standby
> > task
> > > > is ready, and if yes we call `enforceRebalance`, if there's already a
> > > > rebalance in progress (either with the new subscription metadata, or
> > not)
> > > > this call would be a no-op, and then in the next iteration we would
> > just
> > > > call that function again, and eventually we would trigger the
> rebalance
> > > > with the new subscription metadata and previous calls would be no-op
> > and
> > > > hence no cost anyways. I feel this would be simpler than letting the
> > > caller
> > > > to capture RebalanceInProgressException:
> > > >
> > > >
> > > > mainProcessingLoop() {
> > > >     if (needsRebalance) {
> > > >         consumer.enforceRebalance();
> > > >     }
> > > >
> > > >     records = consumer.poll();
> > > >     ...
> > > >     // do some processing
> > > > }
> > > >
> > > > RebalanceListener {
> > > >
> > > >    onPartitionsAssigned(...) {
> > > >       if (rebalanceGoalAchieved()) {
> > > >         needsRebalance = false;
> > > >       }
> > > >     }
> > > > }
> > > >
> > > >
> > > > WDYT?
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <
> > sophie@confluent.io
> > > >
> > > > wrote:
> > > >
> > > > > Hey Boyang,
> > > > >
> > > > > Originally I had it as a nonblocking call, but decided to change it
> > to
> > > > > blocking
> > > > > with a timeout parameter. I'm not sure a future makes sense to
> return
> > > > here,
> > > > > because the rebalance either does or does not complete within the
> > > > timeout:
> > > > > if it does not, you will have to call poll again to complete it (as
> > is
> > > > the
> > > > > case with
> > > > > any other rebalance). I'll call this out in the javadocs as well.
> > > > >
> > > > > I also added an example demonstrating how/when to use this new API.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <
> > > reluctanthero104@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey Sophie,
> > > > > >
> > > > > > is the `enforceRebalance` a blocking call? Could we add a code
> > sample
> > > > to
> > > > > > the KIP on how this API should be used?
> > > > > >
> > > > > > Returning a future instead of a boolean might be easier as we are
> > > > > allowing
> > > > > > consumer to make progress during rebalance after 429 IMHO.
> > > > > >
> > > > > > Boyang
> > > > > >
> > > > > >
> > > > > > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > > > > > konstantine@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks for the quick turnaround Sophie. My points have been
> > > > addressed.
> > > > > > > I think the intended use is quite clear now.
> > > > > > >
> > > > > > > Best,
> > > > > > > Konstantine
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > > > > > sophie@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Konstantine,
> > > > > > > > Thanks for the feedback! I've updated the sections with your
> > > > > > > suggestions. I
> > > > > > > > agree
> > > > > > > > in particular that it's really important to make sure users
> > don't
> > > > > call
> > > > > > > this
> > > > > > > > unnecessarily,
> > > > > > > >  or for the wrong reasons: to that end I also extended the
> > > javadocs
> > > > > to
> > > > > > > > specify that this
> > > > > > > > API is for when changes to the subscription userdata occur.
> > > > Hopefully
> > > > > > > that
> > > > > > > > should make
> > > > > > > > its intended usage quite clear.
> > > > > > > >
> > > > > > > > Bill,
> > > > > > > > The rebalance triggered by this new API will be a "normal"
> > > > rebalance,
> > > > > > and
> > > > > > > > therefore
> > > > > > > > follow the existing listener semantics. For example a
> > cooperative
> > > > > > > rebalance
> > > > > > > > will always
> > > > > > > > call onPartitionsAssigned, even if no partitions are actually
> > > > moved.
> > > > > > > > An eager rebalance will still revoke all partitions first
> > anyway.
> > > > > > > >
> > > > > > > > Thanks for the feedback!
> > > > > > > > Sophie
> > > > > > > >
> > > > > > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <
> bbejeck@gmail.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Sophie,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP, makes sense to me.
> > > > > > > > >
> > > > > > > > > One quick question, I'm not sure if it's relevant or not.
> > > > > > > > >
> > > > > > > > > If a user provides a `ConsumerRebalanceListener` and a
> > > rebalance
> > > > is
> > > > > > > > > triggered from an `enforceRebalance`  call,
> > > > > > > > > it seems possible the listener won't get called since
> > partition
> > > > > > > > assignments
> > > > > > > > > might not change.
> > > > > > > > > If that is the case, do we want to possibly consider
> adding a
> > > > > method
> > > > > > to
> > > > > > > > the
> > > > > > > > > `ConsumerRebalanceListener` for callbacks on
> > `enforceRebalance`
> > > > > > > actions?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Bill
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > > > > > konstantine@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Sophie.
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP. I liked how focused the proposal is.
> > > Also,
> > > > > its
> > > > > > > > > > motivation is clear after carefully reading the KIP and
> its
> > > > > > > references.
> > > > > > > > > >
> > > > > > > > > > Yet, I think it'd be a good idea to call out explicitly
> on
> > > the
> > > > > > > Rejected
> > > > > > > > > > Alternatives section that an automatic and periodic
> > > triggering
> > > > of
> > > > > > > > > > rebalances that would not require exposing this
> capability
> > > > > through
> > > > > > > the
> > > > > > > > > > Consumer interface does not cover your specific use cases
> > and
> > > > > > > therefore
> > > > > > > > > is
> > > > > > > > > > not chosen as a desired approach. Maybe, even consider
> > > > mentioning
> > > > > > > again
> > > > > > > > > > here that this method is expected to be used to respond
> to
> > > > system
> > > > > > > > changes
> > > > > > > > > > external to the consumer and its membership logic and is
> > not
> > > > > > proposed
> > > > > > > > as
> > > > > > > > > a
> > > > > > > > > > way to resolve temporary imbalances due to membership
> > changes
> > > > > that
> > > > > > > > should
> > > > > > > > > > inherently be resolved by the assignor logic itself with
> > one
> > > or
> > > > > > more
> > > > > > > > > > consecutive rebalances.
> > > > > > > > > >
> > > > > > > > > > Also, in your javadoc I'd add some context similar to
> what
> > > > > someone
> > > > > > > can
> > > > > > > > > read
> > > > > > > > > > on the KIP. Specifically where you say: "for example if
> > some
> > > > > > > condition
> > > > > > > > > has
> > > > > > > > > > changed that has implications for the partition
> > assignment."
> > > > I'd
> > > > > > > rather
> > > > > > > > > add
> > > > > > > > > > something like "for example, if some condition external
> and
> > > > > > invisible
> > > > > > > > to
> > > > > > > > > > the Consumer and its group membership has changed in ways
> > > that
> > > > > > would
> > > > > > > > > > justify a new partition assignment". That's just an
> > example,
> > > > feel
> > > > > > > free
> > > > > > > > to
> > > > > > > > > > reword, but I believe that saying explicitly that this
> > > > condition
> > > > > is
> > > > > > > not
> > > > > > > > > > visible to the consumer is useful to understand that this
> > is
> > > > not
> > > > > > > > > necessary
> > > > > > > > > > under normal circumstances.
> > > > > > > > > >
> > > > > > > > > > In Compatibility, Deprecation, and Migration Plan
> section I
> > > > think
> > > > > > > it's
> > > > > > > > > > worth mentioning that this is a new feature that affects
> > new
> > > > > > > > > > implementations of the Consumer interface and any such
> new
> > > > > > > > implementation
> > > > > > > > > > should override the new method. Implementations that wish
> > to
> > > > > > upgrade
> > > > > > > > to a
> > > > > > > > > > newer version should be extended and recompiled, since no
> > > > default
> > > > > > > > > > implementation will be provided.
> > > > > > > > > >
> > > > > > > > > > Naming is hard here, if someone wants to emphasize the ad
> > hoc
> > > > and
> > > > > > > > > irregular
> > > > > > > > > > nature of this call. After some thought I'm fine with
> > > > > > > > 'enforceRebalance'
> > > > > > > > > > even if it could potentially be confused to a method that
> > is
> > > > > > supposed
> > > > > > > > to
> > > > > > > > > be
> > > > > > > > > > called to remediate one or more previously unsuccessful
> > > > > rebalances
> > > > > > > > (which
> > > > > > > > > > is partly what StreamThread#enforceRebalance is used
> for).
> > > The
> > > > > > best I
> > > > > > > > > could
> > > > > > > > > > think of was 'onRequestRebalance' but that's not perfect
> > > > either.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Konstantine
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > > > > > sophie@confluent.io
> > > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks John. I took out the KafkaConsumer method and
> > moved
> > > > the
> > > > > > > > javadocs
> > > > > > > > > > > to the Consumer#enforceRebalance in the KIP -- hope
> > you're
> > > > > happy
> > > > > > :P
> > > > > > > > > > >
> > > > > > > > > > > Also, I wanted to point out one minor change to the
> > current
> > > > > > > proposal:
> > > > > > > > > > make
> > > > > > > > > > > this
> > > > > > > > > > > a blocking call, which accepts a timeout and returns
> > > whether
> > > > > the
> > > > > > > > > > rebalance
> > > > > > > > > > > completed within the timeout. It will still reduce to a
> > > > > > nonblocking
> > > > > > > > > call
> > > > > > > > > > if
> > > > > > > > > > > a "zero"
> > > > > > > > > > > timeout is supplied. I've updated the KIP accordingly.
> > > > > > > > > > >
> > > > > > > > > > > Let me know if there are any further concerns, else
> I'll
> > > call
> > > > > > for a
> > > > > > > > > vote.
> > > > > > > > > > >
> > > > > > > > > > > Cheers!
> > > > > > > > > > > Sophie
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > > > > > vvcephei@apache.org
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks Sophie,
> > > > > > > > > > > >
> > > > > > > > > > > > Sorry I didn't respond. I think your new method name
> > > sounds
> > > > > > good.
> > > > > > > > > > > >
> > > > > > > > > > > > Regarding the interface vs implementation, I agree
> it's
> > > > > > > confusing.
> > > > > > > > > It's
> > > > > > > > > > > > always bothered me that the interface redirects you
> to
> > an
> > > > > > > > > > implementation
> > > > > > > > > > > > JavaDocs, but never enough for me to stop what I'm
> > doing
> > > to
> > > > > fix
> > > > > > > it.
> > > > > > > > > > > > It's not a big deal either way, I just thought it was
> > > > strange
> > > > > > to
> > > > > > > > > > propose
> > > > > > > > > > > a
> > > > > > > > > > > > "public interface" change, but not in terms of the
> > actual
> > > > > > > interface
> > > > > > > > > > > class.
> > > > > > > > > > > >
> > > > > > > > > > > > It _is_ true that KafkaConsumer is also part of the
> > > public
> > > > > API,
> > > > > > > but
> > > > > > > > > > only
> > > > > > > > > > > > really
> > > > > > > > > > > > for the constructor. Any proposal to define a new
> > > "consumer
> > > > > > > client"
> > > > > > > > > API
> > > > > > > > > > > > should be on the Consumer interface (which you said
> you
> > > > plan
> > > > > to
> > > > > > > do
> > > > > > > > > > > anyway).
> > > > > > > > > > > > I guess I brought it up because proposing an addition
> > to
> > > > > > Consumer
> > > > > > > > > > implies
> > > > > > > > > > > > it would be added to KafkaConsumer, but proposing an
> > > > addition
> > > > > > to
> > > > > > > > > > > > KafkaConsumer does not necessarily imply it would
> also
> > be
> > > > > added
> > > > > > > to
> > > > > > > > > > > > Consumer. Does that make sense?
> > > > > > > > > > > >
> > > > > > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > -John
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman
> > > wrote:
> > > > > > > > > > > > > Since this doesn't seem too controversial, I'll
> > > probably
> > > > > call
> > > > > > > > for a
> > > > > > > > > > > vote
> > > > > > > > > > > > by
> > > > > > > > > > > > > end of day.
> > > > > > > > > > > > > If there any further comments/questions/concerns,
> > > please
> > > > > let
> > > > > > me
> > > > > > > > > know!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Sophie
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie
> Blee-Goldman <
> > > > > > > > > > > sophie@confluent.io
> > > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the feedback! That's a good point
> about
> > > > trying
> > > > > > to
> > > > > > > > > > prevent
> > > > > > > > > > > > users
> > > > > > > > > > > > > > from
> > > > > > > > > > > > > > thinking they should use this API during normal
> > > > > processing
> > > > > > > and
> > > > > > > > > > > > clarifying
> > > > > > > > > > > > > > when/why
> > > > > > > > > > > > > > you might need it -- regardless of the method
> name,
> > > we
> > > > > > should
> > > > > > > > > > > > explicitly
> > > > > > > > > > > > > > call this out
> > > > > > > > > > > > > > in the javadocs.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > As for the method name, on reflection I agree
> that
> > > > > > > > "rejoinGroup"
> > > > > > > > > > does
> > > > > > > > > > > > not
> > > > > > > > > > > > > > seem to be
> > > > > > > > > > > > > > appropriate. Of course that's what the consumer
> > will
> > > > > > actually
> > > > > > > > be
> > > > > > > > > > > doing,
> > > > > > > > > > > > > > but that's just an
> > > > > > > > > > > > > > implementation detail -- the name should reflect
> > what
> > > > the
> > > > > > API
> > > > > > > > is
> > > > > > > > > > > doing,
> > > > > > > > > > > > > > not how it does it
> > > > > > > > > > > > > > (which can always change).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > How about "enforceRebalance"? This is stolen from
> > the
> > > > > > > > > StreamThread
> > > > > > > > > > > > method
> > > > > > > > > > > > > > that does
> > > > > > > > > > > > > > exactly this (by unsubscribing) so it seems to
> fit.
> > > > I'll
> > > > > > > update
> > > > > > > > > the
> > > > > > > > > > > KIP
> > > > > > > > > > > > > > with this unless anyone
> > > > > > > > > > > > > > has another suggestion.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I
> > > > > included
> > > > > > > the
> > > > > > > > > > > > > > KafkaConsumer method
> > > > > > > > > > > > > > because that's where all the javadocs redirect to
> > in
> > > > the
> > > > > > > > Consumer
> > > > > > > > > > > > > > interface. Also, FWIW
> > > > > > > > > > > > > > I'm pretty sure KafkaConsumer is also part of the
> > > > public
> > > > > > API
> > > > > > > --
> > > > > > > > > we
> > > > > > > > > > > > would
> > > > > > > > > > > > > > be adding a new
> > > > > > > > > > > > > > method to both.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > > > > > vvcephei@apache.org
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hi all,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I had
> > > some
> > > > > > > > > alternatives
> > > > > > > > > > > in
> > > > > > > > > > > > > >> mind, which
> > > > > > > > > > > > > >> I won't even bother to relate because I feel
> like
> > > the
> > > > > > > > motivation
> > > > > > > > > > > made
> > > > > > > > > > > > a
> > > > > > > > > > > > > >> compelling
> > > > > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> One very minor point you might as well fix is
> that
> > > the
> > > > > API
> > > > > > > > > change
> > > > > > > > > > is
> > > > > > > > > > > > > >> targeted at
> > > > > > > > > > > > > >> KafkaConsumer (the implementation), but should
> be
> > > > > targeted
> > > > > > > at
> > > > > > > > > > > > > >> Consumer (the interface).
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> I agree with your discomfort about the name.
> > Adding
> > > a
> > > > > > > "rejoin"
> > > > > > > > > > > method
> > > > > > > > > > > > > >> seems strange
> > > > > > > > > > > > > >> since there's no "join" method. Instead the way
> > you
> > > > join
> > > > > > the
> > > > > > > > > group
> > > > > > > > > > > the
> > > > > > > > > > > > > >> first time is just
> > > > > > > > > > > > > >> by calling "subscribe". But "resubscribe" seems
> > too
> > > > > > indirect
> > > > > > > > > from
> > > > > > > > > > > what
> > > > > > > > > > > > > >> we're really trying
> > > > > > > > > > > > > >> to do, which is to trigger a rebalance by
> sending
> > a
> > > > new
> > > > > > > > > JoinGroup
> > > > > > > > > > > > request.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Another angle is that we don't want the method
> to
> > > > sound
> > > > > > like
> > > > > > > > > > > something
> > > > > > > > > > > > > >> you should
> > > > > > > > > > > > > >> be calling in normal circumstances, or people
> will
> > > be
> > > > > > > > "tricked"
> > > > > > > > > > into
> > > > > > > > > > > > > >> calling it unnecessarily.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> So, I think "rejoinGroup" is fine, although a
> > person
> > > > > > _might_
> > > > > > > > be
> > > > > > > > > > > > forgiven
> > > > > > > > > > > > > >> for thinking they
> > > > > > > > > > > > > >> need to call it periodically or something. Did
> you
> > > > > > consider
> > > > > > > > > > > > > >> "triggerRebalance", which
> > > > > > > > > > > > > >> sounds pretty advanced-ish, and accurately
> > describes
> > > > > what
> > > > > > > > > happens
> > > > > > > > > > > when
> > > > > > > > > > > > > >> you call it?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> All in all, the KIP sounds good to me, and I'm
> in
> > > > favor.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > >> -John
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald
> > wrote:
> > > > > > > > > > > > > >> > This situation was discussed at length after a
> > > > recent
> > > > > > > talk I
> > > > > > > > > > gave.
> > > > > > > > > > > > This
> > > > > > > > > > > > > >> KIP
> > > > > > > > > > > > > >> > would be a great step towards increased
> > > availability
> > > > > and
> > > > > > > in
> > > > > > > > > > > > facilitating
> > > > > > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > anna
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie
> > Blee-Goldman <
> > > > > > > > > > > > sophie@confluent.io>
> > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > Hi all,
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > In light of some recent and upcoming
> > rebalancing
> > > > and
> > > > > > > > > > > availability
> > > > > > > > > > > > > >> > > improvements, it seems we have a need for
> > > > explicitly
> > > > > > > > > > triggering
> > > > > > > > > > > a
> > > > > > > > > > > > > >> consumer
> > > > > > > > > > > > > >> > > group rebalance. Therefore I'd like to
> propose
> > > > > adding
> > > > > > a
> > > > > > > > new
> > > > > > > > > > > > > >> > > rejoinGroup()method
> > > > > > > > > > > > > >> > > to the Consumer client (better method name
> > > > > suggestions
> > > > > > > are
> > > > > > > > > > very
> > > > > > > > > > > > > >> welcome).
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Please take a look at the KIP and let me
> know
> > > what
> > > > > you
> > > > > > > > > think!
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > KIP document:
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > JIRA:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Cheers,
> > > > > > > > > > > > > >> > > Sophie
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Thanks Boyang -- makes sense to me. I've optimistically updated the KIP
with this new signature and behavior.


On Wed, Feb 12, 2020 at 4:27 PM Boyang Chen <re...@gmail.com>
wrote:

> Hey Sophie,
>
> I'm satisfied with making enforceRebalance() not throwing any exception
> other than illegal state. You could imagine this KIP is just making the
> `rejoinNeededOrPending` external to user requests. Make it as lightweight
> as possible makes sense.
>
> Boyang
>
> On Wed, Feb 12, 2020 at 2:14 PM Sophie Blee-Goldman <so...@confluent.io>
> wrote:
>
> > Hey Guozhang, thanks for the thorough reply!
> >
> > I definitely went back and forth on whether to make it a blocking call,
> > and ultimately went with blocking just to leave it open to potential
> future
> > use cases (in particular non-Streams apps). But on second (or third)
> > thought I think I agree that no use case wouldn't be similarly covered by
> > just calling poll() immediately after enforceRebalance(). It seems best
> to
> > leave all rebalancing action within the scope of poll alone and avoid
> > introducing unnecessary complexity -- happy to revert this then.
> >
> > I think that ends up addressing most of your other concerns, although
> > there's one I would push back on: this method should still explicitly
> > call out whether a rebalance is already in progress and the call is thus
> > a no-op. If throwing a RebalanceInProgressException seems too
> > heavy maybe we can just return a boolean indicating whether a new
> > rebalance was triggered or not.
> >
> > The snippet you included does work around this, by checking the
> > condition again in the rebalance listener. But I would argue that a) many
> > applications don't use a rebalance listener, and shouldn't be forced to
> > implement it to fully use this new API. More importantly, since you can
> > probably use the assignor's onAssignment method to achieve the same
> > thing, b) it adds unnecessary complexity, and as we've seen in Streams
> > the interactions between the rebalance callbacks and main consumer
> > can already get quite ugly.
> >
> > For simplicity's sake then, I'll propose to just return the bool over the
> > exception and change the signature to
> >
> > /**
> >  * @return Whether a new rebalance was triggered (false if a rebalance
> was
> > already in progress)
> >  * @throws java.lang.IllegalStateException if the consumer does not use
> > group subscription
> >  */
> > boolean enforceRebalance();
> >
> > Thoughts?
> >
> > On Tue, Feb 11, 2020 at 5:29 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hello Sophie, thanks for brining up this KIP, and the great write-up
> > > summarizing the motivations of the proposal. Here are some comments:
> > >
> > > Minor:
> > >
> > > 1. If we want to make it a blocking call (I have some thoughts about
> this
> > > below :), to be consistent we need to consider having two overloaded
> > > function, one without the timeout which then relies on `
> > > DEFAULT_API_TIMEOUT_MS_CONFIG`.
> > >
> > > 2. Also I'd suggest that, again for API consistency, we a) throw
> > > TimeoutException if the operation cannot be completed within the
> timeout
> > > value, b) return false immediately if we cannot trigger a rebalance
> > either
> > > because coordinator is unknown.
> > >
> > > Meta:
> > >
> > > 3. I'm not sure if we have a concrete scenario that we want to wait
> until
> > > the rebalance is completed in KIP-441 / 268, rather than calling
> > > "consumer.enforceRebalance(); consumer.poll()" consecutively and try to
> > > execute the rebalance in the poll call? If there's no valid motivations
> > I'm
> > > still a bit inclined to make it non-blocking (i.e. just setting a bit
> and
> > > then execute the process in the later poll call) similar to our `seek`
> > > functions. By doing this we can also make this function simpler as it
> > would
> > > never throw RebalanceInProgress or Timeout or even KafkaExceptions.
> > >
> > > 4. Re: the case "when a rebalance is already in progress", this may be
> > > related to 3) above. I think we can simplify this case as well but just
> > not
> > > triggering a new rebalance and let the the caller handle it: for
> example
> > in
> > > KIP-441, in each iteration of the stream thread, we can if a standby
> task
> > > is ready, and if yes we call `enforceRebalance`, if there's already a
> > > rebalance in progress (either with the new subscription metadata, or
> not)
> > > this call would be a no-op, and then in the next iteration we would
> just
> > > call that function again, and eventually we would trigger the rebalance
> > > with the new subscription metadata and previous calls would be no-op
> and
> > > hence no cost anyways. I feel this would be simpler than letting the
> > caller
> > > to capture RebalanceInProgressException:
> > >
> > >
> > > mainProcessingLoop() {
> > >     if (needsRebalance) {
> > >         consumer.enforceRebalance();
> > >     }
> > >
> > >     records = consumer.poll();
> > >     ...
> > >     // do some processing
> > > }
> > >
> > > RebalanceListener {
> > >
> > >    onPartitionsAssigned(...) {
> > >       if (rebalanceGoalAchieved()) {
> > >         needsRebalance = false;
> > >       }
> > >     }
> > > }
> > >
> > >
> > > WDYT?
> > >
> > >
> > >
> > >
> > > On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <
> sophie@confluent.io
> > >
> > > wrote:
> > >
> > > > Hey Boyang,
> > > >
> > > > Originally I had it as a nonblocking call, but decided to change it
> to
> > > > blocking
> > > > with a timeout parameter. I'm not sure a future makes sense to return
> > > here,
> > > > because the rebalance either does or does not complete within the
> > > timeout:
> > > > if it does not, you will have to call poll again to complete it (as
> is
> > > the
> > > > case with
> > > > any other rebalance). I'll call this out in the javadocs as well.
> > > >
> > > > I also added an example demonstrating how/when to use this new API.
> > > >
> > > > Thanks!
> > > >
> > > > On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <
> > reluctanthero104@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey Sophie,
> > > > >
> > > > > is the `enforceRebalance` a blocking call? Could we add a code
> sample
> > > to
> > > > > the KIP on how this API should be used?
> > > > >
> > > > > Returning a future instead of a boolean might be easier as we are
> > > > allowing
> > > > > consumer to make progress during rebalance after 429 IMHO.
> > > > >
> > > > > Boyang
> > > > >
> > > > >
> > > > > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > > > > konstantine@confluent.io> wrote:
> > > > >
> > > > > > Thanks for the quick turnaround Sophie. My points have been
> > > addressed.
> > > > > > I think the intended use is quite clear now.
> > > > > >
> > > > > > Best,
> > > > > > Konstantine
> > > > > >
> > > > > >
> > > > > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > > > > sophie@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Konstantine,
> > > > > > > Thanks for the feedback! I've updated the sections with your
> > > > > > suggestions. I
> > > > > > > agree
> > > > > > > in particular that it's really important to make sure users
> don't
> > > > call
> > > > > > this
> > > > > > > unnecessarily,
> > > > > > >  or for the wrong reasons: to that end I also extended the
> > javadocs
> > > > to
> > > > > > > specify that this
> > > > > > > API is for when changes to the subscription userdata occur.
> > > Hopefully
> > > > > > that
> > > > > > > should make
> > > > > > > its intended usage quite clear.
> > > > > > >
> > > > > > > Bill,
> > > > > > > The rebalance triggered by this new API will be a "normal"
> > > rebalance,
> > > > > and
> > > > > > > therefore
> > > > > > > follow the existing listener semantics. For example a
> cooperative
> > > > > > rebalance
> > > > > > > will always
> > > > > > > call onPartitionsAssigned, even if no partitions are actually
> > > moved.
> > > > > > > An eager rebalance will still revoke all partitions first
> anyway.
> > > > > > >
> > > > > > > Thanks for the feedback!
> > > > > > > Sophie
> > > > > > >
> > > > > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bbejeck@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Sophie,
> > > > > > > >
> > > > > > > > Thanks for the KIP, makes sense to me.
> > > > > > > >
> > > > > > > > One quick question, I'm not sure if it's relevant or not.
> > > > > > > >
> > > > > > > > If a user provides a `ConsumerRebalanceListener` and a
> > rebalance
> > > is
> > > > > > > > triggered from an `enforceRebalance`  call,
> > > > > > > > it seems possible the listener won't get called since
> partition
> > > > > > > assignments
> > > > > > > > might not change.
> > > > > > > > If that is the case, do we want to possibly consider adding a
> > > > method
> > > > > to
> > > > > > > the
> > > > > > > > `ConsumerRebalanceListener` for callbacks on
> `enforceRebalance`
> > > > > > actions?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Bill
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > > > > konstantine@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hi Sophie.
> > > > > > > > >
> > > > > > > > > Thanks for the KIP. I liked how focused the proposal is.
> > Also,
> > > > its
> > > > > > > > > motivation is clear after carefully reading the KIP and its
> > > > > > references.
> > > > > > > > >
> > > > > > > > > Yet, I think it'd be a good idea to call out explicitly on
> > the
> > > > > > Rejected
> > > > > > > > > Alternatives section that an automatic and periodic
> > triggering
> > > of
> > > > > > > > > rebalances that would not require exposing this capability
> > > > through
> > > > > > the
> > > > > > > > > Consumer interface does not cover your specific use cases
> and
> > > > > > therefore
> > > > > > > > is
> > > > > > > > > not chosen as a desired approach. Maybe, even consider
> > > mentioning
> > > > > > again
> > > > > > > > > here that this method is expected to be used to respond to
> > > system
> > > > > > > changes
> > > > > > > > > external to the consumer and its membership logic and is
> not
> > > > > proposed
> > > > > > > as
> > > > > > > > a
> > > > > > > > > way to resolve temporary imbalances due to membership
> changes
> > > > that
> > > > > > > should
> > > > > > > > > inherently be resolved by the assignor logic itself with
> one
> > or
> > > > > more
> > > > > > > > > consecutive rebalances.
> > > > > > > > >
> > > > > > > > > Also, in your javadoc I'd add some context similar to what
> > > > someone
> > > > > > can
> > > > > > > > read
> > > > > > > > > on the KIP. Specifically where you say: "for example if
> some
> > > > > > condition
> > > > > > > > has
> > > > > > > > > changed that has implications for the partition
> assignment."
> > > I'd
> > > > > > rather
> > > > > > > > add
> > > > > > > > > something like "for example, if some condition external and
> > > > > invisible
> > > > > > > to
> > > > > > > > > the Consumer and its group membership has changed in ways
> > that
> > > > > would
> > > > > > > > > justify a new partition assignment". That's just an
> example,
> > > feel
> > > > > > free
> > > > > > > to
> > > > > > > > > reword, but I believe that saying explicitly that this
> > > condition
> > > > is
> > > > > > not
> > > > > > > > > visible to the consumer is useful to understand that this
> is
> > > not
> > > > > > > > necessary
> > > > > > > > > under normal circumstances.
> > > > > > > > >
> > > > > > > > > In Compatibility, Deprecation, and Migration Plan section I
> > > think
> > > > > > it's
> > > > > > > > > worth mentioning that this is a new feature that affects
> new
> > > > > > > > > implementations of the Consumer interface and any such new
> > > > > > > implementation
> > > > > > > > > should override the new method. Implementations that wish
> to
> > > > > upgrade
> > > > > > > to a
> > > > > > > > > newer version should be extended and recompiled, since no
> > > default
> > > > > > > > > implementation will be provided.
> > > > > > > > >
> > > > > > > > > Naming is hard here, if someone wants to emphasize the ad
> hoc
> > > and
> > > > > > > > irregular
> > > > > > > > > nature of this call. After some thought I'm fine with
> > > > > > > 'enforceRebalance'
> > > > > > > > > even if it could potentially be confused to a method that
> is
> > > > > supposed
> > > > > > > to
> > > > > > > > be
> > > > > > > > > called to remediate one or more previously unsuccessful
> > > > rebalances
> > > > > > > (which
> > > > > > > > > is partly what StreamThread#enforceRebalance is used for).
> > The
> > > > > best I
> > > > > > > > could
> > > > > > > > > think of was 'onRequestRebalance' but that's not perfect
> > > either.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Konstantine
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > > > > sophie@confluent.io
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks John. I took out the KafkaConsumer method and
> moved
> > > the
> > > > > > > javadocs
> > > > > > > > > > to the Consumer#enforceRebalance in the KIP -- hope
> you're
> > > > happy
> > > > > :P
> > > > > > > > > >
> > > > > > > > > > Also, I wanted to point out one minor change to the
> current
> > > > > > proposal:
> > > > > > > > > make
> > > > > > > > > > this
> > > > > > > > > > a blocking call, which accepts a timeout and returns
> > whether
> > > > the
> > > > > > > > > rebalance
> > > > > > > > > > completed within the timeout. It will still reduce to a
> > > > > nonblocking
> > > > > > > > call
> > > > > > > > > if
> > > > > > > > > > a "zero"
> > > > > > > > > > timeout is supplied. I've updated the KIP accordingly.
> > > > > > > > > >
> > > > > > > > > > Let me know if there are any further concerns, else I'll
> > call
> > > > > for a
> > > > > > > > vote.
> > > > > > > > > >
> > > > > > > > > > Cheers!
> > > > > > > > > > Sophie
> > > > > > > > > >
> > > > > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > > > > vvcephei@apache.org
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Sophie,
> > > > > > > > > > >
> > > > > > > > > > > Sorry I didn't respond. I think your new method name
> > sounds
> > > > > good.
> > > > > > > > > > >
> > > > > > > > > > > Regarding the interface vs implementation, I agree it's
> > > > > > confusing.
> > > > > > > > It's
> > > > > > > > > > > always bothered me that the interface redirects you to
> an
> > > > > > > > > implementation
> > > > > > > > > > > JavaDocs, but never enough for me to stop what I'm
> doing
> > to
> > > > fix
> > > > > > it.
> > > > > > > > > > > It's not a big deal either way, I just thought it was
> > > strange
> > > > > to
> > > > > > > > > propose
> > > > > > > > > > a
> > > > > > > > > > > "public interface" change, but not in terms of the
> actual
> > > > > > interface
> > > > > > > > > > class.
> > > > > > > > > > >
> > > > > > > > > > > It _is_ true that KafkaConsumer is also part of the
> > public
> > > > API,
> > > > > > but
> > > > > > > > > only
> > > > > > > > > > > really
> > > > > > > > > > > for the constructor. Any proposal to define a new
> > "consumer
> > > > > > client"
> > > > > > > > API
> > > > > > > > > > > should be on the Consumer interface (which you said you
> > > plan
> > > > to
> > > > > > do
> > > > > > > > > > anyway).
> > > > > > > > > > > I guess I brought it up because proposing an addition
> to
> > > > > Consumer
> > > > > > > > > implies
> > > > > > > > > > > it would be added to KafkaConsumer, but proposing an
> > > addition
> > > > > to
> > > > > > > > > > > KafkaConsumer does not necessarily imply it would also
> be
> > > > added
> > > > > > to
> > > > > > > > > > > Consumer. Does that make sense?
> > > > > > > > > > >
> > > > > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > -John
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman
> > wrote:
> > > > > > > > > > > > Since this doesn't seem too controversial, I'll
> > probably
> > > > call
> > > > > > > for a
> > > > > > > > > > vote
> > > > > > > > > > > by
> > > > > > > > > > > > end of day.
> > > > > > > > > > > > If there any further comments/questions/concerns,
> > please
> > > > let
> > > > > me
> > > > > > > > know!
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Sophie
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > > > > > > > sophie@confluent.io
> > > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the feedback! That's a good point about
> > > trying
> > > > > to
> > > > > > > > > prevent
> > > > > > > > > > > users
> > > > > > > > > > > > > from
> > > > > > > > > > > > > thinking they should use this API during normal
> > > > processing
> > > > > > and
> > > > > > > > > > > clarifying
> > > > > > > > > > > > > when/why
> > > > > > > > > > > > > you might need it -- regardless of the method name,
> > we
> > > > > should
> > > > > > > > > > > explicitly
> > > > > > > > > > > > > call this out
> > > > > > > > > > > > > in the javadocs.
> > > > > > > > > > > > >
> > > > > > > > > > > > > As for the method name, on reflection I agree that
> > > > > > > "rejoinGroup"
> > > > > > > > > does
> > > > > > > > > > > not
> > > > > > > > > > > > > seem to be
> > > > > > > > > > > > > appropriate. Of course that's what the consumer
> will
> > > > > actually
> > > > > > > be
> > > > > > > > > > doing,
> > > > > > > > > > > > > but that's just an
> > > > > > > > > > > > > implementation detail -- the name should reflect
> what
> > > the
> > > > > API
> > > > > > > is
> > > > > > > > > > doing,
> > > > > > > > > > > > > not how it does it
> > > > > > > > > > > > > (which can always change).
> > > > > > > > > > > > >
> > > > > > > > > > > > > How about "enforceRebalance"? This is stolen from
> the
> > > > > > > > StreamThread
> > > > > > > > > > > method
> > > > > > > > > > > > > that does
> > > > > > > > > > > > > exactly this (by unsubscribing) so it seems to fit.
> > > I'll
> > > > > > update
> > > > > > > > the
> > > > > > > > > > KIP
> > > > > > > > > > > > > with this unless anyone
> > > > > > > > > > > > > has another suggestion.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I
> > > > included
> > > > > > the
> > > > > > > > > > > > > KafkaConsumer method
> > > > > > > > > > > > > because that's where all the javadocs redirect to
> in
> > > the
> > > > > > > Consumer
> > > > > > > > > > > > > interface. Also, FWIW
> > > > > > > > > > > > > I'm pretty sure KafkaConsumer is also part of the
> > > public
> > > > > API
> > > > > > --
> > > > > > > > we
> > > > > > > > > > > would
> > > > > > > > > > > > > be adding a new
> > > > > > > > > > > > > method to both.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > > > > vvcephei@apache.org
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hi all,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I had
> > some
> > > > > > > > alternatives
> > > > > > > > > > in
> > > > > > > > > > > > >> mind, which
> > > > > > > > > > > > >> I won't even bother to relate because I feel like
> > the
> > > > > > > motivation
> > > > > > > > > > made
> > > > > > > > > > > a
> > > > > > > > > > > > >> compelling
> > > > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> One very minor point you might as well fix is that
> > the
> > > > API
> > > > > > > > change
> > > > > > > > > is
> > > > > > > > > > > > >> targeted at
> > > > > > > > > > > > >> KafkaConsumer (the implementation), but should be
> > > > targeted
> > > > > > at
> > > > > > > > > > > > >> Consumer (the interface).
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I agree with your discomfort about the name.
> Adding
> > a
> > > > > > "rejoin"
> > > > > > > > > > method
> > > > > > > > > > > > >> seems strange
> > > > > > > > > > > > >> since there's no "join" method. Instead the way
> you
> > > join
> > > > > the
> > > > > > > > group
> > > > > > > > > > the
> > > > > > > > > > > > >> first time is just
> > > > > > > > > > > > >> by calling "subscribe". But "resubscribe" seems
> too
> > > > > indirect
> > > > > > > > from
> > > > > > > > > > what
> > > > > > > > > > > > >> we're really trying
> > > > > > > > > > > > >> to do, which is to trigger a rebalance by sending
> a
> > > new
> > > > > > > > JoinGroup
> > > > > > > > > > > request.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Another angle is that we don't want the method to
> > > sound
> > > > > like
> > > > > > > > > > something
> > > > > > > > > > > > >> you should
> > > > > > > > > > > > >> be calling in normal circumstances, or people will
> > be
> > > > > > > "tricked"
> > > > > > > > > into
> > > > > > > > > > > > >> calling it unnecessarily.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> So, I think "rejoinGroup" is fine, although a
> person
> > > > > _might_
> > > > > > > be
> > > > > > > > > > > forgiven
> > > > > > > > > > > > >> for thinking they
> > > > > > > > > > > > >> need to call it periodically or something. Did you
> > > > > consider
> > > > > > > > > > > > >> "triggerRebalance", which
> > > > > > > > > > > > >> sounds pretty advanced-ish, and accurately
> describes
> > > > what
> > > > > > > > happens
> > > > > > > > > > when
> > > > > > > > > > > > >> you call it?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> All in all, the KIP sounds good to me, and I'm in
> > > favor.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > >> -John
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald
> wrote:
> > > > > > > > > > > > >> > This situation was discussed at length after a
> > > recent
> > > > > > talk I
> > > > > > > > > gave.
> > > > > > > > > > > This
> > > > > > > > > > > > >> KIP
> > > > > > > > > > > > >> > would be a great step towards increased
> > availability
> > > > and
> > > > > > in
> > > > > > > > > > > facilitating
> > > > > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > anna
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie
> Blee-Goldman <
> > > > > > > > > > > sophie@confluent.io>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > Hi all,
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > In light of some recent and upcoming
> rebalancing
> > > and
> > > > > > > > > > availability
> > > > > > > > > > > > >> > > improvements, it seems we have a need for
> > > explicitly
> > > > > > > > > triggering
> > > > > > > > > > a
> > > > > > > > > > > > >> consumer
> > > > > > > > > > > > >> > > group rebalance. Therefore I'd like to propose
> > > > adding
> > > > > a
> > > > > > > new
> > > > > > > > > > > > >> > > rejoinGroup()method
> > > > > > > > > > > > >> > > to the Consumer client (better method name
> > > > suggestions
> > > > > > are
> > > > > > > > > very
> > > > > > > > > > > > >> welcome).
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Please take a look at the KIP and let me know
> > what
> > > > you
> > > > > > > > think!
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > KIP document:
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > JIRA:
> > > > > https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Cheers,
> > > > > > > > > > > > >> > > Sophie
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Boyang Chen <re...@gmail.com>.
Hey Sophie,

I'm satisfied with making enforceRebalance() not throwing any exception
other than illegal state. You could imagine this KIP is just making the
`rejoinNeededOrPending` external to user requests. Make it as lightweight
as possible makes sense.

Boyang

On Wed, Feb 12, 2020 at 2:14 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Hey Guozhang, thanks for the thorough reply!
>
> I definitely went back and forth on whether to make it a blocking call,
> and ultimately went with blocking just to leave it open to potential future
> use cases (in particular non-Streams apps). But on second (or third)
> thought I think I agree that no use case wouldn't be similarly covered by
> just calling poll() immediately after enforceRebalance(). It seems best to
> leave all rebalancing action within the scope of poll alone and avoid
> introducing unnecessary complexity -- happy to revert this then.
>
> I think that ends up addressing most of your other concerns, although
> there's one I would push back on: this method should still explicitly
> call out whether a rebalance is already in progress and the call is thus
> a no-op. If throwing a RebalanceInProgressException seems too
> heavy maybe we can just return a boolean indicating whether a new
> rebalance was triggered or not.
>
> The snippet you included does work around this, by checking the
> condition again in the rebalance listener. But I would argue that a) many
> applications don't use a rebalance listener, and shouldn't be forced to
> implement it to fully use this new API. More importantly, since you can
> probably use the assignor's onAssignment method to achieve the same
> thing, b) it adds unnecessary complexity, and as we've seen in Streams
> the interactions between the rebalance callbacks and main consumer
> can already get quite ugly.
>
> For simplicity's sake then, I'll propose to just return the bool over the
> exception and change the signature to
>
> /**
>  * @return Whether a new rebalance was triggered (false if a rebalance was
> already in progress)
>  * @throws java.lang.IllegalStateException if the consumer does not use
> group subscription
>  */
> boolean enforceRebalance();
>
> Thoughts?
>
> On Tue, Feb 11, 2020 at 5:29 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Sophie, thanks for brining up this KIP, and the great write-up
> > summarizing the motivations of the proposal. Here are some comments:
> >
> > Minor:
> >
> > 1. If we want to make it a blocking call (I have some thoughts about this
> > below :), to be consistent we need to consider having two overloaded
> > function, one without the timeout which then relies on `
> > DEFAULT_API_TIMEOUT_MS_CONFIG`.
> >
> > 2. Also I'd suggest that, again for API consistency, we a) throw
> > TimeoutException if the operation cannot be completed within the timeout
> > value, b) return false immediately if we cannot trigger a rebalance
> either
> > because coordinator is unknown.
> >
> > Meta:
> >
> > 3. I'm not sure if we have a concrete scenario that we want to wait until
> > the rebalance is completed in KIP-441 / 268, rather than calling
> > "consumer.enforceRebalance(); consumer.poll()" consecutively and try to
> > execute the rebalance in the poll call? If there's no valid motivations
> I'm
> > still a bit inclined to make it non-blocking (i.e. just setting a bit and
> > then execute the process in the later poll call) similar to our `seek`
> > functions. By doing this we can also make this function simpler as it
> would
> > never throw RebalanceInProgress or Timeout or even KafkaExceptions.
> >
> > 4. Re: the case "when a rebalance is already in progress", this may be
> > related to 3) above. I think we can simplify this case as well but just
> not
> > triggering a new rebalance and let the the caller handle it: for example
> in
> > KIP-441, in each iteration of the stream thread, we can if a standby task
> > is ready, and if yes we call `enforceRebalance`, if there's already a
> > rebalance in progress (either with the new subscription metadata, or not)
> > this call would be a no-op, and then in the next iteration we would just
> > call that function again, and eventually we would trigger the rebalance
> > with the new subscription metadata and previous calls would be no-op and
> > hence no cost anyways. I feel this would be simpler than letting the
> caller
> > to capture RebalanceInProgressException:
> >
> >
> > mainProcessingLoop() {
> >     if (needsRebalance) {
> >         consumer.enforceRebalance();
> >     }
> >
> >     records = consumer.poll();
> >     ...
> >     // do some processing
> > }
> >
> > RebalanceListener {
> >
> >    onPartitionsAssigned(...) {
> >       if (rebalanceGoalAchieved()) {
> >         needsRebalance = false;
> >       }
> >     }
> > }
> >
> >
> > WDYT?
> >
> >
> >
> >
> > On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <sophie@confluent.io
> >
> > wrote:
> >
> > > Hey Boyang,
> > >
> > > Originally I had it as a nonblocking call, but decided to change it to
> > > blocking
> > > with a timeout parameter. I'm not sure a future makes sense to return
> > here,
> > > because the rebalance either does or does not complete within the
> > timeout:
> > > if it does not, you will have to call poll again to complete it (as is
> > the
> > > case with
> > > any other rebalance). I'll call this out in the javadocs as well.
> > >
> > > I also added an example demonstrating how/when to use this new API.
> > >
> > > Thanks!
> > >
> > > On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <
> reluctanthero104@gmail.com>
> > > wrote:
> > >
> > > > Hey Sophie,
> > > >
> > > > is the `enforceRebalance` a blocking call? Could we add a code sample
> > to
> > > > the KIP on how this API should be used?
> > > >
> > > > Returning a future instead of a boolean might be easier as we are
> > > allowing
> > > > consumer to make progress during rebalance after 429 IMHO.
> > > >
> > > > Boyang
> > > >
> > > >
> > > > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > > > konstantine@confluent.io> wrote:
> > > >
> > > > > Thanks for the quick turnaround Sophie. My points have been
> > addressed.
> > > > > I think the intended use is quite clear now.
> > > > >
> > > > > Best,
> > > > > Konstantine
> > > > >
> > > > >
> > > > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > > > sophie@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Konstantine,
> > > > > > Thanks for the feedback! I've updated the sections with your
> > > > > suggestions. I
> > > > > > agree
> > > > > > in particular that it's really important to make sure users don't
> > > call
> > > > > this
> > > > > > unnecessarily,
> > > > > >  or for the wrong reasons: to that end I also extended the
> javadocs
> > > to
> > > > > > specify that this
> > > > > > API is for when changes to the subscription userdata occur.
> > Hopefully
> > > > > that
> > > > > > should make
> > > > > > its intended usage quite clear.
> > > > > >
> > > > > > Bill,
> > > > > > The rebalance triggered by this new API will be a "normal"
> > rebalance,
> > > > and
> > > > > > therefore
> > > > > > follow the existing listener semantics. For example a cooperative
> > > > > rebalance
> > > > > > will always
> > > > > > call onPartitionsAssigned, even if no partitions are actually
> > moved.
> > > > > > An eager rebalance will still revoke all partitions first anyway.
> > > > > >
> > > > > > Thanks for the feedback!
> > > > > > Sophie
> > > > > >
> > > > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bb...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi Sophie,
> > > > > > >
> > > > > > > Thanks for the KIP, makes sense to me.
> > > > > > >
> > > > > > > One quick question, I'm not sure if it's relevant or not.
> > > > > > >
> > > > > > > If a user provides a `ConsumerRebalanceListener` and a
> rebalance
> > is
> > > > > > > triggered from an `enforceRebalance`  call,
> > > > > > > it seems possible the listener won't get called since partition
> > > > > > assignments
> > > > > > > might not change.
> > > > > > > If that is the case, do we want to possibly consider adding a
> > > method
> > > > to
> > > > > > the
> > > > > > > `ConsumerRebalanceListener` for callbacks on `enforceRebalance`
> > > > > actions?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bill
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > > > konstantine@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi Sophie.
> > > > > > > >
> > > > > > > > Thanks for the KIP. I liked how focused the proposal is.
> Also,
> > > its
> > > > > > > > motivation is clear after carefully reading the KIP and its
> > > > > references.
> > > > > > > >
> > > > > > > > Yet, I think it'd be a good idea to call out explicitly on
> the
> > > > > Rejected
> > > > > > > > Alternatives section that an automatic and periodic
> triggering
> > of
> > > > > > > > rebalances that would not require exposing this capability
> > > through
> > > > > the
> > > > > > > > Consumer interface does not cover your specific use cases and
> > > > > therefore
> > > > > > > is
> > > > > > > > not chosen as a desired approach. Maybe, even consider
> > mentioning
> > > > > again
> > > > > > > > here that this method is expected to be used to respond to
> > system
> > > > > > changes
> > > > > > > > external to the consumer and its membership logic and is not
> > > > proposed
> > > > > > as
> > > > > > > a
> > > > > > > > way to resolve temporary imbalances due to membership changes
> > > that
> > > > > > should
> > > > > > > > inherently be resolved by the assignor logic itself with one
> or
> > > > more
> > > > > > > > consecutive rebalances.
> > > > > > > >
> > > > > > > > Also, in your javadoc I'd add some context similar to what
> > > someone
> > > > > can
> > > > > > > read
> > > > > > > > on the KIP. Specifically where you say: "for example if some
> > > > > condition
> > > > > > > has
> > > > > > > > changed that has implications for the partition assignment."
> > I'd
> > > > > rather
> > > > > > > add
> > > > > > > > something like "for example, if some condition external and
> > > > invisible
> > > > > > to
> > > > > > > > the Consumer and its group membership has changed in ways
> that
> > > > would
> > > > > > > > justify a new partition assignment". That's just an example,
> > feel
> > > > > free
> > > > > > to
> > > > > > > > reword, but I believe that saying explicitly that this
> > condition
> > > is
> > > > > not
> > > > > > > > visible to the consumer is useful to understand that this is
> > not
> > > > > > > necessary
> > > > > > > > under normal circumstances.
> > > > > > > >
> > > > > > > > In Compatibility, Deprecation, and Migration Plan section I
> > think
> > > > > it's
> > > > > > > > worth mentioning that this is a new feature that affects new
> > > > > > > > implementations of the Consumer interface and any such new
> > > > > > implementation
> > > > > > > > should override the new method. Implementations that wish to
> > > > upgrade
> > > > > > to a
> > > > > > > > newer version should be extended and recompiled, since no
> > default
> > > > > > > > implementation will be provided.
> > > > > > > >
> > > > > > > > Naming is hard here, if someone wants to emphasize the ad hoc
> > and
> > > > > > > irregular
> > > > > > > > nature of this call. After some thought I'm fine with
> > > > > > 'enforceRebalance'
> > > > > > > > even if it could potentially be confused to a method that is
> > > > supposed
> > > > > > to
> > > > > > > be
> > > > > > > > called to remediate one or more previously unsuccessful
> > > rebalances
> > > > > > (which
> > > > > > > > is partly what StreamThread#enforceRebalance is used for).
> The
> > > > best I
> > > > > > > could
> > > > > > > > think of was 'onRequestRebalance' but that's not perfect
> > either.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Konstantine
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > > > sophie@confluent.io
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks John. I took out the KafkaConsumer method and moved
> > the
> > > > > > javadocs
> > > > > > > > > to the Consumer#enforceRebalance in the KIP -- hope you're
> > > happy
> > > > :P
> > > > > > > > >
> > > > > > > > > Also, I wanted to point out one minor change to the current
> > > > > proposal:
> > > > > > > > make
> > > > > > > > > this
> > > > > > > > > a blocking call, which accepts a timeout and returns
> whether
> > > the
> > > > > > > > rebalance
> > > > > > > > > completed within the timeout. It will still reduce to a
> > > > nonblocking
> > > > > > > call
> > > > > > > > if
> > > > > > > > > a "zero"
> > > > > > > > > timeout is supplied. I've updated the KIP accordingly.
> > > > > > > > >
> > > > > > > > > Let me know if there are any further concerns, else I'll
> call
> > > > for a
> > > > > > > vote.
> > > > > > > > >
> > > > > > > > > Cheers!
> > > > > > > > > Sophie
> > > > > > > > >
> > > > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > > > vvcephei@apache.org
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Sophie,
> > > > > > > > > >
> > > > > > > > > > Sorry I didn't respond. I think your new method name
> sounds
> > > > good.
> > > > > > > > > >
> > > > > > > > > > Regarding the interface vs implementation, I agree it's
> > > > > confusing.
> > > > > > > It's
> > > > > > > > > > always bothered me that the interface redirects you to an
> > > > > > > > implementation
> > > > > > > > > > JavaDocs, but never enough for me to stop what I'm doing
> to
> > > fix
> > > > > it.
> > > > > > > > > > It's not a big deal either way, I just thought it was
> > strange
> > > > to
> > > > > > > > propose
> > > > > > > > > a
> > > > > > > > > > "public interface" change, but not in terms of the actual
> > > > > interface
> > > > > > > > > class.
> > > > > > > > > >
> > > > > > > > > > It _is_ true that KafkaConsumer is also part of the
> public
> > > API,
> > > > > but
> > > > > > > > only
> > > > > > > > > > really
> > > > > > > > > > for the constructor. Any proposal to define a new
> "consumer
> > > > > client"
> > > > > > > API
> > > > > > > > > > should be on the Consumer interface (which you said you
> > plan
> > > to
> > > > > do
> > > > > > > > > anyway).
> > > > > > > > > > I guess I brought it up because proposing an addition to
> > > > Consumer
> > > > > > > > implies
> > > > > > > > > > it would be added to KafkaConsumer, but proposing an
> > addition
> > > > to
> > > > > > > > > > KafkaConsumer does not necessarily imply it would also be
> > > added
> > > > > to
> > > > > > > > > > Consumer. Does that make sense?
> > > > > > > > > >
> > > > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman
> wrote:
> > > > > > > > > > > Since this doesn't seem too controversial, I'll
> probably
> > > call
> > > > > > for a
> > > > > > > > > vote
> > > > > > > > > > by
> > > > > > > > > > > end of day.
> > > > > > > > > > > If there any further comments/questions/concerns,
> please
> > > let
> > > > me
> > > > > > > know!
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Sophie
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > > > > > > sophie@confluent.io
> > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for the feedback! That's a good point about
> > trying
> > > > to
> > > > > > > > prevent
> > > > > > > > > > users
> > > > > > > > > > > > from
> > > > > > > > > > > > thinking they should use this API during normal
> > > processing
> > > > > and
> > > > > > > > > > clarifying
> > > > > > > > > > > > when/why
> > > > > > > > > > > > you might need it -- regardless of the method name,
> we
> > > > should
> > > > > > > > > > explicitly
> > > > > > > > > > > > call this out
> > > > > > > > > > > > in the javadocs.
> > > > > > > > > > > >
> > > > > > > > > > > > As for the method name, on reflection I agree that
> > > > > > "rejoinGroup"
> > > > > > > > does
> > > > > > > > > > not
> > > > > > > > > > > > seem to be
> > > > > > > > > > > > appropriate. Of course that's what the consumer will
> > > > actually
> > > > > > be
> > > > > > > > > doing,
> > > > > > > > > > > > but that's just an
> > > > > > > > > > > > implementation detail -- the name should reflect what
> > the
> > > > API
> > > > > > is
> > > > > > > > > doing,
> > > > > > > > > > > > not how it does it
> > > > > > > > > > > > (which can always change).
> > > > > > > > > > > >
> > > > > > > > > > > > How about "enforceRebalance"? This is stolen from the
> > > > > > > StreamThread
> > > > > > > > > > method
> > > > > > > > > > > > that does
> > > > > > > > > > > > exactly this (by unsubscribing) so it seems to fit.
> > I'll
> > > > > update
> > > > > > > the
> > > > > > > > > KIP
> > > > > > > > > > > > with this unless anyone
> > > > > > > > > > > > has another suggestion.
> > > > > > > > > > > >
> > > > > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I
> > > included
> > > > > the
> > > > > > > > > > > > KafkaConsumer method
> > > > > > > > > > > > because that's where all the javadocs redirect to in
> > the
> > > > > > Consumer
> > > > > > > > > > > > interface. Also, FWIW
> > > > > > > > > > > > I'm pretty sure KafkaConsumer is also part of the
> > public
> > > > API
> > > > > --
> > > > > > > we
> > > > > > > > > > would
> > > > > > > > > > > > be adding a new
> > > > > > > > > > > > method to both.
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > > > vvcephei@apache.org
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hi all,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I had
> some
> > > > > > > alternatives
> > > > > > > > > in
> > > > > > > > > > > >> mind, which
> > > > > > > > > > > >> I won't even bother to relate because I feel like
> the
> > > > > > motivation
> > > > > > > > > made
> > > > > > > > > > a
> > > > > > > > > > > >> compelling
> > > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > > >>
> > > > > > > > > > > >> One very minor point you might as well fix is that
> the
> > > API
> > > > > > > change
> > > > > > > > is
> > > > > > > > > > > >> targeted at
> > > > > > > > > > > >> KafkaConsumer (the implementation), but should be
> > > targeted
> > > > > at
> > > > > > > > > > > >> Consumer (the interface).
> > > > > > > > > > > >>
> > > > > > > > > > > >> I agree with your discomfort about the name. Adding
> a
> > > > > "rejoin"
> > > > > > > > > method
> > > > > > > > > > > >> seems strange
> > > > > > > > > > > >> since there's no "join" method. Instead the way you
> > join
> > > > the
> > > > > > > group
> > > > > > > > > the
> > > > > > > > > > > >> first time is just
> > > > > > > > > > > >> by calling "subscribe". But "resubscribe" seems too
> > > > indirect
> > > > > > > from
> > > > > > > > > what
> > > > > > > > > > > >> we're really trying
> > > > > > > > > > > >> to do, which is to trigger a rebalance by sending a
> > new
> > > > > > > JoinGroup
> > > > > > > > > > request.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Another angle is that we don't want the method to
> > sound
> > > > like
> > > > > > > > > something
> > > > > > > > > > > >> you should
> > > > > > > > > > > >> be calling in normal circumstances, or people will
> be
> > > > > > "tricked"
> > > > > > > > into
> > > > > > > > > > > >> calling it unnecessarily.
> > > > > > > > > > > >>
> > > > > > > > > > > >> So, I think "rejoinGroup" is fine, although a person
> > > > _might_
> > > > > > be
> > > > > > > > > > forgiven
> > > > > > > > > > > >> for thinking they
> > > > > > > > > > > >> need to call it periodically or something. Did you
> > > > consider
> > > > > > > > > > > >> "triggerRebalance", which
> > > > > > > > > > > >> sounds pretty advanced-ish, and accurately describes
> > > what
> > > > > > > happens
> > > > > > > > > when
> > > > > > > > > > > >> you call it?
> > > > > > > > > > > >>
> > > > > > > > > > > >> All in all, the KIP sounds good to me, and I'm in
> > favor.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks,
> > > > > > > > > > > >> -John
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > > > > > > >> > This situation was discussed at length after a
> > recent
> > > > > talk I
> > > > > > > > gave.
> > > > > > > > > > This
> > > > > > > > > > > >> KIP
> > > > > > > > > > > >> > would be a great step towards increased
> availability
> > > and
> > > > > in
> > > > > > > > > > facilitating
> > > > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > anna
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > > > > > > > sophie@confluent.io>
> > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > Hi all,
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > In light of some recent and upcoming rebalancing
> > and
> > > > > > > > > availability
> > > > > > > > > > > >> > > improvements, it seems we have a need for
> > explicitly
> > > > > > > > triggering
> > > > > > > > > a
> > > > > > > > > > > >> consumer
> > > > > > > > > > > >> > > group rebalance. Therefore I'd like to propose
> > > adding
> > > > a
> > > > > > new
> > > > > > > > > > > >> > > rejoinGroup()method
> > > > > > > > > > > >> > > to the Consumer client (better method name
> > > suggestions
> > > > > are
> > > > > > > > very
> > > > > > > > > > > >> welcome).
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Please take a look at the KIP and let me know
> what
> > > you
> > > > > > > think!
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > KIP document:
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > JIRA:
> > > > https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Cheers,
> > > > > > > > > > > >> > > Sophie
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Hey Guozhang, thanks for the thorough reply!

I definitely went back and forth on whether to make it a blocking call,
and ultimately went with blocking just to leave it open to potential future
use cases (in particular non-Streams apps). But on second (or third)
thought I think I agree that no use case wouldn't be similarly covered by
just calling poll() immediately after enforceRebalance(). It seems best to
leave all rebalancing action within the scope of poll alone and avoid
introducing unnecessary complexity -- happy to revert this then.

I think that ends up addressing most of your other concerns, although
there's one I would push back on: this method should still explicitly
call out whether a rebalance is already in progress and the call is thus
a no-op. If throwing a RebalanceInProgressException seems too
heavy maybe we can just return a boolean indicating whether a new
rebalance was triggered or not.

The snippet you included does work around this, by checking the
condition again in the rebalance listener. But I would argue that a) many
applications don't use a rebalance listener, and shouldn't be forced to
implement it to fully use this new API. More importantly, since you can
probably use the assignor's onAssignment method to achieve the same
thing, b) it adds unnecessary complexity, and as we've seen in Streams
the interactions between the rebalance callbacks and main consumer
can already get quite ugly.

For simplicity's sake then, I'll propose to just return the bool over the
exception and change the signature to

/**
 * @return Whether a new rebalance was triggered (false if a rebalance was
already in progress)
 * @throws java.lang.IllegalStateException if the consumer does not use
group subscription
 */
boolean enforceRebalance();

Thoughts?

On Tue, Feb 11, 2020 at 5:29 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Sophie, thanks for brining up this KIP, and the great write-up
> summarizing the motivations of the proposal. Here are some comments:
>
> Minor:
>
> 1. If we want to make it a blocking call (I have some thoughts about this
> below :), to be consistent we need to consider having two overloaded
> function, one without the timeout which then relies on `
> DEFAULT_API_TIMEOUT_MS_CONFIG`.
>
> 2. Also I'd suggest that, again for API consistency, we a) throw
> TimeoutException if the operation cannot be completed within the timeout
> value, b) return false immediately if we cannot trigger a rebalance either
> because coordinator is unknown.
>
> Meta:
>
> 3. I'm not sure if we have a concrete scenario that we want to wait until
> the rebalance is completed in KIP-441 / 268, rather than calling
> "consumer.enforceRebalance(); consumer.poll()" consecutively and try to
> execute the rebalance in the poll call? If there's no valid motivations I'm
> still a bit inclined to make it non-blocking (i.e. just setting a bit and
> then execute the process in the later poll call) similar to our `seek`
> functions. By doing this we can also make this function simpler as it would
> never throw RebalanceInProgress or Timeout or even KafkaExceptions.
>
> 4. Re: the case "when a rebalance is already in progress", this may be
> related to 3) above. I think we can simplify this case as well but just not
> triggering a new rebalance and let the the caller handle it: for example in
> KIP-441, in each iteration of the stream thread, we can if a standby task
> is ready, and if yes we call `enforceRebalance`, if there's already a
> rebalance in progress (either with the new subscription metadata, or not)
> this call would be a no-op, and then in the next iteration we would just
> call that function again, and eventually we would trigger the rebalance
> with the new subscription metadata and previous calls would be no-op and
> hence no cost anyways. I feel this would be simpler than letting the caller
> to capture RebalanceInProgressException:
>
>
> mainProcessingLoop() {
>     if (needsRebalance) {
>         consumer.enforceRebalance();
>     }
>
>     records = consumer.poll();
>     ...
>     // do some processing
> }
>
> RebalanceListener {
>
>    onPartitionsAssigned(...) {
>       if (rebalanceGoalAchieved()) {
>         needsRebalance = false;
>       }
>     }
> }
>
>
> WDYT?
>
>
>
>
> On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <so...@confluent.io>
> wrote:
>
> > Hey Boyang,
> >
> > Originally I had it as a nonblocking call, but decided to change it to
> > blocking
> > with a timeout parameter. I'm not sure a future makes sense to return
> here,
> > because the rebalance either does or does not complete within the
> timeout:
> > if it does not, you will have to call poll again to complete it (as is
> the
> > case with
> > any other rebalance). I'll call this out in the javadocs as well.
> >
> > I also added an example demonstrating how/when to use this new API.
> >
> > Thanks!
> >
> > On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <re...@gmail.com>
> > wrote:
> >
> > > Hey Sophie,
> > >
> > > is the `enforceRebalance` a blocking call? Could we add a code sample
> to
> > > the KIP on how this API should be used?
> > >
> > > Returning a future instead of a boolean might be easier as we are
> > allowing
> > > consumer to make progress during rebalance after 429 IMHO.
> > >
> > > Boyang
> > >
> > >
> > > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > > konstantine@confluent.io> wrote:
> > >
> > > > Thanks for the quick turnaround Sophie. My points have been
> addressed.
> > > > I think the intended use is quite clear now.
> > > >
> > > > Best,
> > > > Konstantine
> > > >
> > > >
> > > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > > sophie@confluent.io>
> > > > wrote:
> > > >
> > > > > Konstantine,
> > > > > Thanks for the feedback! I've updated the sections with your
> > > > suggestions. I
> > > > > agree
> > > > > in particular that it's really important to make sure users don't
> > call
> > > > this
> > > > > unnecessarily,
> > > > >  or for the wrong reasons: to that end I also extended the javadocs
> > to
> > > > > specify that this
> > > > > API is for when changes to the subscription userdata occur.
> Hopefully
> > > > that
> > > > > should make
> > > > > its intended usage quite clear.
> > > > >
> > > > > Bill,
> > > > > The rebalance triggered by this new API will be a "normal"
> rebalance,
> > > and
> > > > > therefore
> > > > > follow the existing listener semantics. For example a cooperative
> > > > rebalance
> > > > > will always
> > > > > call onPartitionsAssigned, even if no partitions are actually
> moved.
> > > > > An eager rebalance will still revoke all partitions first anyway.
> > > > >
> > > > > Thanks for the feedback!
> > > > > Sophie
> > > > >
> > > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bb...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi Sophie,
> > > > > >
> > > > > > Thanks for the KIP, makes sense to me.
> > > > > >
> > > > > > One quick question, I'm not sure if it's relevant or not.
> > > > > >
> > > > > > If a user provides a `ConsumerRebalanceListener` and a rebalance
> is
> > > > > > triggered from an `enforceRebalance`  call,
> > > > > > it seems possible the listener won't get called since partition
> > > > > assignments
> > > > > > might not change.
> > > > > > If that is the case, do we want to possibly consider adding a
> > method
> > > to
> > > > > the
> > > > > > `ConsumerRebalanceListener` for callbacks on `enforceRebalance`
> > > > actions?
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > > konstantine@confluent.io> wrote:
> > > > > >
> > > > > > > Hi Sophie.
> > > > > > >
> > > > > > > Thanks for the KIP. I liked how focused the proposal is. Also,
> > its
> > > > > > > motivation is clear after carefully reading the KIP and its
> > > > references.
> > > > > > >
> > > > > > > Yet, I think it'd be a good idea to call out explicitly on the
> > > > Rejected
> > > > > > > Alternatives section that an automatic and periodic triggering
> of
> > > > > > > rebalances that would not require exposing this capability
> > through
> > > > the
> > > > > > > Consumer interface does not cover your specific use cases and
> > > > therefore
> > > > > > is
> > > > > > > not chosen as a desired approach. Maybe, even consider
> mentioning
> > > > again
> > > > > > > here that this method is expected to be used to respond to
> system
> > > > > changes
> > > > > > > external to the consumer and its membership logic and is not
> > > proposed
> > > > > as
> > > > > > a
> > > > > > > way to resolve temporary imbalances due to membership changes
> > that
> > > > > should
> > > > > > > inherently be resolved by the assignor logic itself with one or
> > > more
> > > > > > > consecutive rebalances.
> > > > > > >
> > > > > > > Also, in your javadoc I'd add some context similar to what
> > someone
> > > > can
> > > > > > read
> > > > > > > on the KIP. Specifically where you say: "for example if some
> > > > condition
> > > > > > has
> > > > > > > changed that has implications for the partition assignment."
> I'd
> > > > rather
> > > > > > add
> > > > > > > something like "for example, if some condition external and
> > > invisible
> > > > > to
> > > > > > > the Consumer and its group membership has changed in ways that
> > > would
> > > > > > > justify a new partition assignment". That's just an example,
> feel
> > > > free
> > > > > to
> > > > > > > reword, but I believe that saying explicitly that this
> condition
> > is
> > > > not
> > > > > > > visible to the consumer is useful to understand that this is
> not
> > > > > > necessary
> > > > > > > under normal circumstances.
> > > > > > >
> > > > > > > In Compatibility, Deprecation, and Migration Plan section I
> think
> > > > it's
> > > > > > > worth mentioning that this is a new feature that affects new
> > > > > > > implementations of the Consumer interface and any such new
> > > > > implementation
> > > > > > > should override the new method. Implementations that wish to
> > > upgrade
> > > > > to a
> > > > > > > newer version should be extended and recompiled, since no
> default
> > > > > > > implementation will be provided.
> > > > > > >
> > > > > > > Naming is hard here, if someone wants to emphasize the ad hoc
> and
> > > > > > irregular
> > > > > > > nature of this call. After some thought I'm fine with
> > > > > 'enforceRebalance'
> > > > > > > even if it could potentially be confused to a method that is
> > > supposed
> > > > > to
> > > > > > be
> > > > > > > called to remediate one or more previously unsuccessful
> > rebalances
> > > > > (which
> > > > > > > is partly what StreamThread#enforceRebalance is used for). The
> > > best I
> > > > > > could
> > > > > > > think of was 'onRequestRebalance' but that's not perfect
> either.
> > > > > > >
> > > > > > > Best,
> > > > > > > Konstantine
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > > sophie@confluent.io
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks John. I took out the KafkaConsumer method and moved
> the
> > > > > javadocs
> > > > > > > > to the Consumer#enforceRebalance in the KIP -- hope you're
> > happy
> > > :P
> > > > > > > >
> > > > > > > > Also, I wanted to point out one minor change to the current
> > > > proposal:
> > > > > > > make
> > > > > > > > this
> > > > > > > > a blocking call, which accepts a timeout and returns whether
> > the
> > > > > > > rebalance
> > > > > > > > completed within the timeout. It will still reduce to a
> > > nonblocking
> > > > > > call
> > > > > > > if
> > > > > > > > a "zero"
> > > > > > > > timeout is supplied. I've updated the KIP accordingly.
> > > > > > > >
> > > > > > > > Let me know if there are any further concerns, else I'll call
> > > for a
> > > > > > vote.
> > > > > > > >
> > > > > > > > Cheers!
> > > > > > > > Sophie
> > > > > > > >
> > > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > > vvcephei@apache.org
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Sophie,
> > > > > > > > >
> > > > > > > > > Sorry I didn't respond. I think your new method name sounds
> > > good.
> > > > > > > > >
> > > > > > > > > Regarding the interface vs implementation, I agree it's
> > > > confusing.
> > > > > > It's
> > > > > > > > > always bothered me that the interface redirects you to an
> > > > > > > implementation
> > > > > > > > > JavaDocs, but never enough for me to stop what I'm doing to
> > fix
> > > > it.
> > > > > > > > > It's not a big deal either way, I just thought it was
> strange
> > > to
> > > > > > > propose
> > > > > > > > a
> > > > > > > > > "public interface" change, but not in terms of the actual
> > > > interface
> > > > > > > > class.
> > > > > > > > >
> > > > > > > > > It _is_ true that KafkaConsumer is also part of the public
> > API,
> > > > but
> > > > > > > only
> > > > > > > > > really
> > > > > > > > > for the constructor. Any proposal to define a new "consumer
> > > > client"
> > > > > > API
> > > > > > > > > should be on the Consumer interface (which you said you
> plan
> > to
> > > > do
> > > > > > > > anyway).
> > > > > > > > > I guess I brought it up because proposing an addition to
> > > Consumer
> > > > > > > implies
> > > > > > > > > it would be added to KafkaConsumer, but proposing an
> addition
> > > to
> > > > > > > > > KafkaConsumer does not necessarily imply it would also be
> > added
> > > > to
> > > > > > > > > Consumer. Does that make sense?
> > > > > > > > >
> > > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > -John
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > > > > > > Since this doesn't seem too controversial, I'll probably
> > call
> > > > > for a
> > > > > > > > vote
> > > > > > > > > by
> > > > > > > > > > end of day.
> > > > > > > > > > If there any further comments/questions/concerns, please
> > let
> > > me
> > > > > > know!
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Sophie
> > > > > > > > > >
> > > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > > > > > sophie@confluent.io
> > > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the feedback! That's a good point about
> trying
> > > to
> > > > > > > prevent
> > > > > > > > > users
> > > > > > > > > > > from
> > > > > > > > > > > thinking they should use this API during normal
> > processing
> > > > and
> > > > > > > > > clarifying
> > > > > > > > > > > when/why
> > > > > > > > > > > you might need it -- regardless of the method name, we
> > > should
> > > > > > > > > explicitly
> > > > > > > > > > > call this out
> > > > > > > > > > > in the javadocs.
> > > > > > > > > > >
> > > > > > > > > > > As for the method name, on reflection I agree that
> > > > > "rejoinGroup"
> > > > > > > does
> > > > > > > > > not
> > > > > > > > > > > seem to be
> > > > > > > > > > > appropriate. Of course that's what the consumer will
> > > actually
> > > > > be
> > > > > > > > doing,
> > > > > > > > > > > but that's just an
> > > > > > > > > > > implementation detail -- the name should reflect what
> the
> > > API
> > > > > is
> > > > > > > > doing,
> > > > > > > > > > > not how it does it
> > > > > > > > > > > (which can always change).
> > > > > > > > > > >
> > > > > > > > > > > How about "enforceRebalance"? This is stolen from the
> > > > > > StreamThread
> > > > > > > > > method
> > > > > > > > > > > that does
> > > > > > > > > > > exactly this (by unsubscribing) so it seems to fit.
> I'll
> > > > update
> > > > > > the
> > > > > > > > KIP
> > > > > > > > > > > with this unless anyone
> > > > > > > > > > > has another suggestion.
> > > > > > > > > > >
> > > > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I
> > included
> > > > the
> > > > > > > > > > > KafkaConsumer method
> > > > > > > > > > > because that's where all the javadocs redirect to in
> the
> > > > > Consumer
> > > > > > > > > > > interface. Also, FWIW
> > > > > > > > > > > I'm pretty sure KafkaConsumer is also part of the
> public
> > > API
> > > > --
> > > > > > we
> > > > > > > > > would
> > > > > > > > > > > be adding a new
> > > > > > > > > > > method to both.
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > > vvcephei@apache.org
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Hi all,
> > > > > > > > > > >>
> > > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I had some
> > > > > > alternatives
> > > > > > > > in
> > > > > > > > > > >> mind, which
> > > > > > > > > > >> I won't even bother to relate because I feel like the
> > > > > motivation
> > > > > > > > made
> > > > > > > > > a
> > > > > > > > > > >> compelling
> > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > >>
> > > > > > > > > > >> One very minor point you might as well fix is that the
> > API
> > > > > > change
> > > > > > > is
> > > > > > > > > > >> targeted at
> > > > > > > > > > >> KafkaConsumer (the implementation), but should be
> > targeted
> > > > at
> > > > > > > > > > >> Consumer (the interface).
> > > > > > > > > > >>
> > > > > > > > > > >> I agree with your discomfort about the name. Adding a
> > > > "rejoin"
> > > > > > > > method
> > > > > > > > > > >> seems strange
> > > > > > > > > > >> since there's no "join" method. Instead the way you
> join
> > > the
> > > > > > group
> > > > > > > > the
> > > > > > > > > > >> first time is just
> > > > > > > > > > >> by calling "subscribe". But "resubscribe" seems too
> > > indirect
> > > > > > from
> > > > > > > > what
> > > > > > > > > > >> we're really trying
> > > > > > > > > > >> to do, which is to trigger a rebalance by sending a
> new
> > > > > > JoinGroup
> > > > > > > > > request.
> > > > > > > > > > >>
> > > > > > > > > > >> Another angle is that we don't want the method to
> sound
> > > like
> > > > > > > > something
> > > > > > > > > > >> you should
> > > > > > > > > > >> be calling in normal circumstances, or people will be
> > > > > "tricked"
> > > > > > > into
> > > > > > > > > > >> calling it unnecessarily.
> > > > > > > > > > >>
> > > > > > > > > > >> So, I think "rejoinGroup" is fine, although a person
> > > _might_
> > > > > be
> > > > > > > > > forgiven
> > > > > > > > > > >> for thinking they
> > > > > > > > > > >> need to call it periodically or something. Did you
> > > consider
> > > > > > > > > > >> "triggerRebalance", which
> > > > > > > > > > >> sounds pretty advanced-ish, and accurately describes
> > what
> > > > > > happens
> > > > > > > > when
> > > > > > > > > > >> you call it?
> > > > > > > > > > >>
> > > > > > > > > > >> All in all, the KIP sounds good to me, and I'm in
> favor.
> > > > > > > > > > >>
> > > > > > > > > > >> Thanks,
> > > > > > > > > > >> -John
> > > > > > > > > > >>
> > > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > > > > > >> > This situation was discussed at length after a
> recent
> > > > talk I
> > > > > > > gave.
> > > > > > > > > This
> > > > > > > > > > >> KIP
> > > > > > > > > > >> > would be a great step towards increased availability
> > and
> > > > in
> > > > > > > > > facilitating
> > > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > > >> >
> > > > > > > > > > >> > anna
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > > > > > > sophie@confluent.io>
> > > > > > > > > > >> > wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> > > Hi all,
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > In light of some recent and upcoming rebalancing
> and
> > > > > > > > availability
> > > > > > > > > > >> > > improvements, it seems we have a need for
> explicitly
> > > > > > > triggering
> > > > > > > > a
> > > > > > > > > > >> consumer
> > > > > > > > > > >> > > group rebalance. Therefore I'd like to propose
> > adding
> > > a
> > > > > new
> > > > > > > > > > >> > > rejoinGroup()method
> > > > > > > > > > >> > > to the Consumer client (better method name
> > suggestions
> > > > are
> > > > > > > very
> > > > > > > > > > >> welcome).
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Please take a look at the KIP and let me know what
> > you
> > > > > > think!
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > KIP document:
> > > > > > > > > > >> > >
> > > > > > > > > > >> > >
> > > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > JIRA:
> > > https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Cheers,
> > > > > > > > > > >> > > Sophie
> > > > > > > > > > >> > >
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sophie, thanks for brining up this KIP, and the great write-up
summarizing the motivations of the proposal. Here are some comments:

Minor:

1. If we want to make it a blocking call (I have some thoughts about this
below :), to be consistent we need to consider having two overloaded
function, one without the timeout which then relies on `
DEFAULT_API_TIMEOUT_MS_CONFIG`.

2. Also I'd suggest that, again for API consistency, we a) throw
TimeoutException if the operation cannot be completed within the timeout
value, b) return false immediately if we cannot trigger a rebalance either
because coordinator is unknown.

Meta:

3. I'm not sure if we have a concrete scenario that we want to wait until
the rebalance is completed in KIP-441 / 268, rather than calling
"consumer.enforceRebalance(); consumer.poll()" consecutively and try to
execute the rebalance in the poll call? If there's no valid motivations I'm
still a bit inclined to make it non-blocking (i.e. just setting a bit and
then execute the process in the later poll call) similar to our `seek`
functions. By doing this we can also make this function simpler as it would
never throw RebalanceInProgress or Timeout or even KafkaExceptions.

4. Re: the case "when a rebalance is already in progress", this may be
related to 3) above. I think we can simplify this case as well but just not
triggering a new rebalance and let the the caller handle it: for example in
KIP-441, in each iteration of the stream thread, we can if a standby task
is ready, and if yes we call `enforceRebalance`, if there's already a
rebalance in progress (either with the new subscription metadata, or not)
this call would be a no-op, and then in the next iteration we would just
call that function again, and eventually we would trigger the rebalance
with the new subscription metadata and previous calls would be no-op and
hence no cost anyways. I feel this would be simpler than letting the caller
to capture RebalanceInProgressException:


mainProcessingLoop() {
    if (needsRebalance) {
        consumer.enforceRebalance();
    }

    records = consumer.poll();
    ...
    // do some processing
}

RebalanceListener {

   onPartitionsAssigned(...) {
      if (rebalanceGoalAchieved()) {
        needsRebalance = false;
      }
    }
}


WDYT?




On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Hey Boyang,
>
> Originally I had it as a nonblocking call, but decided to change it to
> blocking
> with a timeout parameter. I'm not sure a future makes sense to return here,
> because the rebalance either does or does not complete within the timeout:
> if it does not, you will have to call poll again to complete it (as is the
> case with
> any other rebalance). I'll call this out in the javadocs as well.
>
> I also added an example demonstrating how/when to use this new API.
>
> Thanks!
>
> On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <re...@gmail.com>
> wrote:
>
> > Hey Sophie,
> >
> > is the `enforceRebalance` a blocking call? Could we add a code sample to
> > the KIP on how this API should be used?
> >
> > Returning a future instead of a boolean might be easier as we are
> allowing
> > consumer to make progress during rebalance after 429 IMHO.
> >
> > Boyang
> >
> >
> > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > konstantine@confluent.io> wrote:
> >
> > > Thanks for the quick turnaround Sophie. My points have been addressed.
> > > I think the intended use is quite clear now.
> > >
> > > Best,
> > > Konstantine
> > >
> > >
> > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > sophie@confluent.io>
> > > wrote:
> > >
> > > > Konstantine,
> > > > Thanks for the feedback! I've updated the sections with your
> > > suggestions. I
> > > > agree
> > > > in particular that it's really important to make sure users don't
> call
> > > this
> > > > unnecessarily,
> > > >  or for the wrong reasons: to that end I also extended the javadocs
> to
> > > > specify that this
> > > > API is for when changes to the subscription userdata occur. Hopefully
> > > that
> > > > should make
> > > > its intended usage quite clear.
> > > >
> > > > Bill,
> > > > The rebalance triggered by this new API will be a "normal" rebalance,
> > and
> > > > therefore
> > > > follow the existing listener semantics. For example a cooperative
> > > rebalance
> > > > will always
> > > > call onPartitionsAssigned, even if no partitions are actually moved.
> > > > An eager rebalance will still revoke all partitions first anyway.
> > > >
> > > > Thanks for the feedback!
> > > > Sophie
> > > >
> > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bb...@gmail.com>
> wrote:
> > > >
> > > > > Hi Sophie,
> > > > >
> > > > > Thanks for the KIP, makes sense to me.
> > > > >
> > > > > One quick question, I'm not sure if it's relevant or not.
> > > > >
> > > > > If a user provides a `ConsumerRebalanceListener` and a rebalance is
> > > > > triggered from an `enforceRebalance`  call,
> > > > > it seems possible the listener won't get called since partition
> > > > assignments
> > > > > might not change.
> > > > > If that is the case, do we want to possibly consider adding a
> method
> > to
> > > > the
> > > > > `ConsumerRebalanceListener` for callbacks on `enforceRebalance`
> > > actions?
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > konstantine@confluent.io> wrote:
> > > > >
> > > > > > Hi Sophie.
> > > > > >
> > > > > > Thanks for the KIP. I liked how focused the proposal is. Also,
> its
> > > > > > motivation is clear after carefully reading the KIP and its
> > > references.
> > > > > >
> > > > > > Yet, I think it'd be a good idea to call out explicitly on the
> > > Rejected
> > > > > > Alternatives section that an automatic and periodic triggering of
> > > > > > rebalances that would not require exposing this capability
> through
> > > the
> > > > > > Consumer interface does not cover your specific use cases and
> > > therefore
> > > > > is
> > > > > > not chosen as a desired approach. Maybe, even consider mentioning
> > > again
> > > > > > here that this method is expected to be used to respond to system
> > > > changes
> > > > > > external to the consumer and its membership logic and is not
> > proposed
> > > > as
> > > > > a
> > > > > > way to resolve temporary imbalances due to membership changes
> that
> > > > should
> > > > > > inherently be resolved by the assignor logic itself with one or
> > more
> > > > > > consecutive rebalances.
> > > > > >
> > > > > > Also, in your javadoc I'd add some context similar to what
> someone
> > > can
> > > > > read
> > > > > > on the KIP. Specifically where you say: "for example if some
> > > condition
> > > > > has
> > > > > > changed that has implications for the partition assignment." I'd
> > > rather
> > > > > add
> > > > > > something like "for example, if some condition external and
> > invisible
> > > > to
> > > > > > the Consumer and its group membership has changed in ways that
> > would
> > > > > > justify a new partition assignment". That's just an example, feel
> > > free
> > > > to
> > > > > > reword, but I believe that saying explicitly that this condition
> is
> > > not
> > > > > > visible to the consumer is useful to understand that this is not
> > > > > necessary
> > > > > > under normal circumstances.
> > > > > >
> > > > > > In Compatibility, Deprecation, and Migration Plan section I think
> > > it's
> > > > > > worth mentioning that this is a new feature that affects new
> > > > > > implementations of the Consumer interface and any such new
> > > > implementation
> > > > > > should override the new method. Implementations that wish to
> > upgrade
> > > > to a
> > > > > > newer version should be extended and recompiled, since no default
> > > > > > implementation will be provided.
> > > > > >
> > > > > > Naming is hard here, if someone wants to emphasize the ad hoc and
> > > > > irregular
> > > > > > nature of this call. After some thought I'm fine with
> > > > 'enforceRebalance'
> > > > > > even if it could potentially be confused to a method that is
> > supposed
> > > > to
> > > > > be
> > > > > > called to remediate one or more previously unsuccessful
> rebalances
> > > > (which
> > > > > > is partly what StreamThread#enforceRebalance is used for). The
> > best I
> > > > > could
> > > > > > think of was 'onRequestRebalance' but that's not perfect either.
> > > > > >
> > > > > > Best,
> > > > > > Konstantine
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > sophie@confluent.io
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks John. I took out the KafkaConsumer method and moved the
> > > > javadocs
> > > > > > > to the Consumer#enforceRebalance in the KIP -- hope you're
> happy
> > :P
> > > > > > >
> > > > > > > Also, I wanted to point out one minor change to the current
> > > proposal:
> > > > > > make
> > > > > > > this
> > > > > > > a blocking call, which accepts a timeout and returns whether
> the
> > > > > > rebalance
> > > > > > > completed within the timeout. It will still reduce to a
> > nonblocking
> > > > > call
> > > > > > if
> > > > > > > a "zero"
> > > > > > > timeout is supplied. I've updated the KIP accordingly.
> > > > > > >
> > > > > > > Let me know if there are any further concerns, else I'll call
> > for a
> > > > > vote.
> > > > > > >
> > > > > > > Cheers!
> > > > > > > Sophie
> > > > > > >
> > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > vvcephei@apache.org
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Sophie,
> > > > > > > >
> > > > > > > > Sorry I didn't respond. I think your new method name sounds
> > good.
> > > > > > > >
> > > > > > > > Regarding the interface vs implementation, I agree it's
> > > confusing.
> > > > > It's
> > > > > > > > always bothered me that the interface redirects you to an
> > > > > > implementation
> > > > > > > > JavaDocs, but never enough for me to stop what I'm doing to
> fix
> > > it.
> > > > > > > > It's not a big deal either way, I just thought it was strange
> > to
> > > > > > propose
> > > > > > > a
> > > > > > > > "public interface" change, but not in terms of the actual
> > > interface
> > > > > > > class.
> > > > > > > >
> > > > > > > > It _is_ true that KafkaConsumer is also part of the public
> API,
> > > but
> > > > > > only
> > > > > > > > really
> > > > > > > > for the constructor. Any proposal to define a new "consumer
> > > client"
> > > > > API
> > > > > > > > should be on the Consumer interface (which you said you plan
> to
> > > do
> > > > > > > anyway).
> > > > > > > > I guess I brought it up because proposing an addition to
> > Consumer
> > > > > > implies
> > > > > > > > it would be added to KafkaConsumer, but proposing an addition
> > to
> > > > > > > > KafkaConsumer does not necessarily imply it would also be
> added
> > > to
> > > > > > > > Consumer. Does that make sense?
> > > > > > > >
> > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > > > > > Since this doesn't seem too controversial, I'll probably
> call
> > > > for a
> > > > > > > vote
> > > > > > > > by
> > > > > > > > > end of day.
> > > > > > > > > If there any further comments/questions/concerns, please
> let
> > me
> > > > > know!
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Sophie
> > > > > > > > >
> > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > > > > sophie@confluent.io
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the feedback! That's a good point about trying
> > to
> > > > > > prevent
> > > > > > > > users
> > > > > > > > > > from
> > > > > > > > > > thinking they should use this API during normal
> processing
> > > and
> > > > > > > > clarifying
> > > > > > > > > > when/why
> > > > > > > > > > you might need it -- regardless of the method name, we
> > should
> > > > > > > > explicitly
> > > > > > > > > > call this out
> > > > > > > > > > in the javadocs.
> > > > > > > > > >
> > > > > > > > > > As for the method name, on reflection I agree that
> > > > "rejoinGroup"
> > > > > > does
> > > > > > > > not
> > > > > > > > > > seem to be
> > > > > > > > > > appropriate. Of course that's what the consumer will
> > actually
> > > > be
> > > > > > > doing,
> > > > > > > > > > but that's just an
> > > > > > > > > > implementation detail -- the name should reflect what the
> > API
> > > > is
> > > > > > > doing,
> > > > > > > > > > not how it does it
> > > > > > > > > > (which can always change).
> > > > > > > > > >
> > > > > > > > > > How about "enforceRebalance"? This is stolen from the
> > > > > StreamThread
> > > > > > > > method
> > > > > > > > > > that does
> > > > > > > > > > exactly this (by unsubscribing) so it seems to fit. I'll
> > > update
> > > > > the
> > > > > > > KIP
> > > > > > > > > > with this unless anyone
> > > > > > > > > > has another suggestion.
> > > > > > > > > >
> > > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I
> included
> > > the
> > > > > > > > > > KafkaConsumer method
> > > > > > > > > > because that's where all the javadocs redirect to in the
> > > > Consumer
> > > > > > > > > > interface. Also, FWIW
> > > > > > > > > > I'm pretty sure KafkaConsumer is also part of the public
> > API
> > > --
> > > > > we
> > > > > > > > would
> > > > > > > > > > be adding a new
> > > > > > > > > > method to both.
> > > > > > > > > >
> > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > vvcephei@apache.org
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi all,
> > > > > > > > > >>
> > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I had some
> > > > > alternatives
> > > > > > > in
> > > > > > > > > >> mind, which
> > > > > > > > > >> I won't even bother to relate because I feel like the
> > > > motivation
> > > > > > > made
> > > > > > > > a
> > > > > > > > > >> compelling
> > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > >>
> > > > > > > > > >> One very minor point you might as well fix is that the
> API
> > > > > change
> > > > > > is
> > > > > > > > > >> targeted at
> > > > > > > > > >> KafkaConsumer (the implementation), but should be
> targeted
> > > at
> > > > > > > > > >> Consumer (the interface).
> > > > > > > > > >>
> > > > > > > > > >> I agree with your discomfort about the name. Adding a
> > > "rejoin"
> > > > > > > method
> > > > > > > > > >> seems strange
> > > > > > > > > >> since there's no "join" method. Instead the way you join
> > the
> > > > > group
> > > > > > > the
> > > > > > > > > >> first time is just
> > > > > > > > > >> by calling "subscribe". But "resubscribe" seems too
> > indirect
> > > > > from
> > > > > > > what
> > > > > > > > > >> we're really trying
> > > > > > > > > >> to do, which is to trigger a rebalance by sending a new
> > > > > JoinGroup
> > > > > > > > request.
> > > > > > > > > >>
> > > > > > > > > >> Another angle is that we don't want the method to sound
> > like
> > > > > > > something
> > > > > > > > > >> you should
> > > > > > > > > >> be calling in normal circumstances, or people will be
> > > > "tricked"
> > > > > > into
> > > > > > > > > >> calling it unnecessarily.
> > > > > > > > > >>
> > > > > > > > > >> So, I think "rejoinGroup" is fine, although a person
> > _might_
> > > > be
> > > > > > > > forgiven
> > > > > > > > > >> for thinking they
> > > > > > > > > >> need to call it periodically or something. Did you
> > consider
> > > > > > > > > >> "triggerRebalance", which
> > > > > > > > > >> sounds pretty advanced-ish, and accurately describes
> what
> > > > > happens
> > > > > > > when
> > > > > > > > > >> you call it?
> > > > > > > > > >>
> > > > > > > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> -John
> > > > > > > > > >>
> > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > > > > >> > This situation was discussed at length after a recent
> > > talk I
> > > > > > gave.
> > > > > > > > This
> > > > > > > > > >> KIP
> > > > > > > > > >> > would be a great step towards increased availability
> and
> > > in
> > > > > > > > facilitating
> > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > >> >
> > > > > > > > > >> > anna
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > > > > > sophie@confluent.io>
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Hi all,
> > > > > > > > > >> > >
> > > > > > > > > >> > > In light of some recent and upcoming rebalancing and
> > > > > > > availability
> > > > > > > > > >> > > improvements, it seems we have a need for explicitly
> > > > > > triggering
> > > > > > > a
> > > > > > > > > >> consumer
> > > > > > > > > >> > > group rebalance. Therefore I'd like to propose
> adding
> > a
> > > > new
> > > > > > > > > >> > > rejoinGroup()method
> > > > > > > > > >> > > to the Consumer client (better method name
> suggestions
> > > are
> > > > > > very
> > > > > > > > > >> welcome).
> > > > > > > > > >> > >
> > > > > > > > > >> > > Please take a look at the KIP and let me know what
> you
> > > > > think!
> > > > > > > > > >> > >
> > > > > > > > > >> > > KIP document:
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > >> > >
> > > > > > > > > >> > > JIRA:
> > https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > >> > >
> > > > > > > > > >> > > Cheers,
> > > > > > > > > >> > > Sophie
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Hey Boyang,

Originally I had it as a nonblocking call, but decided to change it to
blocking
with a timeout parameter. I'm not sure a future makes sense to return here,
because the rebalance either does or does not complete within the timeout:
if it does not, you will have to call poll again to complete it (as is the
case with
any other rebalance). I'll call this out in the javadocs as well.

I also added an example demonstrating how/when to use this new API.

Thanks!

On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <re...@gmail.com>
wrote:

> Hey Sophie,
>
> is the `enforceRebalance` a blocking call? Could we add a code sample to
> the KIP on how this API should be used?
>
> Returning a future instead of a boolean might be easier as we are allowing
> consumer to make progress during rebalance after 429 IMHO.
>
> Boyang
>
>
> On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> konstantine@confluent.io> wrote:
>
> > Thanks for the quick turnaround Sophie. My points have been addressed.
> > I think the intended use is quite clear now.
> >
> > Best,
> > Konstantine
> >
> >
> > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> sophie@confluent.io>
> > wrote:
> >
> > > Konstantine,
> > > Thanks for the feedback! I've updated the sections with your
> > suggestions. I
> > > agree
> > > in particular that it's really important to make sure users don't call
> > this
> > > unnecessarily,
> > >  or for the wrong reasons: to that end I also extended the javadocs to
> > > specify that this
> > > API is for when changes to the subscription userdata occur. Hopefully
> > that
> > > should make
> > > its intended usage quite clear.
> > >
> > > Bill,
> > > The rebalance triggered by this new API will be a "normal" rebalance,
> and
> > > therefore
> > > follow the existing listener semantics. For example a cooperative
> > rebalance
> > > will always
> > > call onPartitionsAssigned, even if no partitions are actually moved.
> > > An eager rebalance will still revoke all partitions first anyway.
> > >
> > > Thanks for the feedback!
> > > Sophie
> > >
> > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bb...@gmail.com> wrote:
> > >
> > > > Hi Sophie,
> > > >
> > > > Thanks for the KIP, makes sense to me.
> > > >
> > > > One quick question, I'm not sure if it's relevant or not.
> > > >
> > > > If a user provides a `ConsumerRebalanceListener` and a rebalance is
> > > > triggered from an `enforceRebalance`  call,
> > > > it seems possible the listener won't get called since partition
> > > assignments
> > > > might not change.
> > > > If that is the case, do we want to possibly consider adding a method
> to
> > > the
> > > > `ConsumerRebalanceListener` for callbacks on `enforceRebalance`
> > actions?
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > >
> > > >
> > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > konstantine@confluent.io> wrote:
> > > >
> > > > > Hi Sophie.
> > > > >
> > > > > Thanks for the KIP. I liked how focused the proposal is. Also, its
> > > > > motivation is clear after carefully reading the KIP and its
> > references.
> > > > >
> > > > > Yet, I think it'd be a good idea to call out explicitly on the
> > Rejected
> > > > > Alternatives section that an automatic and periodic triggering of
> > > > > rebalances that would not require exposing this capability through
> > the
> > > > > Consumer interface does not cover your specific use cases and
> > therefore
> > > > is
> > > > > not chosen as a desired approach. Maybe, even consider mentioning
> > again
> > > > > here that this method is expected to be used to respond to system
> > > changes
> > > > > external to the consumer and its membership logic and is not
> proposed
> > > as
> > > > a
> > > > > way to resolve temporary imbalances due to membership changes that
> > > should
> > > > > inherently be resolved by the assignor logic itself with one or
> more
> > > > > consecutive rebalances.
> > > > >
> > > > > Also, in your javadoc I'd add some context similar to what someone
> > can
> > > > read
> > > > > on the KIP. Specifically where you say: "for example if some
> > condition
> > > > has
> > > > > changed that has implications for the partition assignment." I'd
> > rather
> > > > add
> > > > > something like "for example, if some condition external and
> invisible
> > > to
> > > > > the Consumer and its group membership has changed in ways that
> would
> > > > > justify a new partition assignment". That's just an example, feel
> > free
> > > to
> > > > > reword, but I believe that saying explicitly that this condition is
> > not
> > > > > visible to the consumer is useful to understand that this is not
> > > > necessary
> > > > > under normal circumstances.
> > > > >
> > > > > In Compatibility, Deprecation, and Migration Plan section I think
> > it's
> > > > > worth mentioning that this is a new feature that affects new
> > > > > implementations of the Consumer interface and any such new
> > > implementation
> > > > > should override the new method. Implementations that wish to
> upgrade
> > > to a
> > > > > newer version should be extended and recompiled, since no default
> > > > > implementation will be provided.
> > > > >
> > > > > Naming is hard here, if someone wants to emphasize the ad hoc and
> > > > irregular
> > > > > nature of this call. After some thought I'm fine with
> > > 'enforceRebalance'
> > > > > even if it could potentially be confused to a method that is
> supposed
> > > to
> > > > be
> > > > > called to remediate one or more previously unsuccessful rebalances
> > > (which
> > > > > is partly what StreamThread#enforceRebalance is used for). The
> best I
> > > > could
> > > > > think of was 'onRequestRebalance' but that's not perfect either.
> > > > >
> > > > > Best,
> > > > > Konstantine
> > > > >
> > > > >
> > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > sophie@confluent.io
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Thanks John. I took out the KafkaConsumer method and moved the
> > > javadocs
> > > > > > to the Consumer#enforceRebalance in the KIP -- hope you're happy
> :P
> > > > > >
> > > > > > Also, I wanted to point out one minor change to the current
> > proposal:
> > > > > make
> > > > > > this
> > > > > > a blocking call, which accepts a timeout and returns whether the
> > > > > rebalance
> > > > > > completed within the timeout. It will still reduce to a
> nonblocking
> > > > call
> > > > > if
> > > > > > a "zero"
> > > > > > timeout is supplied. I've updated the KIP accordingly.
> > > > > >
> > > > > > Let me know if there are any further concerns, else I'll call
> for a
> > > > vote.
> > > > > >
> > > > > > Cheers!
> > > > > > Sophie
> > > > > >
> > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> vvcephei@apache.org
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Thanks Sophie,
> > > > > > >
> > > > > > > Sorry I didn't respond. I think your new method name sounds
> good.
> > > > > > >
> > > > > > > Regarding the interface vs implementation, I agree it's
> > confusing.
> > > > It's
> > > > > > > always bothered me that the interface redirects you to an
> > > > > implementation
> > > > > > > JavaDocs, but never enough for me to stop what I'm doing to fix
> > it.
> > > > > > > It's not a big deal either way, I just thought it was strange
> to
> > > > > propose
> > > > > > a
> > > > > > > "public interface" change, but not in terms of the actual
> > interface
> > > > > > class.
> > > > > > >
> > > > > > > It _is_ true that KafkaConsumer is also part of the public API,
> > but
> > > > > only
> > > > > > > really
> > > > > > > for the constructor. Any proposal to define a new "consumer
> > client"
> > > > API
> > > > > > > should be on the Consumer interface (which you said you plan to
> > do
> > > > > > anyway).
> > > > > > > I guess I brought it up because proposing an addition to
> Consumer
> > > > > implies
> > > > > > > it would be added to KafkaConsumer, but proposing an addition
> to
> > > > > > > KafkaConsumer does not necessarily imply it would also be added
> > to
> > > > > > > Consumer. Does that make sense?
> > > > > > >
> > > > > > > Anyway, thanks for updating the KIP.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > > > > Since this doesn't seem too controversial, I'll probably call
> > > for a
> > > > > > vote
> > > > > > > by
> > > > > > > > end of day.
> > > > > > > > If there any further comments/questions/concerns, please let
> me
> > > > know!
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Sophie
> > > > > > > >
> > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > > > sophie@confluent.io
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks for the feedback! That's a good point about trying
> to
> > > > > prevent
> > > > > > > users
> > > > > > > > > from
> > > > > > > > > thinking they should use this API during normal processing
> > and
> > > > > > > clarifying
> > > > > > > > > when/why
> > > > > > > > > you might need it -- regardless of the method name, we
> should
> > > > > > > explicitly
> > > > > > > > > call this out
> > > > > > > > > in the javadocs.
> > > > > > > > >
> > > > > > > > > As for the method name, on reflection I agree that
> > > "rejoinGroup"
> > > > > does
> > > > > > > not
> > > > > > > > > seem to be
> > > > > > > > > appropriate. Of course that's what the consumer will
> actually
> > > be
> > > > > > doing,
> > > > > > > > > but that's just an
> > > > > > > > > implementation detail -- the name should reflect what the
> API
> > > is
> > > > > > doing,
> > > > > > > > > not how it does it
> > > > > > > > > (which can always change).
> > > > > > > > >
> > > > > > > > > How about "enforceRebalance"? This is stolen from the
> > > > StreamThread
> > > > > > > method
> > > > > > > > > that does
> > > > > > > > > exactly this (by unsubscribing) so it seems to fit. I'll
> > update
> > > > the
> > > > > > KIP
> > > > > > > > > with this unless anyone
> > > > > > > > > has another suggestion.
> > > > > > > > >
> > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I included
> > the
> > > > > > > > > KafkaConsumer method
> > > > > > > > > because that's where all the javadocs redirect to in the
> > > Consumer
> > > > > > > > > interface. Also, FWIW
> > > > > > > > > I'm pretty sure KafkaConsumer is also part of the public
> API
> > --
> > > > we
> > > > > > > would
> > > > > > > > > be adding a new
> > > > > > > > > method to both.
> > > > > > > > >
> > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > vvcephei@apache.org
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi all,
> > > > > > > > >>
> > > > > > > > >> Thanks for the well motivated KIP, Sophie. I had some
> > > > alternatives
> > > > > > in
> > > > > > > > >> mind, which
> > > > > > > > >> I won't even bother to relate because I feel like the
> > > motivation
> > > > > > made
> > > > > > > a
> > > > > > > > >> compelling
> > > > > > > > >> argument for the API as proposed.
> > > > > > > > >>
> > > > > > > > >> One very minor point you might as well fix is that the API
> > > > change
> > > > > is
> > > > > > > > >> targeted at
> > > > > > > > >> KafkaConsumer (the implementation), but should be targeted
> > at
> > > > > > > > >> Consumer (the interface).
> > > > > > > > >>
> > > > > > > > >> I agree with your discomfort about the name. Adding a
> > "rejoin"
> > > > > > method
> > > > > > > > >> seems strange
> > > > > > > > >> since there's no "join" method. Instead the way you join
> the
> > > > group
> > > > > > the
> > > > > > > > >> first time is just
> > > > > > > > >> by calling "subscribe". But "resubscribe" seems too
> indirect
> > > > from
> > > > > > what
> > > > > > > > >> we're really trying
> > > > > > > > >> to do, which is to trigger a rebalance by sending a new
> > > > JoinGroup
> > > > > > > request.
> > > > > > > > >>
> > > > > > > > >> Another angle is that we don't want the method to sound
> like
> > > > > > something
> > > > > > > > >> you should
> > > > > > > > >> be calling in normal circumstances, or people will be
> > > "tricked"
> > > > > into
> > > > > > > > >> calling it unnecessarily.
> > > > > > > > >>
> > > > > > > > >> So, I think "rejoinGroup" is fine, although a person
> _might_
> > > be
> > > > > > > forgiven
> > > > > > > > >> for thinking they
> > > > > > > > >> need to call it periodically or something. Did you
> consider
> > > > > > > > >> "triggerRebalance", which
> > > > > > > > >> sounds pretty advanced-ish, and accurately describes what
> > > > happens
> > > > > > when
> > > > > > > > >> you call it?
> > > > > > > > >>
> > > > > > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >> -John
> > > > > > > > >>
> > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > > > >> > This situation was discussed at length after a recent
> > talk I
> > > > > gave.
> > > > > > > This
> > > > > > > > >> KIP
> > > > > > > > >> > would be a great step towards increased availability and
> > in
> > > > > > > facilitating
> > > > > > > > >> > lightweight rebalances.
> > > > > > > > >> >
> > > > > > > > >> > anna
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > > > > sophie@confluent.io>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Hi all,
> > > > > > > > >> > >
> > > > > > > > >> > > In light of some recent and upcoming rebalancing and
> > > > > > availability
> > > > > > > > >> > > improvements, it seems we have a need for explicitly
> > > > > triggering
> > > > > > a
> > > > > > > > >> consumer
> > > > > > > > >> > > group rebalance. Therefore I'd like to propose adding
> a
> > > new
> > > > > > > > >> > > rejoinGroup()method
> > > > > > > > >> > > to the Consumer client (better method name suggestions
> > are
> > > > > very
> > > > > > > > >> welcome).
> > > > > > > > >> > >
> > > > > > > > >> > > Please take a look at the KIP and let me know what you
> > > > think!
> > > > > > > > >> > >
> > > > > > > > >> > > KIP document:
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > >> > >
> > > > > > > > >> > > JIRA:
> https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > >> > >
> > > > > > > > >> > > Cheers,
> > > > > > > > >> > > Sophie
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Boyang Chen <re...@gmail.com>.
Hey Sophie,

is the `enforceRebalance` a blocking call? Could we add a code sample to
the KIP on how this API should be used?

Returning a future instead of a boolean might be easier as we are allowing
consumer to make progress during rebalance after 429 IMHO.

Boyang


On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
konstantine@confluent.io> wrote:

> Thanks for the quick turnaround Sophie. My points have been addressed.
> I think the intended use is quite clear now.
>
> Best,
> Konstantine
>
>
> On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <so...@confluent.io>
> wrote:
>
> > Konstantine,
> > Thanks for the feedback! I've updated the sections with your
> suggestions. I
> > agree
> > in particular that it's really important to make sure users don't call
> this
> > unnecessarily,
> >  or for the wrong reasons: to that end I also extended the javadocs to
> > specify that this
> > API is for when changes to the subscription userdata occur. Hopefully
> that
> > should make
> > its intended usage quite clear.
> >
> > Bill,
> > The rebalance triggered by this new API will be a "normal" rebalance, and
> > therefore
> > follow the existing listener semantics. For example a cooperative
> rebalance
> > will always
> > call onPartitionsAssigned, even if no partitions are actually moved.
> > An eager rebalance will still revoke all partitions first anyway.
> >
> > Thanks for the feedback!
> > Sophie
> >
> > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bb...@gmail.com> wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for the KIP, makes sense to me.
> > >
> > > One quick question, I'm not sure if it's relevant or not.
> > >
> > > If a user provides a `ConsumerRebalanceListener` and a rebalance is
> > > triggered from an `enforceRebalance`  call,
> > > it seems possible the listener won't get called since partition
> > assignments
> > > might not change.
> > > If that is the case, do we want to possibly consider adding a method to
> > the
> > > `ConsumerRebalanceListener` for callbacks on `enforceRebalance`
> actions?
> > >
> > > Thanks,
> > > Bill
> > >
> > >
> > >
> > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > konstantine@confluent.io> wrote:
> > >
> > > > Hi Sophie.
> > > >
> > > > Thanks for the KIP. I liked how focused the proposal is. Also, its
> > > > motivation is clear after carefully reading the KIP and its
> references.
> > > >
> > > > Yet, I think it'd be a good idea to call out explicitly on the
> Rejected
> > > > Alternatives section that an automatic and periodic triggering of
> > > > rebalances that would not require exposing this capability through
> the
> > > > Consumer interface does not cover your specific use cases and
> therefore
> > > is
> > > > not chosen as a desired approach. Maybe, even consider mentioning
> again
> > > > here that this method is expected to be used to respond to system
> > changes
> > > > external to the consumer and its membership logic and is not proposed
> > as
> > > a
> > > > way to resolve temporary imbalances due to membership changes that
> > should
> > > > inherently be resolved by the assignor logic itself with one or more
> > > > consecutive rebalances.
> > > >
> > > > Also, in your javadoc I'd add some context similar to what someone
> can
> > > read
> > > > on the KIP. Specifically where you say: "for example if some
> condition
> > > has
> > > > changed that has implications for the partition assignment." I'd
> rather
> > > add
> > > > something like "for example, if some condition external and invisible
> > to
> > > > the Consumer and its group membership has changed in ways that would
> > > > justify a new partition assignment". That's just an example, feel
> free
> > to
> > > > reword, but I believe that saying explicitly that this condition is
> not
> > > > visible to the consumer is useful to understand that this is not
> > > necessary
> > > > under normal circumstances.
> > > >
> > > > In Compatibility, Deprecation, and Migration Plan section I think
> it's
> > > > worth mentioning that this is a new feature that affects new
> > > > implementations of the Consumer interface and any such new
> > implementation
> > > > should override the new method. Implementations that wish to upgrade
> > to a
> > > > newer version should be extended and recompiled, since no default
> > > > implementation will be provided.
> > > >
> > > > Naming is hard here, if someone wants to emphasize the ad hoc and
> > > irregular
> > > > nature of this call. After some thought I'm fine with
> > 'enforceRebalance'
> > > > even if it could potentially be confused to a method that is supposed
> > to
> > > be
> > > > called to remediate one or more previously unsuccessful rebalances
> > (which
> > > > is partly what StreamThread#enforceRebalance is used for). The best I
> > > could
> > > > think of was 'onRequestRebalance' but that's not perfect either.
> > > >
> > > > Best,
> > > > Konstantine
> > > >
> > > >
> > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > sophie@confluent.io
> > > >
> > > > wrote:
> > > >
> > > > > Thanks John. I took out the KafkaConsumer method and moved the
> > javadocs
> > > > > to the Consumer#enforceRebalance in the KIP -- hope you're happy :P
> > > > >
> > > > > Also, I wanted to point out one minor change to the current
> proposal:
> > > > make
> > > > > this
> > > > > a blocking call, which accepts a timeout and returns whether the
> > > > rebalance
> > > > > completed within the timeout. It will still reduce to a nonblocking
> > > call
> > > > if
> > > > > a "zero"
> > > > > timeout is supplied. I've updated the KIP accordingly.
> > > > >
> > > > > Let me know if there are any further concerns, else I'll call for a
> > > vote.
> > > > >
> > > > > Cheers!
> > > > > Sophie
> > > > >
> > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vvcephei@apache.org
> >
> > > > wrote:
> > > > >
> > > > > > Thanks Sophie,
> > > > > >
> > > > > > Sorry I didn't respond. I think your new method name sounds good.
> > > > > >
> > > > > > Regarding the interface vs implementation, I agree it's
> confusing.
> > > It's
> > > > > > always bothered me that the interface redirects you to an
> > > > implementation
> > > > > > JavaDocs, but never enough for me to stop what I'm doing to fix
> it.
> > > > > > It's not a big deal either way, I just thought it was strange to
> > > > propose
> > > > > a
> > > > > > "public interface" change, but not in terms of the actual
> interface
> > > > > class.
> > > > > >
> > > > > > It _is_ true that KafkaConsumer is also part of the public API,
> but
> > > > only
> > > > > > really
> > > > > > for the constructor. Any proposal to define a new "consumer
> client"
> > > API
> > > > > > should be on the Consumer interface (which you said you plan to
> do
> > > > > anyway).
> > > > > > I guess I brought it up because proposing an addition to Consumer
> > > > implies
> > > > > > it would be added to KafkaConsumer, but proposing an addition to
> > > > > > KafkaConsumer does not necessarily imply it would also be added
> to
> > > > > > Consumer. Does that make sense?
> > > > > >
> > > > > > Anyway, thanks for updating the KIP.
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > > > Since this doesn't seem too controversial, I'll probably call
> > for a
> > > > > vote
> > > > > > by
> > > > > > > end of day.
> > > > > > > If there any further comments/questions/concerns, please let me
> > > know!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Sophie
> > > > > > >
> > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > > sophie@confluent.io
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for the feedback! That's a good point about trying to
> > > > prevent
> > > > > > users
> > > > > > > > from
> > > > > > > > thinking they should use this API during normal processing
> and
> > > > > > clarifying
> > > > > > > > when/why
> > > > > > > > you might need it -- regardless of the method name, we should
> > > > > > explicitly
> > > > > > > > call this out
> > > > > > > > in the javadocs.
> > > > > > > >
> > > > > > > > As for the method name, on reflection I agree that
> > "rejoinGroup"
> > > > does
> > > > > > not
> > > > > > > > seem to be
> > > > > > > > appropriate. Of course that's what the consumer will actually
> > be
> > > > > doing,
> > > > > > > > but that's just an
> > > > > > > > implementation detail -- the name should reflect what the API
> > is
> > > > > doing,
> > > > > > > > not how it does it
> > > > > > > > (which can always change).
> > > > > > > >
> > > > > > > > How about "enforceRebalance"? This is stolen from the
> > > StreamThread
> > > > > > method
> > > > > > > > that does
> > > > > > > > exactly this (by unsubscribing) so it seems to fit. I'll
> update
> > > the
> > > > > KIP
> > > > > > > > with this unless anyone
> > > > > > > > has another suggestion.
> > > > > > > >
> > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I included
> the
> > > > > > > > KafkaConsumer method
> > > > > > > > because that's where all the javadocs redirect to in the
> > Consumer
> > > > > > > > interface. Also, FWIW
> > > > > > > > I'm pretty sure KafkaConsumer is also part of the public API
> --
> > > we
> > > > > > would
> > > > > > > > be adding a new
> > > > > > > > method to both.
> > > > > > > >
> > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > vvcephei@apache.org
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi all,
> > > > > > > >>
> > > > > > > >> Thanks for the well motivated KIP, Sophie. I had some
> > > alternatives
> > > > > in
> > > > > > > >> mind, which
> > > > > > > >> I won't even bother to relate because I feel like the
> > motivation
> > > > > made
> > > > > > a
> > > > > > > >> compelling
> > > > > > > >> argument for the API as proposed.
> > > > > > > >>
> > > > > > > >> One very minor point you might as well fix is that the API
> > > change
> > > > is
> > > > > > > >> targeted at
> > > > > > > >> KafkaConsumer (the implementation), but should be targeted
> at
> > > > > > > >> Consumer (the interface).
> > > > > > > >>
> > > > > > > >> I agree with your discomfort about the name. Adding a
> "rejoin"
> > > > > method
> > > > > > > >> seems strange
> > > > > > > >> since there's no "join" method. Instead the way you join the
> > > group
> > > > > the
> > > > > > > >> first time is just
> > > > > > > >> by calling "subscribe". But "resubscribe" seems too indirect
> > > from
> > > > > what
> > > > > > > >> we're really trying
> > > > > > > >> to do, which is to trigger a rebalance by sending a new
> > > JoinGroup
> > > > > > request.
> > > > > > > >>
> > > > > > > >> Another angle is that we don't want the method to sound like
> > > > > something
> > > > > > > >> you should
> > > > > > > >> be calling in normal circumstances, or people will be
> > "tricked"
> > > > into
> > > > > > > >> calling it unnecessarily.
> > > > > > > >>
> > > > > > > >> So, I think "rejoinGroup" is fine, although a person _might_
> > be
> > > > > > forgiven
> > > > > > > >> for thinking they
> > > > > > > >> need to call it periodically or something. Did you consider
> > > > > > > >> "triggerRebalance", which
> > > > > > > >> sounds pretty advanced-ish, and accurately describes what
> > > happens
> > > > > when
> > > > > > > >> you call it?
> > > > > > > >>
> > > > > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> -John
> > > > > > > >>
> > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > > >> > This situation was discussed at length after a recent
> talk I
> > > > gave.
> > > > > > This
> > > > > > > >> KIP
> > > > > > > >> > would be a great step towards increased availability and
> in
> > > > > > facilitating
> > > > > > > >> > lightweight rebalances.
> > > > > > > >> >
> > > > > > > >> > anna
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > > > sophie@confluent.io>
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi all,
> > > > > > > >> > >
> > > > > > > >> > > In light of some recent and upcoming rebalancing and
> > > > > availability
> > > > > > > >> > > improvements, it seems we have a need for explicitly
> > > > triggering
> > > > > a
> > > > > > > >> consumer
> > > > > > > >> > > group rebalance. Therefore I'd like to propose adding a
> > new
> > > > > > > >> > > rejoinGroup()method
> > > > > > > >> > > to the Consumer client (better method name suggestions
> are
> > > > very
> > > > > > > >> welcome).
> > > > > > > >> > >
> > > > > > > >> > > Please take a look at the KIP and let me know what you
> > > think!
> > > > > > > >> > >
> > > > > > > >> > > KIP document:
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > >> > >
> > > > > > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > >> > >
> > > > > > > >> > > Cheers,
> > > > > > > >> > > Sophie
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Konstantine Karantasis <ko...@confluent.io>.
Thanks for the quick turnaround Sophie. My points have been addressed.
I think the intended use is quite clear now.

Best,
Konstantine


On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Konstantine,
> Thanks for the feedback! I've updated the sections with your suggestions. I
> agree
> in particular that it's really important to make sure users don't call this
> unnecessarily,
>  or for the wrong reasons: to that end I also extended the javadocs to
> specify that this
> API is for when changes to the subscription userdata occur. Hopefully that
> should make
> its intended usage quite clear.
>
> Bill,
> The rebalance triggered by this new API will be a "normal" rebalance, and
> therefore
> follow the existing listener semantics. For example a cooperative rebalance
> will always
> call onPartitionsAssigned, even if no partitions are actually moved.
> An eager rebalance will still revoke all partitions first anyway.
>
> Thanks for the feedback!
> Sophie
>
> On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bb...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the KIP, makes sense to me.
> >
> > One quick question, I'm not sure if it's relevant or not.
> >
> > If a user provides a `ConsumerRebalanceListener` and a rebalance is
> > triggered from an `enforceRebalance`  call,
> > it seems possible the listener won't get called since partition
> assignments
> > might not change.
> > If that is the case, do we want to possibly consider adding a method to
> the
> > `ConsumerRebalanceListener` for callbacks on `enforceRebalance` actions?
> >
> > Thanks,
> > Bill
> >
> >
> >
> > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > konstantine@confluent.io> wrote:
> >
> > > Hi Sophie.
> > >
> > > Thanks for the KIP. I liked how focused the proposal is. Also, its
> > > motivation is clear after carefully reading the KIP and its references.
> > >
> > > Yet, I think it'd be a good idea to call out explicitly on the Rejected
> > > Alternatives section that an automatic and periodic triggering of
> > > rebalances that would not require exposing this capability through the
> > > Consumer interface does not cover your specific use cases and therefore
> > is
> > > not chosen as a desired approach. Maybe, even consider mentioning again
> > > here that this method is expected to be used to respond to system
> changes
> > > external to the consumer and its membership logic and is not proposed
> as
> > a
> > > way to resolve temporary imbalances due to membership changes that
> should
> > > inherently be resolved by the assignor logic itself with one or more
> > > consecutive rebalances.
> > >
> > > Also, in your javadoc I'd add some context similar to what someone can
> > read
> > > on the KIP. Specifically where you say: "for example if some condition
> > has
> > > changed that has implications for the partition assignment." I'd rather
> > add
> > > something like "for example, if some condition external and invisible
> to
> > > the Consumer and its group membership has changed in ways that would
> > > justify a new partition assignment". That's just an example, feel free
> to
> > > reword, but I believe that saying explicitly that this condition is not
> > > visible to the consumer is useful to understand that this is not
> > necessary
> > > under normal circumstances.
> > >
> > > In Compatibility, Deprecation, and Migration Plan section I think it's
> > > worth mentioning that this is a new feature that affects new
> > > implementations of the Consumer interface and any such new
> implementation
> > > should override the new method. Implementations that wish to upgrade
> to a
> > > newer version should be extended and recompiled, since no default
> > > implementation will be provided.
> > >
> > > Naming is hard here, if someone wants to emphasize the ad hoc and
> > irregular
> > > nature of this call. After some thought I'm fine with
> 'enforceRebalance'
> > > even if it could potentially be confused to a method that is supposed
> to
> > be
> > > called to remediate one or more previously unsuccessful rebalances
> (which
> > > is partly what StreamThread#enforceRebalance is used for). The best I
> > could
> > > think of was 'onRequestRebalance' but that's not perfect either.
> > >
> > > Best,
> > > Konstantine
> > >
> > >
> > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> sophie@confluent.io
> > >
> > > wrote:
> > >
> > > > Thanks John. I took out the KafkaConsumer method and moved the
> javadocs
> > > > to the Consumer#enforceRebalance in the KIP -- hope you're happy :P
> > > >
> > > > Also, I wanted to point out one minor change to the current proposal:
> > > make
> > > > this
> > > > a blocking call, which accepts a timeout and returns whether the
> > > rebalance
> > > > completed within the timeout. It will still reduce to a nonblocking
> > call
> > > if
> > > > a "zero"
> > > > timeout is supplied. I've updated the KIP accordingly.
> > > >
> > > > Let me know if there are any further concerns, else I'll call for a
> > vote.
> > > >
> > > > Cheers!
> > > > Sophie
> > > >
> > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vv...@apache.org>
> > > wrote:
> > > >
> > > > > Thanks Sophie,
> > > > >
> > > > > Sorry I didn't respond. I think your new method name sounds good.
> > > > >
> > > > > Regarding the interface vs implementation, I agree it's confusing.
> > It's
> > > > > always bothered me that the interface redirects you to an
> > > implementation
> > > > > JavaDocs, but never enough for me to stop what I'm doing to fix it.
> > > > > It's not a big deal either way, I just thought it was strange to
> > > propose
> > > > a
> > > > > "public interface" change, but not in terms of the actual interface
> > > > class.
> > > > >
> > > > > It _is_ true that KafkaConsumer is also part of the public API, but
> > > only
> > > > > really
> > > > > for the constructor. Any proposal to define a new "consumer client"
> > API
> > > > > should be on the Consumer interface (which you said you plan to do
> > > > anyway).
> > > > > I guess I brought it up because proposing an addition to Consumer
> > > implies
> > > > > it would be added to KafkaConsumer, but proposing an addition to
> > > > > KafkaConsumer does not necessarily imply it would also be added to
> > > > > Consumer. Does that make sense?
> > > > >
> > > > > Anyway, thanks for updating the KIP.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > >
> > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > > Since this doesn't seem too controversial, I'll probably call
> for a
> > > > vote
> > > > > by
> > > > > > end of day.
> > > > > > If there any further comments/questions/concerns, please let me
> > know!
> > > > > >
> > > > > > Thanks,
> > > > > > Sophie
> > > > > >
> > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > sophie@confluent.io
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks for the feedback! That's a good point about trying to
> > > prevent
> > > > > users
> > > > > > > from
> > > > > > > thinking they should use this API during normal processing and
> > > > > clarifying
> > > > > > > when/why
> > > > > > > you might need it -- regardless of the method name, we should
> > > > > explicitly
> > > > > > > call this out
> > > > > > > in the javadocs.
> > > > > > >
> > > > > > > As for the method name, on reflection I agree that
> "rejoinGroup"
> > > does
> > > > > not
> > > > > > > seem to be
> > > > > > > appropriate. Of course that's what the consumer will actually
> be
> > > > doing,
> > > > > > > but that's just an
> > > > > > > implementation detail -- the name should reflect what the API
> is
> > > > doing,
> > > > > > > not how it does it
> > > > > > > (which can always change).
> > > > > > >
> > > > > > > How about "enforceRebalance"? This is stolen from the
> > StreamThread
> > > > > method
> > > > > > > that does
> > > > > > > exactly this (by unsubscribing) so it seems to fit. I'll update
> > the
> > > > KIP
> > > > > > > with this unless anyone
> > > > > > > has another suggestion.
> > > > > > >
> > > > > > > Regarding the Consumer vs KafkaConsumer matter, I included the
> > > > > > > KafkaConsumer method
> > > > > > > because that's where all the javadocs redirect to in the
> Consumer
> > > > > > > interface. Also, FWIW
> > > > > > > I'm pretty sure KafkaConsumer is also part of the public API --
> > we
> > > > > would
> > > > > > > be adding a new
> > > > > > > method to both.
> > > > > > >
> > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> vvcephei@apache.org
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> Thanks for the well motivated KIP, Sophie. I had some
> > alternatives
> > > > in
> > > > > > >> mind, which
> > > > > > >> I won't even bother to relate because I feel like the
> motivation
> > > > made
> > > > > a
> > > > > > >> compelling
> > > > > > >> argument for the API as proposed.
> > > > > > >>
> > > > > > >> One very minor point you might as well fix is that the API
> > change
> > > is
> > > > > > >> targeted at
> > > > > > >> KafkaConsumer (the implementation), but should be targeted at
> > > > > > >> Consumer (the interface).
> > > > > > >>
> > > > > > >> I agree with your discomfort about the name. Adding a "rejoin"
> > > > method
> > > > > > >> seems strange
> > > > > > >> since there's no "join" method. Instead the way you join the
> > group
> > > > the
> > > > > > >> first time is just
> > > > > > >> by calling "subscribe". But "resubscribe" seems too indirect
> > from
> > > > what
> > > > > > >> we're really trying
> > > > > > >> to do, which is to trigger a rebalance by sending a new
> > JoinGroup
> > > > > request.
> > > > > > >>
> > > > > > >> Another angle is that we don't want the method to sound like
> > > > something
> > > > > > >> you should
> > > > > > >> be calling in normal circumstances, or people will be
> "tricked"
> > > into
> > > > > > >> calling it unnecessarily.
> > > > > > >>
> > > > > > >> So, I think "rejoinGroup" is fine, although a person _might_
> be
> > > > > forgiven
> > > > > > >> for thinking they
> > > > > > >> need to call it periodically or something. Did you consider
> > > > > > >> "triggerRebalance", which
> > > > > > >> sounds pretty advanced-ish, and accurately describes what
> > happens
> > > > when
> > > > > > >> you call it?
> > > > > > >>
> > > > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> -John
> > > > > > >>
> > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > >> > This situation was discussed at length after a recent talk I
> > > gave.
> > > > > This
> > > > > > >> KIP
> > > > > > >> > would be a great step towards increased availability and in
> > > > > facilitating
> > > > > > >> > lightweight rebalances.
> > > > > > >> >
> > > > > > >> > anna
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > > sophie@confluent.io>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Hi all,
> > > > > > >> > >
> > > > > > >> > > In light of some recent and upcoming rebalancing and
> > > > availability
> > > > > > >> > > improvements, it seems we have a need for explicitly
> > > triggering
> > > > a
> > > > > > >> consumer
> > > > > > >> > > group rebalance. Therefore I'd like to propose adding a
> new
> > > > > > >> > > rejoinGroup()method
> > > > > > >> > > to the Consumer client (better method name suggestions are
> > > very
> > > > > > >> welcome).
> > > > > > >> > >
> > > > > > >> > > Please take a look at the KIP and let me know what you
> > think!
> > > > > > >> > >
> > > > > > >> > > KIP document:
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > >> > >
> > > > > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > >> > >
> > > > > > >> > > Cheers,
> > > > > > >> > > Sophie
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Konstantine,
Thanks for the feedback! I've updated the sections with your suggestions. I
agree
in particular that it's really important to make sure users don't call this
unnecessarily,
 or for the wrong reasons: to that end I also extended the javadocs to
specify that this
API is for when changes to the subscription userdata occur. Hopefully that
should make
its intended usage quite clear.

Bill,
The rebalance triggered by this new API will be a "normal" rebalance, and
therefore
follow the existing listener semantics. For example a cooperative rebalance
will always
call onPartitionsAssigned, even if no partitions are actually moved.
An eager rebalance will still revoke all partitions first anyway.

Thanks for the feedback!
Sophie

On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bb...@gmail.com> wrote:

> Hi Sophie,
>
> Thanks for the KIP, makes sense to me.
>
> One quick question, I'm not sure if it's relevant or not.
>
> If a user provides a `ConsumerRebalanceListener` and a rebalance is
> triggered from an `enforceRebalance`  call,
> it seems possible the listener won't get called since partition assignments
> might not change.
> If that is the case, do we want to possibly consider adding a method to the
> `ConsumerRebalanceListener` for callbacks on `enforceRebalance` actions?
>
> Thanks,
> Bill
>
>
>
> On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> konstantine@confluent.io> wrote:
>
> > Hi Sophie.
> >
> > Thanks for the KIP. I liked how focused the proposal is. Also, its
> > motivation is clear after carefully reading the KIP and its references.
> >
> > Yet, I think it'd be a good idea to call out explicitly on the Rejected
> > Alternatives section that an automatic and periodic triggering of
> > rebalances that would not require exposing this capability through the
> > Consumer interface does not cover your specific use cases and therefore
> is
> > not chosen as a desired approach. Maybe, even consider mentioning again
> > here that this method is expected to be used to respond to system changes
> > external to the consumer and its membership logic and is not proposed as
> a
> > way to resolve temporary imbalances due to membership changes that should
> > inherently be resolved by the assignor logic itself with one or more
> > consecutive rebalances.
> >
> > Also, in your javadoc I'd add some context similar to what someone can
> read
> > on the KIP. Specifically where you say: "for example if some condition
> has
> > changed that has implications for the partition assignment." I'd rather
> add
> > something like "for example, if some condition external and invisible to
> > the Consumer and its group membership has changed in ways that would
> > justify a new partition assignment". That's just an example, feel free to
> > reword, but I believe that saying explicitly that this condition is not
> > visible to the consumer is useful to understand that this is not
> necessary
> > under normal circumstances.
> >
> > In Compatibility, Deprecation, and Migration Plan section I think it's
> > worth mentioning that this is a new feature that affects new
> > implementations of the Consumer interface and any such new implementation
> > should override the new method. Implementations that wish to upgrade to a
> > newer version should be extended and recompiled, since no default
> > implementation will be provided.
> >
> > Naming is hard here, if someone wants to emphasize the ad hoc and
> irregular
> > nature of this call. After some thought I'm fine with 'enforceRebalance'
> > even if it could potentially be confused to a method that is supposed to
> be
> > called to remediate one or more previously unsuccessful rebalances (which
> > is partly what StreamThread#enforceRebalance is used for). The best I
> could
> > think of was 'onRequestRebalance' but that's not perfect either.
> >
> > Best,
> > Konstantine
> >
> >
> > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <sophie@confluent.io
> >
> > wrote:
> >
> > > Thanks John. I took out the KafkaConsumer method and moved the javadocs
> > > to the Consumer#enforceRebalance in the KIP -- hope you're happy :P
> > >
> > > Also, I wanted to point out one minor change to the current proposal:
> > make
> > > this
> > > a blocking call, which accepts a timeout and returns whether the
> > rebalance
> > > completed within the timeout. It will still reduce to a nonblocking
> call
> > if
> > > a "zero"
> > > timeout is supplied. I've updated the KIP accordingly.
> > >
> > > Let me know if there are any further concerns, else I'll call for a
> vote.
> > >
> > > Cheers!
> > > Sophie
> > >
> > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vv...@apache.org>
> > wrote:
> > >
> > > > Thanks Sophie,
> > > >
> > > > Sorry I didn't respond. I think your new method name sounds good.
> > > >
> > > > Regarding the interface vs implementation, I agree it's confusing.
> It's
> > > > always bothered me that the interface redirects you to an
> > implementation
> > > > JavaDocs, but never enough for me to stop what I'm doing to fix it.
> > > > It's not a big deal either way, I just thought it was strange to
> > propose
> > > a
> > > > "public interface" change, but not in terms of the actual interface
> > > class.
> > > >
> > > > It _is_ true that KafkaConsumer is also part of the public API, but
> > only
> > > > really
> > > > for the constructor. Any proposal to define a new "consumer client"
> API
> > > > should be on the Consumer interface (which you said you plan to do
> > > anyway).
> > > > I guess I brought it up because proposing an addition to Consumer
> > implies
> > > > it would be added to KafkaConsumer, but proposing an addition to
> > > > KafkaConsumer does not necessarily imply it would also be added to
> > > > Consumer. Does that make sense?
> > > >
> > > > Anyway, thanks for updating the KIP.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > >
> > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > Since this doesn't seem too controversial, I'll probably call for a
> > > vote
> > > > by
> > > > > end of day.
> > > > > If there any further comments/questions/concerns, please let me
> know!
> > > > >
> > > > > Thanks,
> > > > > Sophie
> > > > >
> > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > sophie@confluent.io
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Thanks for the feedback! That's a good point about trying to
> > prevent
> > > > users
> > > > > > from
> > > > > > thinking they should use this API during normal processing and
> > > > clarifying
> > > > > > when/why
> > > > > > you might need it -- regardless of the method name, we should
> > > > explicitly
> > > > > > call this out
> > > > > > in the javadocs.
> > > > > >
> > > > > > As for the method name, on reflection I agree that "rejoinGroup"
> > does
> > > > not
> > > > > > seem to be
> > > > > > appropriate. Of course that's what the consumer will actually be
> > > doing,
> > > > > > but that's just an
> > > > > > implementation detail -- the name should reflect what the API is
> > > doing,
> > > > > > not how it does it
> > > > > > (which can always change).
> > > > > >
> > > > > > How about "enforceRebalance"? This is stolen from the
> StreamThread
> > > > method
> > > > > > that does
> > > > > > exactly this (by unsubscribing) so it seems to fit. I'll update
> the
> > > KIP
> > > > > > with this unless anyone
> > > > > > has another suggestion.
> > > > > >
> > > > > > Regarding the Consumer vs KafkaConsumer matter, I included the
> > > > > > KafkaConsumer method
> > > > > > because that's where all the javadocs redirect to in the Consumer
> > > > > > interface. Also, FWIW
> > > > > > I'm pretty sure KafkaConsumer is also part of the public API --
> we
> > > > would
> > > > > > be adding a new
> > > > > > method to both.
> > > > > >
> > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vvcephei@apache.org
> >
> > > > wrote:
> > > > > >
> > > > > >> Hi all,
> > > > > >>
> > > > > >> Thanks for the well motivated KIP, Sophie. I had some
> alternatives
> > > in
> > > > > >> mind, which
> > > > > >> I won't even bother to relate because I feel like the motivation
> > > made
> > > > a
> > > > > >> compelling
> > > > > >> argument for the API as proposed.
> > > > > >>
> > > > > >> One very minor point you might as well fix is that the API
> change
> > is
> > > > > >> targeted at
> > > > > >> KafkaConsumer (the implementation), but should be targeted at
> > > > > >> Consumer (the interface).
> > > > > >>
> > > > > >> I agree with your discomfort about the name. Adding a "rejoin"
> > > method
> > > > > >> seems strange
> > > > > >> since there's no "join" method. Instead the way you join the
> group
> > > the
> > > > > >> first time is just
> > > > > >> by calling "subscribe". But "resubscribe" seems too indirect
> from
> > > what
> > > > > >> we're really trying
> > > > > >> to do, which is to trigger a rebalance by sending a new
> JoinGroup
> > > > request.
> > > > > >>
> > > > > >> Another angle is that we don't want the method to sound like
> > > something
> > > > > >> you should
> > > > > >> be calling in normal circumstances, or people will be "tricked"
> > into
> > > > > >> calling it unnecessarily.
> > > > > >>
> > > > > >> So, I think "rejoinGroup" is fine, although a person _might_ be
> > > > forgiven
> > > > > >> for thinking they
> > > > > >> need to call it periodically or something. Did you consider
> > > > > >> "triggerRebalance", which
> > > > > >> sounds pretty advanced-ish, and accurately describes what
> happens
> > > when
> > > > > >> you call it?
> > > > > >>
> > > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> -John
> > > > > >>
> > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > >> > This situation was discussed at length after a recent talk I
> > gave.
> > > > This
> > > > > >> KIP
> > > > > >> > would be a great step towards increased availability and in
> > > > facilitating
> > > > > >> > lightweight rebalances.
> > > > > >> >
> > > > > >> > anna
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > sophie@confluent.io>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Hi all,
> > > > > >> > >
> > > > > >> > > In light of some recent and upcoming rebalancing and
> > > availability
> > > > > >> > > improvements, it seems we have a need for explicitly
> > triggering
> > > a
> > > > > >> consumer
> > > > > >> > > group rebalance. Therefore I'd like to propose adding a new
> > > > > >> > > rejoinGroup()method
> > > > > >> > > to the Consumer client (better method name suggestions are
> > very
> > > > > >> welcome).
> > > > > >> > >
> > > > > >> > > Please take a look at the KIP and let me know what you
> think!
> > > > > >> > >
> > > > > >> > > KIP document:
> > > > > >> > >
> > > > > >> > >
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > >> > >
> > > > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > >> > >
> > > > > >> > > Cheers,
> > > > > >> > > Sophie
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Bill Bejeck <bb...@gmail.com>.
Hi Sophie,

Thanks for the KIP, makes sense to me.

One quick question, I'm not sure if it's relevant or not.

If a user provides a `ConsumerRebalanceListener` and a rebalance is
triggered from an `enforceRebalance`  call,
it seems possible the listener won't get called since partition assignments
might not change.
If that is the case, do we want to possibly consider adding a method to the
`ConsumerRebalanceListener` for callbacks on `enforceRebalance` actions?

Thanks,
Bill



On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
konstantine@confluent.io> wrote:

> Hi Sophie.
>
> Thanks for the KIP. I liked how focused the proposal is. Also, its
> motivation is clear after carefully reading the KIP and its references.
>
> Yet, I think it'd be a good idea to call out explicitly on the Rejected
> Alternatives section that an automatic and periodic triggering of
> rebalances that would not require exposing this capability through the
> Consumer interface does not cover your specific use cases and therefore is
> not chosen as a desired approach. Maybe, even consider mentioning again
> here that this method is expected to be used to respond to system changes
> external to the consumer and its membership logic and is not proposed as a
> way to resolve temporary imbalances due to membership changes that should
> inherently be resolved by the assignor logic itself with one or more
> consecutive rebalances.
>
> Also, in your javadoc I'd add some context similar to what someone can read
> on the KIP. Specifically where you say: "for example if some condition has
> changed that has implications for the partition assignment." I'd rather add
> something like "for example, if some condition external and invisible to
> the Consumer and its group membership has changed in ways that would
> justify a new partition assignment". That's just an example, feel free to
> reword, but I believe that saying explicitly that this condition is not
> visible to the consumer is useful to understand that this is not necessary
> under normal circumstances.
>
> In Compatibility, Deprecation, and Migration Plan section I think it's
> worth mentioning that this is a new feature that affects new
> implementations of the Consumer interface and any such new implementation
> should override the new method. Implementations that wish to upgrade to a
> newer version should be extended and recompiled, since no default
> implementation will be provided.
>
> Naming is hard here, if someone wants to emphasize the ad hoc and irregular
> nature of this call. After some thought I'm fine with 'enforceRebalance'
> even if it could potentially be confused to a method that is supposed to be
> called to remediate one or more previously unsuccessful rebalances (which
> is partly what StreamThread#enforceRebalance is used for). The best I could
> think of was 'onRequestRebalance' but that's not perfect either.
>
> Best,
> Konstantine
>
>
> On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <so...@confluent.io>
> wrote:
>
> > Thanks John. I took out the KafkaConsumer method and moved the javadocs
> > to the Consumer#enforceRebalance in the KIP -- hope you're happy :P
> >
> > Also, I wanted to point out one minor change to the current proposal:
> make
> > this
> > a blocking call, which accepts a timeout and returns whether the
> rebalance
> > completed within the timeout. It will still reduce to a nonblocking call
> if
> > a "zero"
> > timeout is supplied. I've updated the KIP accordingly.
> >
> > Let me know if there are any further concerns, else I'll call for a vote.
> >
> > Cheers!
> > Sophie
> >
> > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Thanks Sophie,
> > >
> > > Sorry I didn't respond. I think your new method name sounds good.
> > >
> > > Regarding the interface vs implementation, I agree it's confusing. It's
> > > always bothered me that the interface redirects you to an
> implementation
> > > JavaDocs, but never enough for me to stop what I'm doing to fix it.
> > > It's not a big deal either way, I just thought it was strange to
> propose
> > a
> > > "public interface" change, but not in terms of the actual interface
> > class.
> > >
> > > It _is_ true that KafkaConsumer is also part of the public API, but
> only
> > > really
> > > for the constructor. Any proposal to define a new "consumer client" API
> > > should be on the Consumer interface (which you said you plan to do
> > anyway).
> > > I guess I brought it up because proposing an addition to Consumer
> implies
> > > it would be added to KafkaConsumer, but proposing an addition to
> > > KafkaConsumer does not necessarily imply it would also be added to
> > > Consumer. Does that make sense?
> > >
> > > Anyway, thanks for updating the KIP.
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > Since this doesn't seem too controversial, I'll probably call for a
> > vote
> > > by
> > > > end of day.
> > > > If there any further comments/questions/concerns, please let me know!
> > > >
> > > > Thanks,
> > > > Sophie
> > > >
> > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > sophie@confluent.io
> > > >
> > > > wrote:
> > > >
> > > > > Thanks for the feedback! That's a good point about trying to
> prevent
> > > users
> > > > > from
> > > > > thinking they should use this API during normal processing and
> > > clarifying
> > > > > when/why
> > > > > you might need it -- regardless of the method name, we should
> > > explicitly
> > > > > call this out
> > > > > in the javadocs.
> > > > >
> > > > > As for the method name, on reflection I agree that "rejoinGroup"
> does
> > > not
> > > > > seem to be
> > > > > appropriate. Of course that's what the consumer will actually be
> > doing,
> > > > > but that's just an
> > > > > implementation detail -- the name should reflect what the API is
> > doing,
> > > > > not how it does it
> > > > > (which can always change).
> > > > >
> > > > > How about "enforceRebalance"? This is stolen from the StreamThread
> > > method
> > > > > that does
> > > > > exactly this (by unsubscribing) so it seems to fit. I'll update the
> > KIP
> > > > > with this unless anyone
> > > > > has another suggestion.
> > > > >
> > > > > Regarding the Consumer vs KafkaConsumer matter, I included the
> > > > > KafkaConsumer method
> > > > > because that's where all the javadocs redirect to in the Consumer
> > > > > interface. Also, FWIW
> > > > > I'm pretty sure KafkaConsumer is also part of the public API -- we
> > > would
> > > > > be adding a new
> > > > > method to both.
> > > > >
> > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vv...@apache.org>
> > > wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Thanks for the well motivated KIP, Sophie. I had some alternatives
> > in
> > > > >> mind, which
> > > > >> I won't even bother to relate because I feel like the motivation
> > made
> > > a
> > > > >> compelling
> > > > >> argument for the API as proposed.
> > > > >>
> > > > >> One very minor point you might as well fix is that the API change
> is
> > > > >> targeted at
> > > > >> KafkaConsumer (the implementation), but should be targeted at
> > > > >> Consumer (the interface).
> > > > >>
> > > > >> I agree with your discomfort about the name. Adding a "rejoin"
> > method
> > > > >> seems strange
> > > > >> since there's no "join" method. Instead the way you join the group
> > the
> > > > >> first time is just
> > > > >> by calling "subscribe". But "resubscribe" seems too indirect from
> > what
> > > > >> we're really trying
> > > > >> to do, which is to trigger a rebalance by sending a new JoinGroup
> > > request.
> > > > >>
> > > > >> Another angle is that we don't want the method to sound like
> > something
> > > > >> you should
> > > > >> be calling in normal circumstances, or people will be "tricked"
> into
> > > > >> calling it unnecessarily.
> > > > >>
> > > > >> So, I think "rejoinGroup" is fine, although a person _might_ be
> > > forgiven
> > > > >> for thinking they
> > > > >> need to call it periodically or something. Did you consider
> > > > >> "triggerRebalance", which
> > > > >> sounds pretty advanced-ish, and accurately describes what happens
> > when
> > > > >> you call it?
> > > > >>
> > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > >>
> > > > >> Thanks,
> > > > >> -John
> > > > >>
> > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > >> > This situation was discussed at length after a recent talk I
> gave.
> > > This
> > > > >> KIP
> > > > >> > would be a great step towards increased availability and in
> > > facilitating
> > > > >> > lightweight rebalances.
> > > > >> >
> > > > >> > anna
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > sophie@confluent.io>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi all,
> > > > >> > >
> > > > >> > > In light of some recent and upcoming rebalancing and
> > availability
> > > > >> > > improvements, it seems we have a need for explicitly
> triggering
> > a
> > > > >> consumer
> > > > >> > > group rebalance. Therefore I'd like to propose adding a new
> > > > >> > > rejoinGroup()method
> > > > >> > > to the Consumer client (better method name suggestions are
> very
> > > > >> welcome).
> > > > >> > >
> > > > >> > > Please take a look at the KIP and let me know what you think!
> > > > >> > >
> > > > >> > > KIP document:
> > > > >> > >
> > > > >> > >
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > >> > >
> > > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > > >> > >
> > > > >> > > Cheers,
> > > > >> > > Sophie
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Konstantine Karantasis <ko...@confluent.io>.
Hi Sophie.

Thanks for the KIP. I liked how focused the proposal is. Also, its
motivation is clear after carefully reading the KIP and its references.

Yet, I think it'd be a good idea to call out explicitly on the Rejected
Alternatives section that an automatic and periodic triggering of
rebalances that would not require exposing this capability through the
Consumer interface does not cover your specific use cases and therefore is
not chosen as a desired approach. Maybe, even consider mentioning again
here that this method is expected to be used to respond to system changes
external to the consumer and its membership logic and is not proposed as a
way to resolve temporary imbalances due to membership changes that should
inherently be resolved by the assignor logic itself with one or more
consecutive rebalances.

Also, in your javadoc I'd add some context similar to what someone can read
on the KIP. Specifically where you say: "for example if some condition has
changed that has implications for the partition assignment." I'd rather add
something like "for example, if some condition external and invisible to
the Consumer and its group membership has changed in ways that would
justify a new partition assignment". That's just an example, feel free to
reword, but I believe that saying explicitly that this condition is not
visible to the consumer is useful to understand that this is not necessary
under normal circumstances.

In Compatibility, Deprecation, and Migration Plan section I think it's
worth mentioning that this is a new feature that affects new
implementations of the Consumer interface and any such new implementation
should override the new method. Implementations that wish to upgrade to a
newer version should be extended and recompiled, since no default
implementation will be provided.

Naming is hard here, if someone wants to emphasize the ad hoc and irregular
nature of this call. After some thought I'm fine with 'enforceRebalance'
even if it could potentially be confused to a method that is supposed to be
called to remediate one or more previously unsuccessful rebalances (which
is partly what StreamThread#enforceRebalance is used for). The best I could
think of was 'onRequestRebalance' but that's not perfect either.

Best,
Konstantine


On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Thanks John. I took out the KafkaConsumer method and moved the javadocs
> to the Consumer#enforceRebalance in the KIP -- hope you're happy :P
>
> Also, I wanted to point out one minor change to the current proposal: make
> this
> a blocking call, which accepts a timeout and returns whether the rebalance
> completed within the timeout. It will still reduce to a nonblocking call if
> a "zero"
> timeout is supplied. I've updated the KIP accordingly.
>
> Let me know if there are any further concerns, else I'll call for a vote.
>
> Cheers!
> Sophie
>
> On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vv...@apache.org> wrote:
>
> > Thanks Sophie,
> >
> > Sorry I didn't respond. I think your new method name sounds good.
> >
> > Regarding the interface vs implementation, I agree it's confusing. It's
> > always bothered me that the interface redirects you to an implementation
> > JavaDocs, but never enough for me to stop what I'm doing to fix it.
> > It's not a big deal either way, I just thought it was strange to propose
> a
> > "public interface" change, but not in terms of the actual interface
> class.
> >
> > It _is_ true that KafkaConsumer is also part of the public API, but only
> > really
> > for the constructor. Any proposal to define a new "consumer client" API
> > should be on the Consumer interface (which you said you plan to do
> anyway).
> > I guess I brought it up because proposing an addition to Consumer implies
> > it would be added to KafkaConsumer, but proposing an addition to
> > KafkaConsumer does not necessarily imply it would also be added to
> > Consumer. Does that make sense?
> >
> > Anyway, thanks for updating the KIP.
> >
> > Thanks,
> > -John
> >
> >
> > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > Since this doesn't seem too controversial, I'll probably call for a
> vote
> > by
> > > end of day.
> > > If there any further comments/questions/concerns, please let me know!
> > >
> > > Thanks,
> > > Sophie
> > >
> > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> sophie@confluent.io
> > >
> > > wrote:
> > >
> > > > Thanks for the feedback! That's a good point about trying to prevent
> > users
> > > > from
> > > > thinking they should use this API during normal processing and
> > clarifying
> > > > when/why
> > > > you might need it -- regardless of the method name, we should
> > explicitly
> > > > call this out
> > > > in the javadocs.
> > > >
> > > > As for the method name, on reflection I agree that "rejoinGroup" does
> > not
> > > > seem to be
> > > > appropriate. Of course that's what the consumer will actually be
> doing,
> > > > but that's just an
> > > > implementation detail -- the name should reflect what the API is
> doing,
> > > > not how it does it
> > > > (which can always change).
> > > >
> > > > How about "enforceRebalance"? This is stolen from the StreamThread
> > method
> > > > that does
> > > > exactly this (by unsubscribing) so it seems to fit. I'll update the
> KIP
> > > > with this unless anyone
> > > > has another suggestion.
> > > >
> > > > Regarding the Consumer vs KafkaConsumer matter, I included the
> > > > KafkaConsumer method
> > > > because that's where all the javadocs redirect to in the Consumer
> > > > interface. Also, FWIW
> > > > I'm pretty sure KafkaConsumer is also part of the public API -- we
> > would
> > > > be adding a new
> > > > method to both.
> > > >
> > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vv...@apache.org>
> > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Thanks for the well motivated KIP, Sophie. I had some alternatives
> in
> > > >> mind, which
> > > >> I won't even bother to relate because I feel like the motivation
> made
> > a
> > > >> compelling
> > > >> argument for the API as proposed.
> > > >>
> > > >> One very minor point you might as well fix is that the API change is
> > > >> targeted at
> > > >> KafkaConsumer (the implementation), but should be targeted at
> > > >> Consumer (the interface).
> > > >>
> > > >> I agree with your discomfort about the name. Adding a "rejoin"
> method
> > > >> seems strange
> > > >> since there's no "join" method. Instead the way you join the group
> the
> > > >> first time is just
> > > >> by calling "subscribe". But "resubscribe" seems too indirect from
> what
> > > >> we're really trying
> > > >> to do, which is to trigger a rebalance by sending a new JoinGroup
> > request.
> > > >>
> > > >> Another angle is that we don't want the method to sound like
> something
> > > >> you should
> > > >> be calling in normal circumstances, or people will be "tricked" into
> > > >> calling it unnecessarily.
> > > >>
> > > >> So, I think "rejoinGroup" is fine, although a person _might_ be
> > forgiven
> > > >> for thinking they
> > > >> need to call it periodically or something. Did you consider
> > > >> "triggerRebalance", which
> > > >> sounds pretty advanced-ish, and accurately describes what happens
> when
> > > >> you call it?
> > > >>
> > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > >>
> > > >> Thanks,
> > > >> -John
> > > >>
> > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > >> > This situation was discussed at length after a recent talk I gave.
> > This
> > > >> KIP
> > > >> > would be a great step towards increased availability and in
> > facilitating
> > > >> > lightweight rebalances.
> > > >> >
> > > >> > anna
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > sophie@confluent.io>
> > > >> > wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > In light of some recent and upcoming rebalancing and
> availability
> > > >> > > improvements, it seems we have a need for explicitly triggering
> a
> > > >> consumer
> > > >> > > group rebalance. Therefore I'd like to propose adding a new
> > > >> > > rejoinGroup()method
> > > >> > > to the Consumer client (better method name suggestions are very
> > > >> welcome).
> > > >> > >
> > > >> > > Please take a look at the KIP and let me know what you think!
> > > >> > >
> > > >> > > KIP document:
> > > >> > >
> > > >> > >
> > > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > >> > >
> > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > >> > >
> > > >> > > Cheers,
> > > >> > > Sophie
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by John Roesler <vv...@apache.org>.
Sounds perfect. Thanks!
-John

On Mon, Feb 10, 2020, at 19:18, Sophie Blee-Goldman wrote:
> Thanks John. I took out the KafkaConsumer method and moved the javadocs
> to the Consumer#enforceRebalance in the KIP -- hope you're happy :P
> 
> Also, I wanted to point out one minor change to the current proposal: make
> this
> a blocking call, which accepts a timeout and returns whether the rebalance
> completed within the timeout. It will still reduce to a nonblocking call if
> a "zero"
> timeout is supplied. I've updated the KIP accordingly.
> 
> Let me know if there are any further concerns, else I'll call for a vote.
> 
> Cheers!
> Sophie
> 
> On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vv...@apache.org> wrote:
> 
> > Thanks Sophie,
> >
> > Sorry I didn't respond. I think your new method name sounds good.
> >
> > Regarding the interface vs implementation, I agree it's confusing. It's
> > always bothered me that the interface redirects you to an implementation
> > JavaDocs, but never enough for me to stop what I'm doing to fix it.
> > It's not a big deal either way, I just thought it was strange to propose a
> > "public interface" change, but not in terms of the actual interface class.
> >
> > It _is_ true that KafkaConsumer is also part of the public API, but only
> > really
> > for the constructor. Any proposal to define a new "consumer client" API
> > should be on the Consumer interface (which you said you plan to do anyway).
> > I guess I brought it up because proposing an addition to Consumer implies
> > it would be added to KafkaConsumer, but proposing an addition to
> > KafkaConsumer does not necessarily imply it would also be added to
> > Consumer. Does that make sense?
> >
> > Anyway, thanks for updating the KIP.
> >
> > Thanks,
> > -John
> >
> >
> > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > Since this doesn't seem too controversial, I'll probably call for a vote
> > by
> > > end of day.
> > > If there any further comments/questions/concerns, please let me know!
> > >
> > > Thanks,
> > > Sophie
> > >
> > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <sophie@confluent.io
> > >
> > > wrote:
> > >
> > > > Thanks for the feedback! That's a good point about trying to prevent
> > users
> > > > from
> > > > thinking they should use this API during normal processing and
> > clarifying
> > > > when/why
> > > > you might need it -- regardless of the method name, we should
> > explicitly
> > > > call this out
> > > > in the javadocs.
> > > >
> > > > As for the method name, on reflection I agree that "rejoinGroup" does
> > not
> > > > seem to be
> > > > appropriate. Of course that's what the consumer will actually be doing,
> > > > but that's just an
> > > > implementation detail -- the name should reflect what the API is doing,
> > > > not how it does it
> > > > (which can always change).
> > > >
> > > > How about "enforceRebalance"? This is stolen from the StreamThread
> > method
> > > > that does
> > > > exactly this (by unsubscribing) so it seems to fit. I'll update the KIP
> > > > with this unless anyone
> > > > has another suggestion.
> > > >
> > > > Regarding the Consumer vs KafkaConsumer matter, I included the
> > > > KafkaConsumer method
> > > > because that's where all the javadocs redirect to in the Consumer
> > > > interface. Also, FWIW
> > > > I'm pretty sure KafkaConsumer is also part of the public API -- we
> > would
> > > > be adding a new
> > > > method to both.
> > > >
> > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vv...@apache.org>
> > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Thanks for the well motivated KIP, Sophie. I had some alternatives in
> > > >> mind, which
> > > >> I won't even bother to relate because I feel like the motivation made
> > a
> > > >> compelling
> > > >> argument for the API as proposed.
> > > >>
> > > >> One very minor point you might as well fix is that the API change is
> > > >> targeted at
> > > >> KafkaConsumer (the implementation), but should be targeted at
> > > >> Consumer (the interface).
> > > >>
> > > >> I agree with your discomfort about the name. Adding a "rejoin" method
> > > >> seems strange
> > > >> since there's no "join" method. Instead the way you join the group the
> > > >> first time is just
> > > >> by calling "subscribe". But "resubscribe" seems too indirect from what
> > > >> we're really trying
> > > >> to do, which is to trigger a rebalance by sending a new JoinGroup
> > request.
> > > >>
> > > >> Another angle is that we don't want the method to sound like something
> > > >> you should
> > > >> be calling in normal circumstances, or people will be "tricked" into
> > > >> calling it unnecessarily.
> > > >>
> > > >> So, I think "rejoinGroup" is fine, although a person _might_ be
> > forgiven
> > > >> for thinking they
> > > >> need to call it periodically or something. Did you consider
> > > >> "triggerRebalance", which
> > > >> sounds pretty advanced-ish, and accurately describes what happens when
> > > >> you call it?
> > > >>
> > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > >>
> > > >> Thanks,
> > > >> -John
> > > >>
> > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > >> > This situation was discussed at length after a recent talk I gave.
> > This
> > > >> KIP
> > > >> > would be a great step towards increased availability and in
> > facilitating
> > > >> > lightweight rebalances.
> > > >> >
> > > >> > anna
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > sophie@confluent.io>
> > > >> > wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > In light of some recent and upcoming rebalancing and availability
> > > >> > > improvements, it seems we have a need for explicitly triggering a
> > > >> consumer
> > > >> > > group rebalance. Therefore I'd like to propose adding a new
> > > >> > > rejoinGroup()method
> > > >> > > to the Consumer client (better method name suggestions are very
> > > >> welcome).
> > > >> > >
> > > >> > > Please take a look at the KIP and let me know what you think!
> > > >> > >
> > > >> > > KIP document:
> > > >> > >
> > > >> > >
> > > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > >> > >
> > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > >> > >
> > > >> > > Cheers,
> > > >> > > Sophie
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Thanks John. I took out the KafkaConsumer method and moved the javadocs
to the Consumer#enforceRebalance in the KIP -- hope you're happy :P

Also, I wanted to point out one minor change to the current proposal: make
this
a blocking call, which accepts a timeout and returns whether the rebalance
completed within the timeout. It will still reduce to a nonblocking call if
a "zero"
timeout is supplied. I've updated the KIP accordingly.

Let me know if there are any further concerns, else I'll call for a vote.

Cheers!
Sophie

On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vv...@apache.org> wrote:

> Thanks Sophie,
>
> Sorry I didn't respond. I think your new method name sounds good.
>
> Regarding the interface vs implementation, I agree it's confusing. It's
> always bothered me that the interface redirects you to an implementation
> JavaDocs, but never enough for me to stop what I'm doing to fix it.
> It's not a big deal either way, I just thought it was strange to propose a
> "public interface" change, but not in terms of the actual interface class.
>
> It _is_ true that KafkaConsumer is also part of the public API, but only
> really
> for the constructor. Any proposal to define a new "consumer client" API
> should be on the Consumer interface (which you said you plan to do anyway).
> I guess I brought it up because proposing an addition to Consumer implies
> it would be added to KafkaConsumer, but proposing an addition to
> KafkaConsumer does not necessarily imply it would also be added to
> Consumer. Does that make sense?
>
> Anyway, thanks for updating the KIP.
>
> Thanks,
> -John
>
>
> On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > Since this doesn't seem too controversial, I'll probably call for a vote
> by
> > end of day.
> > If there any further comments/questions/concerns, please let me know!
> >
> > Thanks,
> > Sophie
> >
> > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <sophie@confluent.io
> >
> > wrote:
> >
> > > Thanks for the feedback! That's a good point about trying to prevent
> users
> > > from
> > > thinking they should use this API during normal processing and
> clarifying
> > > when/why
> > > you might need it -- regardless of the method name, we should
> explicitly
> > > call this out
> > > in the javadocs.
> > >
> > > As for the method name, on reflection I agree that "rejoinGroup" does
> not
> > > seem to be
> > > appropriate. Of course that's what the consumer will actually be doing,
> > > but that's just an
> > > implementation detail -- the name should reflect what the API is doing,
> > > not how it does it
> > > (which can always change).
> > >
> > > How about "enforceRebalance"? This is stolen from the StreamThread
> method
> > > that does
> > > exactly this (by unsubscribing) so it seems to fit. I'll update the KIP
> > > with this unless anyone
> > > has another suggestion.
> > >
> > > Regarding the Consumer vs KafkaConsumer matter, I included the
> > > KafkaConsumer method
> > > because that's where all the javadocs redirect to in the Consumer
> > > interface. Also, FWIW
> > > I'm pretty sure KafkaConsumer is also part of the public API -- we
> would
> > > be adding a new
> > > method to both.
> > >
> > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vv...@apache.org>
> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for the well motivated KIP, Sophie. I had some alternatives in
> > >> mind, which
> > >> I won't even bother to relate because I feel like the motivation made
> a
> > >> compelling
> > >> argument for the API as proposed.
> > >>
> > >> One very minor point you might as well fix is that the API change is
> > >> targeted at
> > >> KafkaConsumer (the implementation), but should be targeted at
> > >> Consumer (the interface).
> > >>
> > >> I agree with your discomfort about the name. Adding a "rejoin" method
> > >> seems strange
> > >> since there's no "join" method. Instead the way you join the group the
> > >> first time is just
> > >> by calling "subscribe". But "resubscribe" seems too indirect from what
> > >> we're really trying
> > >> to do, which is to trigger a rebalance by sending a new JoinGroup
> request.
> > >>
> > >> Another angle is that we don't want the method to sound like something
> > >> you should
> > >> be calling in normal circumstances, or people will be "tricked" into
> > >> calling it unnecessarily.
> > >>
> > >> So, I think "rejoinGroup" is fine, although a person _might_ be
> forgiven
> > >> for thinking they
> > >> need to call it periodically or something. Did you consider
> > >> "triggerRebalance", which
> > >> sounds pretty advanced-ish, and accurately describes what happens when
> > >> you call it?
> > >>
> > >> All in all, the KIP sounds good to me, and I'm in favor.
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > >> > This situation was discussed at length after a recent talk I gave.
> This
> > >> KIP
> > >> > would be a great step towards increased availability and in
> facilitating
> > >> > lightweight rebalances.
> > >> >
> > >> > anna
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> sophie@confluent.io>
> > >> > wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > > In light of some recent and upcoming rebalancing and availability
> > >> > > improvements, it seems we have a need for explicitly triggering a
> > >> consumer
> > >> > > group rebalance. Therefore I'd like to propose adding a new
> > >> > > rejoinGroup()method
> > >> > > to the Consumer client (better method name suggestions are very
> > >> welcome).
> > >> > >
> > >> > > Please take a look at the KIP and let me know what you think!
> > >> > >
> > >> > > KIP document:
> > >> > >
> > >> > >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > >> > >
> > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > >> > >
> > >> > > Cheers,
> > >> > > Sophie
> > >> > >
> > >> >
> > >>
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by John Roesler <vv...@apache.org>.
Thanks Sophie,

Sorry I didn't respond. I think your new method name sounds good.

Regarding the interface vs implementation, I agree it's confusing. It's
always bothered me that the interface redirects you to an implementation
JavaDocs, but never enough for me to stop what I'm doing to fix it.
It's not a big deal either way, I just thought it was strange to propose a 
"public interface" change, but not in terms of the actual interface class.

It _is_ true that KafkaConsumer is also part of the public API, but only really
for the constructor. Any proposal to define a new "consumer client" API
should be on the Consumer interface (which you said you plan to do anyway).
I guess I brought it up because proposing an addition to Consumer implies
it would be added to KafkaConsumer, but proposing an addition to 
KafkaConsumer does not necessarily imply it would also be added to 
Consumer. Does that make sense?

Anyway, thanks for updating the KIP.

Thanks,
-John


On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> Since this doesn't seem too controversial, I'll probably call for a vote by
> end of day.
> If there any further comments/questions/concerns, please let me know!
> 
> Thanks,
> Sophie
> 
> On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <so...@confluent.io>
> wrote:
> 
> > Thanks for the feedback! That's a good point about trying to prevent users
> > from
> > thinking they should use this API during normal processing and clarifying
> > when/why
> > you might need it -- regardless of the method name, we should explicitly
> > call this out
> > in the javadocs.
> >
> > As for the method name, on reflection I agree that "rejoinGroup" does not
> > seem to be
> > appropriate. Of course that's what the consumer will actually be doing,
> > but that's just an
> > implementation detail -- the name should reflect what the API is doing,
> > not how it does it
> > (which can always change).
> >
> > How about "enforceRebalance"? This is stolen from the StreamThread method
> > that does
> > exactly this (by unsubscribing) so it seems to fit. I'll update the KIP
> > with this unless anyone
> > has another suggestion.
> >
> > Regarding the Consumer vs KafkaConsumer matter, I included the
> > KafkaConsumer method
> > because that's where all the javadocs redirect to in the Consumer
> > interface. Also, FWIW
> > I'm pretty sure KafkaConsumer is also part of the public API -- we would
> > be adding a new
> > method to both.
> >
> > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vv...@apache.org> wrote:
> >
> >> Hi all,
> >>
> >> Thanks for the well motivated KIP, Sophie. I had some alternatives in
> >> mind, which
> >> I won't even bother to relate because I feel like the motivation made a
> >> compelling
> >> argument for the API as proposed.
> >>
> >> One very minor point you might as well fix is that the API change is
> >> targeted at
> >> KafkaConsumer (the implementation), but should be targeted at
> >> Consumer (the interface).
> >>
> >> I agree with your discomfort about the name. Adding a "rejoin" method
> >> seems strange
> >> since there's no "join" method. Instead the way you join the group the
> >> first time is just
> >> by calling "subscribe". But "resubscribe" seems too indirect from what
> >> we're really trying
> >> to do, which is to trigger a rebalance by sending a new JoinGroup request.
> >>
> >> Another angle is that we don't want the method to sound like something
> >> you should
> >> be calling in normal circumstances, or people will be "tricked" into
> >> calling it unnecessarily.
> >>
> >> So, I think "rejoinGroup" is fine, although a person _might_ be forgiven
> >> for thinking they
> >> need to call it periodically or something. Did you consider
> >> "triggerRebalance", which
> >> sounds pretty advanced-ish, and accurately describes what happens when
> >> you call it?
> >>
> >> All in all, the KIP sounds good to me, and I'm in favor.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> >> > This situation was discussed at length after a recent talk I gave. This
> >> KIP
> >> > would be a great step towards increased availability and in facilitating
> >> > lightweight rebalances.
> >> >
> >> > anna
> >> >
> >> >
> >> >
> >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <so...@confluent.io>
> >> > wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > In light of some recent and upcoming rebalancing and availability
> >> > > improvements, it seems we have a need for explicitly triggering a
> >> consumer
> >> > > group rebalance. Therefore I'd like to propose adding a new
> >> > > rejoinGroup()method
> >> > > to the Consumer client (better method name suggestions are very
> >> welcome).
> >> > >
> >> > > Please take a look at the KIP and let me know what you think!
> >> > >
> >> > > KIP document:
> >> > >
> >> > >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> >> > >
> >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> >> > >
> >> > > Cheers,
> >> > > Sophie
> >> > >
> >> >
> >>
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Since this doesn't seem too controversial, I'll probably call for a vote by
end of day.
If there any further comments/questions/concerns, please let me know!

Thanks,
Sophie

On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Thanks for the feedback! That's a good point about trying to prevent users
> from
> thinking they should use this API during normal processing and clarifying
> when/why
> you might need it -- regardless of the method name, we should explicitly
> call this out
> in the javadocs.
>
> As for the method name, on reflection I agree that "rejoinGroup" does not
> seem to be
> appropriate. Of course that's what the consumer will actually be doing,
> but that's just an
> implementation detail -- the name should reflect what the API is doing,
> not how it does it
> (which can always change).
>
> How about "enforceRebalance"? This is stolen from the StreamThread method
> that does
> exactly this (by unsubscribing) so it seems to fit. I'll update the KIP
> with this unless anyone
> has another suggestion.
>
> Regarding the Consumer vs KafkaConsumer matter, I included the
> KafkaConsumer method
> because that's where all the javadocs redirect to in the Consumer
> interface. Also, FWIW
> I'm pretty sure KafkaConsumer is also part of the public API -- we would
> be adding a new
> method to both.
>
> On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vv...@apache.org> wrote:
>
>> Hi all,
>>
>> Thanks for the well motivated KIP, Sophie. I had some alternatives in
>> mind, which
>> I won't even bother to relate because I feel like the motivation made a
>> compelling
>> argument for the API as proposed.
>>
>> One very minor point you might as well fix is that the API change is
>> targeted at
>> KafkaConsumer (the implementation), but should be targeted at
>> Consumer (the interface).
>>
>> I agree with your discomfort about the name. Adding a "rejoin" method
>> seems strange
>> since there's no "join" method. Instead the way you join the group the
>> first time is just
>> by calling "subscribe". But "resubscribe" seems too indirect from what
>> we're really trying
>> to do, which is to trigger a rebalance by sending a new JoinGroup request.
>>
>> Another angle is that we don't want the method to sound like something
>> you should
>> be calling in normal circumstances, or people will be "tricked" into
>> calling it unnecessarily.
>>
>> So, I think "rejoinGroup" is fine, although a person _might_ be forgiven
>> for thinking they
>> need to call it periodically or something. Did you consider
>> "triggerRebalance", which
>> sounds pretty advanced-ish, and accurately describes what happens when
>> you call it?
>>
>> All in all, the KIP sounds good to me, and I'm in favor.
>>
>> Thanks,
>> -John
>>
>> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
>> > This situation was discussed at length after a recent talk I gave. This
>> KIP
>> > would be a great step towards increased availability and in facilitating
>> > lightweight rebalances.
>> >
>> > anna
>> >
>> >
>> >
>> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <so...@confluent.io>
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > In light of some recent and upcoming rebalancing and availability
>> > > improvements, it seems we have a need for explicitly triggering a
>> consumer
>> > > group rebalance. Therefore I'd like to propose adding a new
>> > > rejoinGroup()method
>> > > to the Consumer client (better method name suggestions are very
>> welcome).
>> > >
>> > > Please take a look at the KIP and let me know what you think!
>> > >
>> > > KIP document:
>> > >
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
>> > >
>> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
>> > >
>> > > Cheers,
>> > > Sophie
>> > >
>> >
>>
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Thanks for the feedback! That's a good point about trying to prevent users
from
thinking they should use this API during normal processing and clarifying
when/why
you might need it -- regardless of the method name, we should explicitly
call this out
in the javadocs.

As for the method name, on reflection I agree that "rejoinGroup" does not
seem to be
appropriate. Of course that's what the consumer will actually be doing, but
that's just an
implementation detail -- the name should reflect what the API is doing, not
how it does it
(which can always change).

How about "enforceRebalance"? This is stolen from the StreamThread method
that does
exactly this (by unsubscribing) so it seems to fit. I'll update the KIP
with this unless anyone
has another suggestion.

Regarding the Consumer vs KafkaConsumer matter, I included the
KafkaConsumer method
because that's where all the javadocs redirect to in the Consumer
interface. Also, FWIW
I'm pretty sure KafkaConsumer is also part of the public API -- we would be
adding a new
method to both.

On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vv...@apache.org> wrote:

> Hi all,
>
> Thanks for the well motivated KIP, Sophie. I had some alternatives in
> mind, which
> I won't even bother to relate because I feel like the motivation made a
> compelling
> argument for the API as proposed.
>
> One very minor point you might as well fix is that the API change is
> targeted at
> KafkaConsumer (the implementation), but should be targeted at
> Consumer (the interface).
>
> I agree with your discomfort about the name. Adding a "rejoin" method
> seems strange
> since there's no "join" method. Instead the way you join the group the
> first time is just
> by calling "subscribe". But "resubscribe" seems too indirect from what
> we're really trying
> to do, which is to trigger a rebalance by sending a new JoinGroup request.
>
> Another angle is that we don't want the method to sound like something you
> should
> be calling in normal circumstances, or people will be "tricked" into
> calling it unnecessarily.
>
> So, I think "rejoinGroup" is fine, although a person _might_ be forgiven
> for thinking they
> need to call it periodically or something. Did you consider
> "triggerRebalance", which
> sounds pretty advanced-ish, and accurately describes what happens when you
> call it?
>
> All in all, the KIP sounds good to me, and I'm in favor.
>
> Thanks,
> -John
>
> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > This situation was discussed at length after a recent talk I gave. This
> KIP
> > would be a great step towards increased availability and in facilitating
> > lightweight rebalances.
> >
> > anna
> >
> >
> >
> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <so...@confluent.io>
> > wrote:
> >
> > > Hi all,
> > >
> > > In light of some recent and upcoming rebalancing and availability
> > > improvements, it seems we have a need for explicitly triggering a
> consumer
> > > group rebalance. Therefore I'd like to propose adding a new
> > > rejoinGroup()method
> > > to the Consumer client (better method name suggestions are very
> welcome).
> > >
> > > Please take a look at the KIP and let me know what you think!
> > >
> > > KIP document:
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > >
> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > >
> > > Cheers,
> > > Sophie
> > >
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by John Roesler <vv...@apache.org>.
Hi all,

Thanks for the well motivated KIP, Sophie. I had some alternatives in mind, which
I won't even bother to relate because I feel like the motivation made a compelling
argument for the API as proposed.

One very minor point you might as well fix is that the API change is targeted at
KafkaConsumer (the implementation), but should be targeted at
Consumer (the interface).

I agree with your discomfort about the name. Adding a "rejoin" method seems strange
since there's no "join" method. Instead the way you join the group the first time is just
by calling "subscribe". But "resubscribe" seems too indirect from what we're really trying
to do, which is to trigger a rebalance by sending a new JoinGroup request.

Another angle is that we don't want the method to sound like something you should
be calling in normal circumstances, or people will be "tricked" into calling it unnecessarily.

So, I think "rejoinGroup" is fine, although a person _might_ be forgiven for thinking they
need to call it periodically or something. Did you consider "triggerRebalance", which 
sounds pretty advanced-ish, and accurately describes what happens when you call it?

All in all, the KIP sounds good to me, and I'm in favor.

Thanks,
-John

On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> This situation was discussed at length after a recent talk I gave. This KIP
> would be a great step towards increased availability and in facilitating
> lightweight rebalances.
> 
> anna
> 
> 
> 
> On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <so...@confluent.io>
> wrote:
> 
> > Hi all,
> >
> > In light of some recent and upcoming rebalancing and availability
> > improvements, it seems we have a need for explicitly triggering a consumer
> > group rebalance. Therefore I'd like to propose adding a new
> > rejoinGroup()method
> > to the Consumer client (better method name suggestions are very welcome).
> >
> > Please take a look at the KIP and let me know what you think!
> >
> > KIP document:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> >
> > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> >
> > Cheers,
> > Sophie
> >
>

Re: [DISCUSS] KIP-568: Explicit rebalance triggering on the Consumer

Posted by Anna McDonald <jb...@happypants.org>.
This situation was discussed at length after a recent talk I gave. This KIP
would be a great step towards increased availability and in facilitating
lightweight rebalances.

anna



On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Hi all,
>
> In light of some recent and upcoming rebalancing and availability
> improvements, it seems we have a need for explicitly triggering a consumer
> group rebalance. Therefore I'd like to propose adding a new
> rejoinGroup()method
> to the Consumer client (better method name suggestions are very welcome).
>
> Please take a look at the KIP and let me know what you think!
>
> KIP document:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
>
> JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
>
> Cheers,
> Sophie
>