Posted to dev@kafka.apache.org by Becket Qin <be...@gmail.com> on 2017/11/01 16:53:42 UTC

[DISCUSS] KIP-219 - Improve Quota Communication

Hi,

We would like to start the discussion on KIP-219.

The KIP tries to improve the communication of quota throttle times between
brokers and clients, to avoid client timeouts in the case of long throttle
times.

The KIP link is as follows:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication

Comments are welcome.

Thanks,

Jiangjie (Becket) Qin

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Prajakta Dumbre <du...@gmail.com>.
I have to develop a real-time recommendation system.
I am trying to use Apache Kafka for website activity tracking, but I am not
able to understand how I can track website activity. Please tell me how I
can track website activity. Right now I am using Apache Kafka, Apache
Spark, and Python. Please help me.

On Wed, Apr 25, 2018 at 10:34 AM, Ismael Juma <is...@juma.me.uk> wrote:

> Hi Jon,
>
> Not sure about this approach. It's worth mentioning this in the vote thread
> as well so that the people who voted originally have a chance to comment.
> Also, we should really get input from developers of Kafka clients
> (librdkafka, kafka-python, etc.) for this KIP.
>
> Ismael
>
> On Thu, Apr 5, 2018 at 2:50 PM, Jonghyun Lee <jo...@gmail.com> wrote:
>
> > Hi,
> >
> > I have been implementing KIP-219. I discussed the interface changes with
> > Becket Qin and Dong Lin, and we decided to bump up the protocol version
> > of ApiVersionsRequest and ApiVersionsResponse only, instead of bumping
> > up all requests/responses that may be throttled, to indicate to clients
> > whether or not brokers perform throttling before sending out responses.
> > We think this is sufficient given that the network client exchanges
> > ApiVersionsRequest/Response with each broker when establishing a
> > connection to it, and thus it can detect the broker's throttling
> > behavior just by examining the ApiVersionsResponse version.
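> >
> > As a rough illustration (a sketch with hypothetical names and an assumed
> > version cutoff, not the actual NetworkClient code), the client-side
> > detection could look like this:
> >
> >     // Sketch: remember, per broker connection, whether the broker sends
> >     // the response before throttling (assumed to be signaled by a bumped
> >     // ApiVersions version), so the client knows to back off by itself.
> >     final class ThrottleBehavior {
> >         static final short ASSUMED_CUTOFF_VERSION = 2; // hypothetical
> >
> >         static boolean clientMustBackOff(short apiVersionsVersion) {
> >             return apiVersionsVersion >= ASSUMED_CUTOFF_VERSION;
> >         }
> >     }
> >
> >     // Usage when handling any response carrying a throttle time
> >     // (delayNextSendTo is a hypothetical helper):
> >     // if (ThrottleBehavior.clientMustBackOff(version) && throttleMs > 0)
> >     //     delayNextSendTo(broker, throttleMs);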
> >
> > Please respond to this e-mail if you have any questions or concerns.
> >
> > Thanks,
> > Jon Lee
> >
> >
> > On Thu, Apr 5, 2018 at 2:29 PM, Becket Qin <be...@gmail.com> wrote:
> >
> > >
> > >
> > > On Thu, Nov 16, 2017 at 3:49 PM, Becket Qin <be...@gmail.com>
> > wrote:
> > >
> > >> Thanks Rajini,
> > >>
> > >> I updated the KIP wiki to clarify the scope of the KIP. To summarize,
> > >> the current quota mechanism has a few caveats:
> > >> 1. The brokers only throttle the NEXT request, even if the current
> > >> request is already violating the quota. So the broker will always
> > >> process at least one request from the client.
> > >> 2. The brokers are not able to know the client id until they fully
> > >> read the request out of the socket, even if that client is being
> > >> throttled.
> > >> 3. The brokers do not communicate with the clients promptly, so the
> > >> clients have to blindly wait and sometimes time out unnecessarily.
> > >>
> > >> This KIP only tries to address 3 but not 1 and 2, because A) those two
> > >> issues are sort of orthogonal to 3, and B) the solutions to 1 and 2
> > >> are much more complicated and worth a separate discussion.
> > >>
> > >> I'll wait till tomorrow and start a voting thread if no further
> > >> concerns are raised about the KIP.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Thu, Nov 16, 2017 at 11:47 AM, Rajini Sivaram <
> > rajinisivaram@gmail.com
> > >> > wrote:
> > >>
> > >>> Hi Becket,
> > >>>
> > >>> The current user quota doesn't solve the problem. But I was thinking
> > >>> that if we could ensure we don't read more from the network than the
> > >>> quota allows, we may be able to fix the issue in a different way
> > >>> (throttling all connections, each for a limited time, prior to
> > >>> reading large requests). But it would be more complex (and even
> > >>> messier for client-id quotas), so I can understand why the solution
> > >>> you proposed in the KIP makes sense for the scenario that you
> > >>> described.
> > >>>
> > >>> Regards,
> > >>>
> > >>> Rajini
> > >>>
> > >>> On Tue, Nov 14, 2017 at 11:30 PM, Becket Qin <be...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Hi Rajini,
> > >>> >
> > >>> > We are using SSL, so we could use user quotas. But I am not sure
> > >>> > if that would solve the problem. The key issue in our case is that
> > >>> > each broker can only handle ~300 MB/s of incoming bytes, but the
> > >>> > MapReduce job is trying to push 1-2 GB/s; unless we can throttle
> > >>> > the clients to 300 MB/s, the broker cannot sustain the load. In
> > >>> > order to do that, we need to be able to throttle requests for more
> > >>> > than the request timeout, potentially a few minutes. It seems
> > >>> > neither user quotas nor a limited throttle time can achieve this.
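> > >>> >
> > >>> > For a rough sense of the numbers, assuming the usual rate-based
> > >>> > formula throttleTime = (observedRate - quota) / quota * windowSize
> > >>> > (a simplification; the exact broker computation may differ):
> > >>> >
> > >>> >     public class ThrottleTimeEstimate {
> > >>> >         public static void main(String[] args) {
> > >>> >             long quotaBps = 300L * 1024 * 1024;     // 300 MB/s quota
> > >>> >             long observedBps = 1536L * 1024 * 1024; // ~1.5 GB/s load
> > >>> >             long windowMs = 30_000;                 // quota window
> > >>> >             long throttleMs =
> > >>> >                 (observedBps - quotaBps) * windowMs / quotaBps;
> > >>> >             System.out.println(throttleMs + " ms"); // 123600 ms
> > >>> >         }
> > >>> >     }
> > >>> >
> > >>> > That is, roughly two minutes of throttling, well above a typical
> > >>> > request.timeout.ms.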
> > >>> >
> > >>> > Thanks,
> > >>> >
> > >>> > Jiangjie (Becket) Qin
> > >>> >
> > >>> > On Tue, Nov 14, 2017 at 7:44 AM, Rajini Sivaram <
> > >>> rajinisivaram@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> > > Hi Becket,
> > >>> > >
> > >>> > > For the specific scenario that you described, would it be
> > >>> > > possible to use user quotas rather than client-id quotas? With
> > >>> > > user quotas, perhaps we can throttle more easily before reading
> > >>> > > requests as well (as you mentioned, the difficulty with client-id
> > >>> > > quotas is that we have to read partial requests and handle
> > >>> > > client-ids at the network layer, making that a much bigger
> > >>> > > change). If your clients are using SASL/SSL, I was wondering
> > >>> > > whether a solution that improves user quotas and limits the
> > >>> > > throttle time would work for you.
> > >>> > >
> > >>> > > Regards,
> > >>> > >
> > >>> > > Rajini
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > > On Thu, Nov 9, 2017 at 12:59 AM, Becket Qin <
> becket.qin@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > > > Since we will bump up the wire request version, another option
> > >>> > > > is: for clients that are sending old request versions, the
> > >>> > > > broker can just keep the current behavior. For clients sending
> > >>> > > > the new request versions, the broker can respond and then mute
> > >>> > > > the channel as described in the KIP wiki. In this case, muting
> > >>> > > > the channel is mostly for protection purposes. A correctly
> > >>> > > > implemented client should back off for the throttle time before
> > >>> > > > sending the next request. The downside is that the broker needs
> > >>> > > > to keep both code paths, and it seems we would not gain much
> > >>> > > > benefit. So personally I prefer to just mute the channel. But I
> > >>> > > > am open to different opinions.
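> > >>> > > >
> > >>> > > > To make the two code paths concrete, a minimal sketch (all
> > >>> > > > types and the version cutoff here are hypothetical, not the
> > >>> > > > real broker classes):
> > >>> > > >
> > >>> > > >     // Sketch only: hypothetical channel abstraction.
> > >>> > > >     interface Channel {
> > >>> > > >         void send(Object response);              // send now
> > >>> > > >         void muteFor(long millis);               // stop reading
> > >>> > > >         void sendAfter(Object response, long millis); // delay
> > >>> > > >     }
> > >>> > > >
> > >>> > > >     final class QuotaResponder {
> > >>> > > >         static final short NEW_VERSION = 6; // assumed cutoff
> > >>> > > >
> > >>> > > >         static void respond(Channel ch, Object response,
> > >>> > > >                             long throttleMs, short version) {
> > >>> > > >             if (version >= NEW_VERSION) {
> > >>> > > >                 // New clients: respond immediately so the
> > >>> > > >                 // client learns the throttle time, then mute
> > >>> > > >                 // so a non-cooperating client still gets no
> > >>> > > >                 // more requests processed in the meantime.
> > >>> > > >                 ch.send(response);
> > >>> > > >                 ch.muteFor(throttleMs);
> > >>> > > >             } else {
> > >>> > > >                 // Old clients: keep the current behavior and
> > >>> > > >                 // delay the response by the throttle time.
> > >>> > > >                 ch.sendAfter(response, throttleMs);
> > >>> > > >             }
> > >>> > > >         }
> > >>> > > >     }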
> > >>> > > >
> > >>> > > > Thanks,
> > >>> > > >
> > >>> > > > Jiangjie (Becket) Qin
> > >>> > > >
> > >>> > > > On Mon, Nov 6, 2017 at 7:28 PM, Becket Qin <
> becket.qin@gmail.com
> > >
> > >>> > wrote:
> > >>> > > >
> > >>> > > > > Hi Jun,
> > >>> > > > >
> > >>> > > > > Hmm, even if a connection is closed by the client while the
> > >>> > > > > channel is muted, it seems Selector.select() will detect this
> > >>> > > > > and close the socket after the channel is unmuted.
> > >>> > > > > It is true that before the channel is unmuted, the socket
> > >>> > > > > will be in a CLOSE_WAIT state though. So having an
> > >>> > > > > arbitrarily long muted duration may indeed cause problems.
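> > >>> > > > >
> > >>> > > > > (This relies on standard NIO semantics: reading from a
> > >>> > > > > channel whose peer has closed returns -1. A minimal sketch:)
> > >>> > > > >
> > >>> > > > >     import java.io.IOException;
> > >>> > > > >     import java.nio.ByteBuffer;
> > >>> > > > >     import java.nio.channels.SocketChannel;
> > >>> > > > >
> > >>> > > > >     final class CloseDetection {
> > >>> > > > >         // After unmuting, the next read returns -1 if the
> > >>> > > > >         // client closed while the channel was muted.
> > >>> > > > >         static boolean peerClosed(SocketChannel ch)
> > >>> > > > >                 throws IOException {
> > >>> > > > >             return ch.read(ByteBuffer.allocate(1)) < 0;
> > >>> > > > >         }
> > >>> > > > >     }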
> > >>> > > > >
> > >>> > > > > Thanks,
> > >>> > > > >
> > >>> > > > > Jiangjie (Becket) Qin
> > >>> > > > >
> > >>> > > > > On Mon, Nov 6, 2017 at 7:22 PM, Becket Qin <
> > becket.qin@gmail.com
> > >>> >
> > >>> > > wrote:
> > >>> > > > >
> > >>> > > > >> Hi Rajini,
> > >>> > > > >>
> > >>> > > > >> Thanks for the detailed explanation. Please see the reply
> > >>> > > > >> below:
> > >>> > > > >>
> > >>> > > > >> 2. Limiting the throttle time to connection.max.idle.ms on
> > >>> > > > >> the broker side is probably fine. However, clients may have
> > >>> > > > >> a different configuration of connection.max.idle.ms and
> > >>> > > > >> still reconnect before the throttle time (which is the
> > >>> > > > >> server-side connection.max.idle.ms) expires. It seems like
> > >>> > > > >> another back door for the quota.
> > >>> > > > >>
> > >>> > > > >> 3. I agree we could just mute the server socket until
> > >>> > > > >> connection.max.idle.ms if the massive CLOSE_WAIT is a big
> > >>> > > > >> issue. This helps guarantee that at most connection_rate *
> > >>> > > > >> connection.max.idle.ms sockets will be in the CLOSE_WAIT
> > >>> > > > >> state. For cooperative clients, unmuting the socket will not
> > >>> > > > >> have a negative impact.
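> > >>> > > > >>
> > >>> > > > >> (For example, with an assumed 10 new connections/sec from
> > >>> > > > >> throttled clients and connection.max.idle.ms of 10 minutes,
> > >>> > > > >> that bound works out to at most 10 * 600 = 6,000 sockets in
> > >>> > > > >> CLOSE_WAIT, rather than a number that grows without limit.)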
> > >>> > > > >>
> > >>> > > > >> 4. My concern with capping the throttle time to
> > >>> > > > >> metrics.window.ms is that we will not be able to enforce the
> > >>> > > > >> quota effectively. It might be useful to explain this with a
> > >>> > > > >> real example we are trying to solve. We have a MapReduce job
> > >>> > > > >> pushing data to a Kafka cluster. The MapReduce job has
> > >>> > > > >> hundreds of producers, and each of them sends a normal-sized
> > >>> > > > >> ProduceRequest (~2 MB) to each of the brokers in the
> > >>> > > > >> cluster. Apparently the client id ran out of its bytes quota
> > >>> > > > >> pretty quickly, and the broker started to throttle the
> > >>> > > > >> producers. The throttle time could actually be pretty long
> > >>> > > > >> (e.g., a few minutes). At that point, the request queue time
> > >>> > > > >> on the brokers was around 30 seconds. After that, a bunch of
> > >>> > > > >> producers hit request.timeout.ms, reconnected, and sent the
> > >>> > > > >> next request again, which caused another spike and a longer
> > >>> > > > >> queue.
> > >>> > > > >>
> > >>> > > > >> In the above case, unless we set the quota window to be
> > >>> > > > >> pretty big, we will not be able to enforce the quota. And if
> > >>> > > > >> we set the window size to a large value, the request might
> > >>> > > > >> be throttled for longer than connection.max.idle.ms.
> > >>> > > > >>
> > >>> > > > >> > We need a solution to improve flow control for
> > >>> > > > >> > well-behaved clients, which currently rely entirely on the
> > >>> > > > >> > broker's throttling. The KIP addresses this using
> > >>> > > > >> > co-operative clients that sleep for an unbounded throttle
> > >>> > > > >> > time. I feel this is not ideal, since the result is
> > >>> > > > >> > traffic with a lot of spikes. Feedback from brokers to
> > >>> > > > >> > enable flow control in the client is a good idea, but
> > >>> > > > >> > clients with excessive throttle times should really have
> > >>> > > > >> > been configured with smaller batch sizes.
> > >>> > > > >>
> > >>> > > > >> This is not really about a single producer with a large
> > >>> > > > >> batch size; it is a lot of small producers talking to the
> > >>> > > > >> broker at the same time. Reducing the batch size does not
> > >>> > > > >> help much here. Also note that after the spike in traffic at
> > >>> > > > >> the very beginning, the throttle times of the
> > >>> > > > >> ProduceRequests processed later are actually going to be
> > >>> > > > >> increasing (for example, the first throttled request will be
> > >>> > > > >> throttled for 1 second, the second throttled request will be
> > >>> > > > >> throttled for 10 sec, etc.). Due to this throttle time
> > >>> > > > >> variation, if every producer honors the throttle time, there
> > >>> > > > >> will not be a second spike after the first produce.
> > >>> > > > >>
> > >>> > > > >> > We need a solution to enforce smaller quotas to protect
> > >>> > > > >> > the broker from misbehaving clients. The KIP addresses
> > >>> > > > >> > this by muting channels for an unbounded time. This
> > >>> > > > >> > introduces problems of channels in CLOSE_WAIT. And it
> > >>> > > > >> > doesn't really solve all issues with misbehaving clients,
> > >>> > > > >> > since new connections can be created to bypass quotas.
> > >>> > > > >>
> > >>> > > > >> Our current quota only protects against cooperating clients,
> > >>> > > > >> because our quota really throttles the NEXT request after
> > >>> > > > >> processing a request, even if that request itself has
> > >>> > > > >> already violated the quota. The broker is not protected at
> > >>> > > > >> all from misbehaving clients with the current quota
> > >>> > > > >> mechanism. Like you mentioned, a connection quota is
> > >>> > > > >> required. We have been discussing this at LinkedIn for some
> > >>> > > > >> time. Doing it right requires some major changes, such as
> > >>> > > > >> partially reading a request to identify the client id at the
> > >>> > > > >> network level and disconnecting misbehaving clients.
> > >>> > > > >>
> > >>> > > > >> While handling misbehaving clients is important, we are not
> > >>> > > > >> trying to address that in this KIP. This KIP is trying to
> > >>> > > > >> improve the communication with good clients. Muting the
> > >>> > > > >> channel is more of a migration plan, so that we don't have a
> > >>> > > > >> regression on the old innocent (but good) clients.
> > >>> > > > >>
> > >>> > > > >> Thanks,
> > >>> > > > >>
> > >>> > > > >> Jiangjie (Becket) Qin
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >> On Mon, Nov 6, 2017 at 1:33 PM, Jun Rao <ju...@confluent.io>
> > >>> wrote:
> > >>> > > > >>
> > >>> > > > >>> Hi, Jiangjie,
> > >>> > > > >>>
> > >>> > > > >>> 3. If a client closes a socket connection, typically the
> > >>> > > > >>> server only finds this out by reading from the channel and
> > >>> > > > >>> getting a negative size during the read. So, if a channel
> > >>> > > > >>> is muted by the server, the server won't be able to detect
> > >>> > > > >>> the closing of the connection by the client until the
> > >>> > > > >>> socket is unmuted. That's probably what Rajini wants to
> > >>> > > > >>> convey.
> > >>> > > > >>>
> > >>> > > > >>> Thanks,
> > >>> > > > >>>
> > >>> > > > >>> Jun
> > >>> > > > >>>
> > >>> > > > >>> On Fri, Nov 3, 2017 at 8:11 PM, Becket Qin <
> > >>> becket.qin@gmail.com>
> > >>> > > > wrote:
> > >>> > > > >>>
> > >>> > > > >>> > Thanks Rajini.
> > >>> > > > >>> >
> > >>> > > > >>> > 1. Good point. We do need to bump up the protocol version
> > >>> > > > >>> > so that the new clients do not wait for another throttle
> > >>> > > > >>> > time when they are talking to old brokers. I'll update
> > >>> > > > >>> > the KIP.
> > >>> > > > >>> >
> > >>> > > > >>> > 2. That is true. But the client was not supposed to send
> > >>> > > > >>> > requests to the broker during that period anyway. So
> > >>> > > > >>> > detecting the broker failure later seems fine?
> > >>> > > > >>> >
> > >>> > > > >>> > 3. Wouldn't the number of CLOSE_WAIT handles be the same
> > >>> > > > >>> > as in the current state? Currently the broker will still
> > >>> > > > >>> > mute the socket until it sends the response back. If the
> > >>> > > > >>> > clients disconnect while they are being throttled, the
> > >>> > > > >>> > closed socket will not be detected until the throttle
> > >>> > > > >>> > time has passed.
> > >>> > > > >>> >
> > >>> > > > >>> > Jun also suggested bounding the time by
> > >>> > > > >>> > metric.sample.window.ms in the ticket. I am not sure
> > >>> > > > >>> > about the bound on throttle time. It seems a little
> > >>> > > > >>> > difficult to come up with a good bound. If the bound is
> > >>> > > > >>> > too large, it does not really help solve the various
> > >>> > > > >>> > timeout issues we may face. If the bound is too low, the
> > >>> > > > >>> > quota is essentially not honored. We may potentially
> > >>> > > > >>> > treat different requests differently, but that seems too
> > >>> > > > >>> > complicated and error prone.
> > >>> > > > >>> >
> > >>> > > > >>> > IMO, the key improvement we want to make is to tell the
> > >>> > > > >>> > clients how long they will be throttled, so the clients
> > >>> > > > >>> > know what happened and can act accordingly instead of
> > >>> > > > >>> > waiting naively. Muting the socket on the broker side is
> > >>> > > > >>> > just in case of non-cooperating clients. For the existing
> > >>> > > > >>> > clients, it seems this does not have much impact compared
> > >>> > > > >>> > with what we have now.
> > >>> > > > >>> >
> > >>> > > > >>> > Thanks,
> > >>> > > > >>> >
> > >>> > > > >>> > Jiangjie (Becket) Qin
> > >>> > > > >>> >
> > >>> > > > >>> >
> > >>> > > > >>> >
> > >>> > > > >>> > On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <
> > >>> > > > >>> rajinisivaram@gmail.com>
> > >>> > > > >>> > wrote:
> > >>> > > > >>> >
> > >>> > > > >>> > > Hi Becket,
> > >>> > > > >>> > >
> > >>> > > > >>> > > Thank you for the KIP. A few comments:
> > >>> > > > >>> > >
> > >>> > > > >>> > > 1. KIP says: "*No public interface changes are needed.
> > >>> > > > >>> > > We only propose behavior change on the broker side.*"
> > >>> > > > >>> > >
> > >>> > > > >>> > > But from the proposed changes, it sounds like clients
> > >>> > > > >>> > > will be updated to wait for the throttle time before
> > >>> > > > >>> > > sending the next request, and also to not handle idle
> > >>> > > > >>> > > disconnections during that time. Doesn't that mean that
> > >>> > > > >>> > > clients need to know that the broker has sent the
> > >>> > > > >>> > > response before throttling, requiring a
> > >>> > > > >>> > > protocol/version change?
> > >>> > > > >>> > >
> > >>> > > > >>> > > 2. At the moment, broker failures are detected by
> > >>> > > > >>> > > clients (and vice versa) within
> > >>> > > > >>> > > connections.max.idle.ms. By removing this check for an
> > >>> > > > >>> > > unlimited throttle time, failure detection could be
> > >>> > > > >>> > > delayed.
> > >>> > > > >>> > >
> > >>> > > > >>> > > 3. KIP says: "*Since this subsequent request is not
> > >>> > > > >>> > > actually handled until the broker unmutes the channel,
> > >>> > > > >>> > > the client can hit request.timeout.ms and reconnect.
> > >>> > > > >>> > > However, this is no worse than the current state.*"
> > >>> > > > >>> > >
> > >>> > > > >>> > > I think this could be worse than the current state,
> > >>> > > > >>> > > because the broker doesn't detect and close the channel
> > >>> > > > >>> > > for an unlimited throttle time, while new connections
> > >>> > > > >>> > > will keep being accepted. As a result, a lot of
> > >>> > > > >>> > > connections could be in the CLOSE_WAIT state when the
> > >>> > > > >>> > > throttle time is high.
> > >>> > > > >>> > >
> > >>> > > > >>> > > Perhaps it is better to combine this KIP with a bound
> > >>> > > > >>> > > on the throttle time?
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > Regards,
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > Rajini
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <
> > >>> > becket.qin@gmail.com
> > >>> > > >
> > >>> > > > >>> wrote:
> > >>> > > > >>> > >
> > >>> > > > >>> > > > Thanks for the comment, Jun.
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > 1. Yes, you are right. This could also happen with
> > >>> > > > >>> > > > the current quota mechanism, because we are
> > >>> > > > >>> > > > essentially muting the socket during the throttle
> > >>> > > > >>> > > > time. There might be two ways to solve this:
> > >>> > > > >>> > > > A) use another socket to send heartbeats;
> > >>> > > > >>> > > > B) let the GroupCoordinator know that the client will
> > >>> > > > >>> > > > not heartbeat for some time.
> > >>> > > > >>> > > > It seems the first solution is cleaner.
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > 2. For the consumer, it seems returning an empty
> > >>> > > > >>> > > > response is a better option. In the producer case, if
> > >>> > > > >>> > > > there is a spike in traffic, the brokers will see
> > >>> > > > >>> > > > queued-up requests, but that is hard to avoid unless
> > >>> > > > >>> > > > we have a connection-level quota, which is a bigger
> > >>> > > > >>> > > > change and may be easier to discuss in a separate
> > >>> > > > >>> > > > KIP.
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > Thanks,
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > Jiangjie (Becket) Qin
> > >>> > > > >>> > > >
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <
> > >>> jun@confluent.io>
> > >>> > > > wrote:
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > > Hi, Jiangjie,
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > Thanks for bringing this up. A couple of quick
> > >>> > > > >>> > > > > thoughts.
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > 1. If the throttle time is large, what can happen
> > >>> > > > >>> > > > > is that a consumer won't be able to heartbeat to
> > >>> > > > >>> > > > > the group coordinator frequently enough. In that
> > >>> > > > >>> > > > > case, even with this KIP, it seems there could be
> > >>> > > > >>> > > > > frequent consumer group rebalances.
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > 2. If we return a response immediately, for the
> > >>> > > > >>> > > > > consumer, do we return the requested data or an
> > >>> > > > >>> > > > > empty response? If we do the former, it may not
> > >>> > > > >>> > > > > protect against the case where there are multiple
> > >>> > > > >>> > > > > consumer instances associated with the same
> > >>> > > > >>> > > > > user/client-id.
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > Jun
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <
> > >>> > > > becket.qin@gmail.com
> > >>> > > > >>> >
> > >>> > > > >>> > > wrote:
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > > Hi,
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > We would like to start the discussion on KIP-219.
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > The KIP tries to improve the communication of
> > >>> > > > >>> > > > > > quota throttle times between brokers and clients,
> > >>> > > > >>> > > > > > to avoid client timeouts in the case of long
> > >>> > > > >>> > > > > > throttle times.
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > The KIP link is as follows:
> > >>> > > > >>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > Comments are welcome.
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > Thanks,
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > Jiangjie (Becket) Qin
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > >
> > >>> > > > >>> > >
> > >>> > > > >>> >
> > >>> > > > >>>
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >
> >
>

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Jonghyun Lee <jo...@gmail.com>.
Hi Ismael,

I'll start a vote thread.

Jon

On Wed, Apr 25, 2018 at 2:28 PM, Dong Lin <li...@gmail.com> wrote:

> + Jon
>
> On Tue, Apr 24, 2018 at 10:04 PM, Ismael Juma <is...@juma.me.uk> wrote:
>
>> Hi Jon,
>>
>> Not sure about this approach. It's worth mentioning this in the vote
>> thread
>> as well so that the people who voted originally have a chance to comment.
>> Also, we should really get input from developers of Kafka clients
>> (librdkafka, kafka-python, etc.) for this KIP.
>>
>> Ismael

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Dong Lin <li...@gmail.com>.
+ Jon

On Tue, Apr 24, 2018 at 10:04 PM, Ismael Juma <is...@juma.me.uk> wrote:

> Hi Jon,
>
> Not sure about this approach. It's worth mentioning this in the vote thread
> as well so that the people who voted originally have a chance to comment.
> Also, we should really get input from developers of Kafka clients
> (librdkafka, kafka-python, etc.) for this KIP.
>
> Ismael
>
> On Thu, Apr 5, 2018 at 2:50 PM, Jonghyun Lee <jo...@gmail.com> wrote:
>
> > Hi,
> >
> > I have been implementing KIP-219. I discussed the interface changes with
> > Becket Qin and Dong Lin, and we decided to bump up the protocol version
> of
> > ApiVersionsRequest and ApiVersionsResponse only, instead of bumping up
> all
> > requests/responses that may be throttled, to indicate clients whether or
> > not brokers perform throttling before sending out responses. We think
> this
> > is sufficient given that network client exchanges
> > ApiVersionsRequest/Response with each broker when establishing connection
> > to it and thus it can detect the broker's throttling behavior just by
> > examining the ApiVersionsResponse version.
> >
> > Please respond to this e-mail if you have any questions or concerns.
> >
> > Thanks,
> > Jon Lee
> >
> >
> > On Thu, Apr 5, 2018 at 2:29 PM, Becket Qin <be...@gmail.com> wrote:
> >
> > >
> > >
> > > On Thu, Nov 16, 2017 at 3:49 PM, Becket Qin <be...@gmail.com>
> > wrote:
> > >
> > >> Thanks Rajini,
> > >>
> > >> I updated the KIP wiki to clarify the scope of the KIP. To summarize,
> > the
> > >> current quota has a few caveats:
> > >> 1. The brokers are only throttling the NEXT request even if the
> current
> > >> request is already violating quota. So the broker will always process
> at
> > >> least one request from the client.
> > >> 2. The brokers are not able to know the client id until they fully
> read
> > >> the request out of the sockets even if that client is being throttled.
> > >> 3. The brokers are not communicating to the clients promptly, so the
> > >> clients have to blindly wait and sometimes times out unnecessarily.
> > >>
> > >> This KIP only tries to address 3 but not 1 and 2 because A) those two
> > >> issues are sort of orthogonal to 3 and B) the solution to 1 and 2 are
> > much
> > >> more complicated and worth a separate discussion.
> > >>
> > >> I'll wait till tomorrow and start a voting thread if there are further
> > >> concerns raised about the KIP.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Thu, Nov 16, 2017 at 11:47 AM, Rajini Sivaram <
> > rajinisivaram@gmail.com
> > >> > wrote:
> > >>
> > >>> Hi Becket,
> > >>>
> > >>> The current user quota doesn't solve the problem. But I was thinking
> > that
> > >>> if we could ensure we don't read more from the network than the quota
> > >>> allows, we may be able to fix the issue in a different way
> (throttling
> > >>> all
> > >>> connections, each for a limited time prior to reading large
> requests).
> > >>> But
> > >>> it would be more complex (and even more messy for client-id quotas),
> > so I
> > >>> can understand why the solution you proposed in the KIP makes sense
> for
> > >>> the
> > >>> scenario that you described.
> > >>>
> > >>> Regards,
> > >>>
> > >>> Rajini
> > >>>
> > >>> On Tue, Nov 14, 2017 at 11:30 PM, Becket Qin <be...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Hi Rajini,
> > >>> >
> > >>> > We are using SSL so we could use user quota. But I am not sure if
> > that
> > >>> > would solve the problem. The key issue in our case is that each
> > broker
> > >>> can
> > >>> > only handle ~300 MB/s of incoming bytes, but the MapReduce job is
> > >>> trying to
> > >>> > push 1-2 GB/s, unless we can throttle the clients to 300 MB/s, the
> > >>> broker
> > >>> > cannot sustain. In order to do that, we need to be able to throttle
> > >>> > requests for more than request timeout, potentially a few minutes.
> It
> > >>> seems
> > >>> > neither user quota nor limited throttle time can achieve this.
> > >>> >
> > >>> > Thanks,
> > >>> >
> > >>> > Jiangjie (Becket) Qin
> > >>> >
> > >>> > On Tue, Nov 14, 2017 at 7:44 AM, Rajini Sivaram <
> > >>> rajinisivaram@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> > > Hi Becket,
> > >>> > >
> > >>> > > For the specific scenario that you described, would it be
> possible
> > >>> to use
> > >>> > > user quotas rather than client-id quotas? With user quotas,
> perhaps
> > >>> we
> > >>> > can
> > >>> > > throttle more easily before reading requests as well (as you
> > >>> mentioned,
> > >>> > the
> > >>> > > difficulty with client-id quota is that we have to read partial
> > >>> requests
> > >>> > > and handle client-ids at network layer making that a much bigger
> > >>> change).
> > >>> > > If your clients are using SASL/SSL, I was wondering whether a
> > >>> solution
> > >>> > that
> > >>> > > improves user quotas and limits throttle time would work for you.
> > >>> > >
> > >>> > > Regards,
> > >>> > >
> > >>> > > Rajini
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > > On Thu, Nov 9, 2017 at 12:59 AM, Becket Qin <
> becket.qin@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > > > Since we will bump up the wire request version, another option
> is
> > >>> for
> > >>> > > > clients that are sending old request versions the broker can
> just
> > >>> keep
> > >>> > > the
> > >>> > > > current behavior. For clients sending the new request versions,
> > the
> > >>> > > broker
> > >>> > > > can respond then mute the channel as described in the KIP wiki.
> > In
> > >>> this
> > >>> > > > case, muting the channel is mostly for protection purpose. A
> > >>> correctly
> > >>> > > > implemented client should back off for throttle time before
> > >>> sending the
> > >>> > > > next request. The downside is that the broker needs to keep
> both
> > >>> logic
> > >>> > > and
> > >>> > > > it seems not gaining much benefit. So personally I prefer to
> just
> > >>> mute
> > >>> > > the
> > >>> > > > channel. But I am open to different opinions.
> > >>> > > >
> > >>> > > > Thanks,
> > >>> > > >
> > >>> > > > Jiangjie (Becket) Qin
> > >>> > > >
> > >>> > > > On Mon, Nov 6, 2017 at 7:28 PM, Becket Qin <
> becket.qin@gmail.com
> > >
> > >>> > wrote:
> > >>> > > >
> > >>> > > > > Hi Jun,
> > >>> > > > >
> > >>> > > > > Hmm, even if a connection is closed by the client while the
> > >>> > > > > channel is muted, it seems Selector.select() will detect this
> > >>> > > > > and close the socket once the channel is unmuted.
> > >>> > > > > It is true that the socket will be in a CLOSE_WAIT state until
> > >>> > > > > the channel is unmuted, though. So having an arbitrarily long
> > >>> > > > > muted duration may indeed cause problems.
> > >>> > > > >
> > >>> > > > > Thanks,
> > >>> > > > >
> > >>> > > > > Jiangjie (Becket) Qin
> > >>> > > > >
> > >>> > > > > On Mon, Nov 6, 2017 at 7:22 PM, Becket Qin <
> > becket.qin@gmail.com
> > >>> >
> > >>> > > wrote:
> > >>> > > > >
> > >>> > > > >> Hi Rajini,
> > >>> > > > >>
> > >>> > > > >> Thanks for the detailed explanation. Please see the reply below:
> > >>> > > > >>
> > >>> > > > >> 2. Limiting the throttle time to connections.max.idle.ms on the
> > >>> > > > >> broker side is probably fine. However, clients may have a
> > >>> > > > >> different configuration of connections.max.idle.ms and still
> > >>> > > > >> reconnect before the throttle time (which is the server-side
> > >>> > > > >> connections.max.idle.ms) has passed. It seems like another back
> > >>> > > > >> door around the quota.
> > >>> > > > >>
> > >>> > > > >> 3. I agree we could just mute the server socket until
> > >>> > > > >> connections.max.idle.ms if the massive CLOSE_WAIT count is a big
> > >>> > > > >> issue. This helps guarantee that only connection_rate *
> > >>> > > > >> connections.max.idle.ms sockets will be in CLOSE_WAIT state. For
> > >>> > > > >> cooperative clients, unmuting the socket will not have a
> > >>> > > > >> negative impact.
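
A quick sanity check of that bound in Java (the connection rate is an
assumed example; 600,000 ms is the usual default for connections.max.idle.ms):

    double newConnectionsPerSec = 5.0;  // assumed per-broker connection rate
    long idleMs = 600_000;              // connections.max.idle.ms (10 minutes)
    // Upper bound on sockets parked in CLOSE_WAIT under this scheme:
    long maxCloseWait = (long) (newConnectionsPerSec * idleMs / 1000.0); // 3,000
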
> > >>> > > > >>
> > >>> > > > >> 4. My concern with capping the throttle time to
> > >>> > > > >> metrics.sample.window.ms is that we will not be able to enforce
> > >>> > > > >> the quota effectively. It might be useful to explain this with a
> > >>> > > > >> real example we are trying to solve. We have a MapReduce job
> > >>> > > > >> pushing data to a Kafka cluster. The MapReduce job has hundreds
> > >>> > > > >> of producers and each of them sends a normal-sized
> > >>> > > > >> ProduceRequest (~2 MB) to each of the brokers in the cluster.
> > >>> > > > >> Apparently, the client id ran out of its bytes quota pretty
> > >>> > > > >> quickly, and the broker started to throttle the producers. The
> > >>> > > > >> throttle time could actually be pretty long (e.g. a few
> > >>> > > > >> minutes). At that point, request queue time on the brokers was
> > >>> > > > >> around 30 seconds. After that, a bunch of producers hit
> > >>> > > > >> request.timeout.ms, reconnected, and sent the next request
> > >>> > > > >> again, which caused another spike and a longer queue.
> > >>> > > > >>
> > >>> > > > >> In the above case, unless we set the quota window to be pretty
> > >>> > > > >> big, we will not be able to enforce the quota. And if we set the
> > >>> > > > >> window size to a large value, the request might be throttled for
> > >>> > > > >> longer than connections.max.idle.ms.
> > >>> > > > >>
> > >>> > > > >> > We need a solution to improve flow control for well-behaved
> > >>> > > > >> > clients which currently rely entirely on broker's throttling.
> > >>> > > > >> > The KIP addresses this using co-operative clients that sleep
> > >>> > > > >> > for an unbounded throttle time. I feel this is not ideal since
> > >>> > > > >> > the result is traffic with a lot of spikes. Feedback from
> > >>> > > > >> > brokers to enable flow control in the client is a good idea,
> > >>> > > > >> > but clients with excessive throttle times should really have
> > >>> > > > >> > been configured with smaller batch sizes.
> > >>> > > > >>
> > >>> > > > >> This is not really about a single producer with a large batch
> > >>> > > > >> size; it is a lot of small producers talking to the broker at
> > >>> > > > >> the same time. Reducing the batch size does not help much here.
> > >>> > > > >> Also note that after the spike in traffic at the very beginning,
> > >>> > > > >> the throttle time of the ProduceRequests processed later is
> > >>> > > > >> actually going to increase (for example, the first throttled
> > >>> > > > >> request will be throttled for 1 second, the second throttled
> > >>> > > > >> request will be throttled for 10 sec, etc.). Due to this
> > >>> > > > >> throttle time variation, if every producer honors the throttle
> > >>> > > > >> time, there will not be a next spike after the first one.
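
A minimal sketch of the cooperative behavior assumed here (Java; the
throttle-time parameter mirrors the throttle_time_ms field carried in Kafka
responses, and sendNextRequest is a hypothetical helper):

    // Honor the broker-reported throttle time instead of retrying immediately.
    static void onResponse(int throttleTimeMs) throws InterruptedException {
        if (throttleTimeMs > 0) {
            Thread.sleep(throttleTimeMs); // a real client pauses its send loop
        }
        sendNextRequest();
    }
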
> > >>> > > > >>
> > >>> > > > >> > We need a solution to enforce smaller quotas to protect the
> > >>> > > > >> > broker from misbehaving clients. The KIP addresses this by
> > >>> > > > >> > muting channels for an unbounded time. This introduces
> > >>> > > > >> > problems of channels in CLOSE_WAIT. And doesn't really solve
> > >>> > > > >> > all issues with misbehaving clients since new connections can
> > >>> > > > >> > be created to bypass quotas.
> > >>> > > > >>
> > >>> > > > >> Our current quota only protects cooperating clients, because
> > >>> > > > >> our quota really throttles the NEXT request after processing a
> > >>> > > > >> request, even if that request itself has already violated the
> > >>> > > > >> quota. The broker is not protected from misbehaving clients at
> > >>> > > > >> all with the current quota mechanism. Like you mentioned, a
> > >>> > > > >> connection quota is required. We have been discussing this at
> > >>> > > > >> LinkedIn for some time. Doing it right requires some major
> > >>> > > > >> changes, such as partially reading a request to identify the
> > >>> > > > >> client id at the network level and disconnecting misbehaving
> > >>> > > > >> clients.
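
For context, identifying the client id before reading the whole request
would mean parsing just the fixed request header off the socket. A rough
Java sketch of that idea (field layout per the classic Kafka request
header; buffer handling simplified):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    static String peekClientId(ByteBuffer buf) {
        short apiKey = buf.getShort();     // int16 api_key
        short apiVersion = buf.getShort(); // int16 api_version
        int correlationId = buf.getInt();  // int32 correlation_id
        short len = buf.getShort();        // client_id length, -1 means null
        if (len < 0) return null;
        byte[] id = new byte[len];
        buf.get(id);                       // client_id bytes (UTF-8)
        return new String(id, StandardCharsets.UTF_8);
    }
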
> > >>> > > > >>
> > >>> > > > >> While handling misbehaving clients is important, we are not
> > >>> > > > >> trying to address that in this KIP. This KIP is trying to
> > >>> > > > >> improve the communication with good clients. Muting the channel
> > >>> > > > >> is more of a migration plan, so that we don't cause a regression
> > >>> > > > >> for the old (but well-behaved) clients.
> > >>> > > > >>
> > >>> > > > >> Thanks,
> > >>> > > > >>
> > >>> > > > >> Jiangjie (Becket) Qin
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >> On Mon, Nov 6, 2017 at 1:33 PM, Jun Rao <ju...@confluent.io>
> > >>> wrote:
> > >>> > > > >>
> > >>> > > > >>> Hi, Jiangjie,
> > >>> > > > >>>
> > >>> > > > >>> 3. If a client closes a socket connection, typically the
> > >>> > > > >>> server only finds this out by reading from the channel and
> > >>> > > > >>> getting a negative size during the read. So, if a channel is
> > >>> > > > >>> muted by the server, the server won't be able to detect the
> > >>> > > > >>> closing of the connection by the client until the socket is
> > >>> > > > >>> unmuted. That's probably what Rajini wants to convey.
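
In code terms, the detection path Jun describes looks roughly like this
(plain java.nio; error handling simplified):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;

    static boolean peerClosed(SocketChannel ch, ByteBuffer buf) throws IOException {
        // read() returns -1 once the peer has closed; a muted channel is never
        // read, so this signal stays invisible until the channel is unmuted.
        return ch.read(buf) < 0;
    }
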
> > >>> > > > >>>
> > >>> > > > >>> Thanks,
> > >>> > > > >>>
> > >>> > > > >>> Jun
> > >>> > > > >>>
> > >>> > > > >>> On Fri, Nov 3, 2017 at 8:11 PM, Becket Qin <
> > >>> becket.qin@gmail.com>
> > >>> > > > wrote:
> > >>> > > > >>>
> > >>> > > > >>> > Thanks Rajini.
> > >>> > > > >>> >
> > >>> > > > >>> > 1. Good point. We do need to bump up the protocol version
> > so
> > >>> that
> > >>> > > the
> > >>> > > > >>> new
> > >>> > > > >>> > clients do not wait for another throttle time when they
> are
> > >>> > talking
> > >>> > > > to
> > >>> > > > >>> old
> > >>> > > > >>> > brokers. I'll update the KIP.
> > >>> > > > >>> >
> > >>> > > > >>> > 2. That is true. But the client was not supposed to send
> > >>> > > > >>> > requests to the broker during that period anyway. So
> > >>> > > > >>> > detecting the broker failure later seems fine?
> > >>> > > > >>> >
> > >>> > > > >>> > 3. Wouldn't the number of handles in CLOSE_WAIT be the same
> > >>> > > > >>> > as in the current state? Currently the broker still mutes the
> > >>> > > > >>> > socket until it sends the response back. If the clients
> > >>> > > > >>> > disconnect while they are being throttled, the closed socket
> > >>> > > > >>> > will not be detected until the throttle time has passed.
> > >>> > > > >>> >
> > >>> > > > >>> > Jun also suggested bounding the time by
> > >>> > > > >>> > metrics.sample.window.ms in the ticket. I am not sure about
> > >>> > > > >>> > the bound on throttle time. It seems a little difficult to
> > >>> > > > >>> > come up with a good bound. If the bound is too large, it does
> > >>> > > > >>> > not really help solve the various timeout issues we may face.
> > >>> > > > >>> > If the bound is too low, the quota is essentially not
> > >>> > > > >>> > honored. We may potentially treat different requests
> > >>> > > > >>> > differently, but that seems too complicated and error prone.
> > >>> > > > >>> >
> > >>> > > > >>> > IMO, the key improvement we want to make is to tell the
> > >>> > > > >>> > clients how long they will be throttled, so the clients know
> > >>> > > > >>> > what happened and can act accordingly instead of waiting
> > >>> > > > >>> > naively. Muting the socket on the broker side is just a
> > >>> > > > >>> > safeguard against non-cooperating clients. For the existing
> > >>> > > > >>> > clients, it seems this does not have much impact compared
> > >>> > > > >>> > with what we have now.
> > >>> > > > >>> >
> > >>> > > > >>> > Thanks,
> > >>> > > > >>> >
> > >>> > > > >>> > Jiangjie (Becket) Qin
> > >>> > > > >>> >
> > >>> > > > >>> >
> > >>> > > > >>> >
> > >>> > > > >>> > On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <
> > >>> > > > >>> rajinisivaram@gmail.com>
> > >>> > > > >>> > wrote:
> > >>> > > > >>> >
> > >>> > > > >>> > > Hi Becket,
> > >>> > > > >>> > >
> > >>> > > > >>> > > Thank you for the KIP. A few comments:
> > >>> > > > >>> > >
> > >>> > > > >>> > > 1. KIP says:  "*No public interface changes are needed. We
> > >>> > > > >>> > > only propose behavior change on the broker side.*"
> > >>> > > > >>> > >
> > >>> > > > >>> > > But from the proposed changes, it sounds like clients will
> > >>> > > > >>> > > be updated to wait for the throttle time before sending the
> > >>> > > > >>> > > next request, and also not to handle idle disconnections
> > >>> > > > >>> > > during that time. Doesn't that mean that clients need to
> > >>> > > > >>> > > know that the broker has sent the response before
> > >>> > > > >>> > > throttling, requiring a protocol/version change?
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > 2. At the moment, broker failures are detected by clients
> > >>> > > > >>> > > (and vice versa) within connections.max.idle.ms. By
> > >>> > > > >>> > > removing this check for an unlimited throttle time, failure
> > >>> > > > >>> > > detection could be delayed.
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > 3. KIP says  "*Since this subsequent request is not
> > >>> > > > >>> > > actually handled until the broker unmutes the channel, the
> > >>> > > > >>> > > client can hit request.timeout.ms and reconnect. However,
> > >>> > > > >>> > > this is no worse than the current state.*"
> > >>> > > > >>> > >
> > >>> > > > >>> > > I think this could be worse than the current state because
> > >>> > > > >>> > > the broker doesn't detect and close the channel for an
> > >>> > > > >>> > > unlimited throttle time, while new connections will get
> > >>> > > > >>> > > accepted. As a result, a lot of connections could be in
> > >>> > > > >>> > > CLOSE_WAIT state when the throttle time is high.
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > Perhaps it is better to combine this KIP with a bound on
> > >>> > > > >>> > > throttle time?
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > Regards,
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > Rajini
> > >>> > > > >>> > >
> > >>> > > > >>> > >
> > >>> > > > >>> > > On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <
> > >>> > becket.qin@gmail.com
> > >>> > > >
> > >>> > > > >>> wrote:
> > >>> > > > >>> > >
> > >>> > > > >>> > > > Thanks for the comment, Jun,
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > 1. Yes, you are right. This could also happen with the
> > >>> > > > >>> > > > current quota mechanism because we are essentially muting
> > >>> > > > >>> > > > the socket during the throttle time. There might be two
> > >>> > > > >>> > > > ways to solve this:
> > >>> > > > >>> > > > A) use another socket to send heartbeats;
> > >>> > > > >>> > > > B) let the GroupCoordinator know that the client will not
> > >>> > > > >>> > > > heartbeat for some time.
> > >>> > > > >>> > > > It seems the first solution is cleaner.
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > 2. For the consumer it seems returning an empty response
> > >>> > > > >>> > > > is a better option. In the producer case, if there is a
> > >>> > > > >>> > > > spike in traffic, the brokers will see queued-up
> > >>> > > > >>> > > > requests; that is hard to avoid unless we have a
> > >>> > > > >>> > > > connection-level quota, which is a bigger change and may
> > >>> > > > >>> > > > be easier to discuss in a separate KIP.
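
One way to picture the "empty response" option for a throttled fetch
(hypothetical broker-side pseudo-Java, not the actual handler code):

    // Answer a throttled fetch immediately with no records but with the
    // throttle hint set, instead of handing back the requested data.
    if (quotaExceeded(clientId)) {
        FetchResponse empty = emptyFetchResponse();  // hypothetical helper
        empty.setThrottleTimeMs(throttleTimeMs);
        send(empty);
        return;
    }
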
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > Thanks,
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > Jiangjie (Becket) Qin
> > >>> > > > >>> > > >
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <
> > >>> jun@confluent.io>
> > >>> > > > wrote:
> > >>> > > > >>> > > >
> > >>> > > > >>> > > > > Hi, Jiangjie,
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > Thanks for bringing this up. A couple of quick
> > >>> thoughts.
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > 1. If the throttle time is large, what can happen is
> > >>> > > > >>> > > > > that a consumer won't be able to heartbeat to the group
> > >>> > > > >>> > > > > coordinator frequently enough. In that case, even with
> > >>> > > > >>> > > > > this KIP, it seems there could be frequent consumer
> > >>> > > > >>> > > > > group rebalances.
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > 2. If we return a response immediately, for the
> > >>> > > > >>> > > > > consumer, do we return the requested data or an empty
> > >>> > > > >>> > > > > response? If we do the former, it may not protect
> > >>> > > > >>> > > > > against the case where there are multiple consumer
> > >>> > > > >>> > > > > instances associated with the same user/client-id.
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > Jun
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <
> > >>> > > > becket.qin@gmail.com
> > >>> > > > >>> >
> > >>> > > > >>> > > wrote:
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > > > > Hi,
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > We would like to start the discussion on KIP-219.
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > The KIP tries to improve quota throttling time
> > >>> > > communication
> > >>> > > > >>> > between
> > >>> > > > >>> > > > > > brokers and clients to avoid clients timeout in
> > case
> > >>> of
> > >>> > > long
> > >>> > > > >>> > > throttling
> > >>> > > > >>> > > > > > time.
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > The KIP link is following:
> > >>> > > > >>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > Comments are welcome.
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > Thanks,
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > > > Jiangjie (Becket) Qin
> > >>> > > > >>> > > > > >
> > >>> > > > >>> > > > >
> > >>> > > > >>> > > >
> > >>> > > > >>> > >
> > >>> > > > >>> >
> > >>> > > > >>>
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >
> >
>

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Jon,

Not sure about this approach. It's worth mentioning this in the vote thread
as well so that the people who voted originally have a chance to comment.
Also, we should really get input from developers of Kafka clients
(librdkafka, kafka-python, etc.) for this KIP.

Ismael


Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Jonghyun Lee <jo...@gmail.com>.
Hi,

I have been implementing KIP-219. I discussed the interface changes with
Becket Qin and Dong Lin, and we decided to bump up the protocol version of
ApiVersionsRequest and ApiVersionsResponse only, instead of bumping up all
requests/responses that may be throttled, to indicate to clients whether or
not brokers perform throttling before sending out responses. We think this is
sufficient given that the network client exchanges an
ApiVersionsRequest/Response with each broker when establishing a connection
to it, and thus it can detect the broker's throttling behavior just by
examining the ApiVersionsResponse version.
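
As a rough illustration of that client-side check (Java; the constant and
the accessor are hypothetical, not the actual NetworkClient code):

    // On connection setup, inspect the broker's advertised ApiVersions range.
    short maxApiVersionsVersion =
        apiVersionsResponse.maxVersionFor(ApiKeys.API_VERSIONS);
    // Brokers at or above the bumped version respond first, then throttle.
    boolean brokerThrottlesAfterResponding =
        maxApiVersionsVersion >= FIRST_VERSION_WITH_KIP219_BEHAVIOR;
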

Please respond to this e-mail if you have any questions or concerns.

Thanks,
Jon Lee


>>> > > > >>> > > > >
>>> > > > >>> > > > > 2. If we return a response immediately, for the
>>> consumer,
>>> > do
>>> > > we
>>> > > > >>> > return
>>> > > > >>> > > > the
>>> > > > >>> > > > > requested data or an empty response? If we do the
>>> former,
>>> > it
>>> > > > may
>>> > > > >>> not
>>> > > > >>> > > > > protect against the case when there are multiple
>>> consumer
>>> > > > >>> instances
>>> > > > >>> > > > > associated with the same user/clientid.
>>> > > > >>> > > > >
>>> > > > >>> > > > > Jun
>>> > > > >>> > > > >
>>> > > > >>> > > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <
>>> > > > becket.qin@gmail.com
>>> > > > >>> >
>>> > > > >>> > > wrote:
>>> > > > >>> > > > >
>>> > > > >>> > > > > > Hi,
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > We would like to start the discussion on KIP-219.
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > The KIP tries to improve quota throttling time
>>> > > communication
>>> > > > >>> > between
>>> > > > >>> > > > > > brokers and clients to avoid clients timeout in case
>>> of
>>> > > long
>>> > > > >>> > > throttling
>>> > > > >>> > > > > > time.
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > The KIP link is following:
>>> > > > >>> > > > > > https://cwiki.apache.org/confl
>>> uence/display/KAFKA/KIP-
>>> > > > >>> > > > > 219+-+Improve+quota+
>>> > > > >>> > > > > > communication
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > Comments are welcome.
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > Thanks,
>>> > > > >>> > > > > >
>>> > > > >>> > > > > > Jiangjie (Becket) Qin
>>> > > > >>> > > > > >
>>> > > > >>> > > > >
>>> > > > >>> > > >
>>> > > > >>> > >
>>> > > > >>> >
>>> > > > >>>
>>> > > > >>
>>> > > > >>
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Becket Qin <be...@gmail.com>.
Thanks Rajini,

I updated the KIP wiki to clarify the scope of the KIP. To summarize, the
current quota has a few caveats:
1. The brokers are only throttling the NEXT request even if the current
request is already violating quota. So the broker will always process at
least one request from the client.
2. The brokers are not able to know the client id until they fully read the
request out of the sockets even if that client is being throttled.
3. The brokers are not communicating to the clients promptly, so the
clients have to wait blindly and sometimes time out unnecessarily.

This KIP only tries to address 3 but not 1 and 2 because A) those two
issues are sort of orthogonal to 3 and B) the solutions to 1 and 2 are much
more complicated and worth a separate discussion.
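
To make 3 concrete, here is a minimal sketch of what a cooperative client
could do once the broker reports the throttle time up front; the class and
member names are illustrative, not the actual client internals:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Minimal sketch of a client that honors the throttle time the broker
    // now returns immediately, instead of waiting blindly. Illustrative only.
    class ThrottleAwareClient {
        private final Map<Integer, Long> backoffDeadlineMs = new ConcurrentHashMap<>();

        // Called for every response carrying a throttle_time_ms field.
        void onResponse(int brokerId, long throttleTimeMs) {
            if (throttleTimeMs > 0) {
                backoffDeadlineMs.put(brokerId, System.currentTimeMillis() + throttleTimeMs);
            }
        }

        // Checked before sending the next request, so the client backs off
        // for the advertised time rather than blindly hitting request.timeout.ms.
        boolean readyToSend(int brokerId) {
            Long deadline = backoffDeadlineMs.get(brokerId);
            return deadline == null || System.currentTimeMillis() >= deadline;
        }
    }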

I'll wait till tomorrow and start a voting thread if there are no further
concerns raised about the KIP.

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Rajini Sivaram <ra...@gmail.com>.
Hi Becket,

The current user quota doesn't solve the problem. But I was thinking that
if we could ensure we don't read more from the network than the quota
allows, we may be able to fix the issue in a different way (throttling all
connections, each for a limited time, prior to reading large requests).
However, it would be more complex (and even messier for client-id quotas),
so I can understand why the solution you proposed in the KIP makes sense
for the scenario that you described.
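
For illustration, a rough sketch of the alternative floated above (muting a
connection for a bounded time before reading its next request); all names
here are hypothetical, not something the KIP proposes:

    // Rough sketch: stop reading from a connection while its quota budget is
    // exhausted, instead of throttling after the request has been read.
    // All names are hypothetical.
    class ReadThrottler {
        interface Channel { String id(); void mute(long untilMs); }
        interface QuotaTracker { long remainingThrottleMs(String connectionId); }

        private final QuotaTracker quota;

        ReadThrottler(QuotaTracker quota) { this.quota = quota; }

        // Invoked by the poll loop before reading from a ready channel.
        void maybeThrottleRead(Channel channel) {
            long throttleMs = quota.remainingThrottleMs(channel.id());
            if (throttleMs > 0) {
                // Bounded mute: don't read bytes we have no budget for, but
                // keep the window short enough that idle-connection and
                // failure detection still work.
                channel.mute(System.currentTimeMillis() + throttleMs);
            }
        }
    }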

Regards,

Rajini


Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Becket Qin <be...@gmail.com>.
Hi Rajini,

We are using SSL, so we could use user quotas. But I am not sure that
would solve the problem. The key issue in our case is that each broker can
only handle ~300 MB/s of incoming bytes, but the MapReduce job is trying to
push 1-2 GB/s; unless we can throttle the clients to 300 MB/s, the broker
cannot sustain the load. In order to do that, we need to be able to throttle
requests for more than the request timeout, potentially a few minutes. It
seems neither user quotas nor a limited throttle time can achieve this.
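
For a back-of-the-envelope sense of the numbers (the window span and the
delay formula below are assumptions, not from the thread; the delay is
roughly the time for the windowed average to fall back to the quota):

    // Back-of-the-envelope only:
    //   delay = window * (observedRate - quota) / quota
    public class ThrottleMath {
        public static void main(String[] args) {
            double quotaMBps = 300;      // what one broker can sustain (from above)
            double observedMBps = 1500;  // the MapReduce job's aggregate push rate
            double windowSec = 10;       // assumed total sample-window span

            double delaySec = windowSec * (observedMBps - quotaMBps) / quotaMBps;
            System.out.printf("throttle for ~%.0f seconds%n", delaySec); // ~40s

            // A minutes-long throttle (say 180s) at this overload would need a
            // window several times larger, which collides with timeouts like
            // request.timeout.ms and connection.max.idle.ms elsewhere.
            double neededWindowSec = 180 * quotaMBps / (observedMBps - quotaMBps);
            System.out.printf("window needed: ~%.0f seconds%n", neededWindowSec); // ~45s
        }
    }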

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Rajini Sivaram <ra...@gmail.com>.
Hi Becket,

For the specific scenario that you described, would it be possible to use
user quotas rather than client-id quotas? With user quotas, perhaps we can
throttle more easily before reading requests as well (as you mentioned, the
difficulty with client-id quotas is that we would have to read partial
requests and handle client-ids at the network layer, making that a much
bigger change). If your clients are using SASL/SSL, I was wondering whether
a solution that improves user quotas and limits throttle time would work
for you.
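
As a small sketch of why the user principal lends itself to throttling
before the read while a client-id does not (the types below are illustrative
stand-ins, not broker internals):

    // Illustrative contrast: a user principal is known from the authenticated
    // session before any request bytes are parsed, while a client-id only
    // exists inside the request header.
    class QuotaIdentity {
        interface AuthenticatedChannel { String principalName(); }

        // Available before reading: SASL/SSL authentication has already
        // established who the connection belongs to.
        static String userQuotaKey(AuthenticatedChannel channel) {
            return channel.principalName();
        }

        // Only available after reading: the client-id has to be parsed out of
        // the request header, so the bytes were already accepted.
        static String clientIdQuotaKey(java.nio.ByteBuffer headerBytes) {
            return parseClientId(headerBytes);
        }

        private static String parseClientId(java.nio.ByteBuffer buf) {
            return ""; // placeholder; real header parsing elided in this sketch
        }
    }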

Regards,

Rajini




Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Becket Qin <be...@gmail.com>.
Since we will bump up the wire request version, another option is for the
broker to just keep the current behavior for clients that are sending old
request versions. For clients sending the new request versions, the broker
can respond and then mute the channel as described in the KIP wiki. In this
case, muting the channel is mostly for protection purposes. A correctly
implemented client should back off for the throttle time before sending the
next request. The downside is that the broker needs to keep both sets of
logic, and that does not seem to gain much. So personally I prefer to just
mute the channel. But I am open to different opinions.
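
As a rough sketch, the dual behavior on the broker could look like this
(hypothetical names, not the actual SocketServer code):

    // Sketch only: respond-then-mute for new clients, delayed response
    // (the current behavior) for old ones.
    void sendResponseWithThrottle(Request request, Response response,
                                  long throttleTimeMs) {
        if (request.apiVersion() >= FIRST_VERSION_WITH_CLIENT_THROTTLING) {
            // New clients learn the throttle time right away; muting the
            // channel is only protection against clients that ignore it.
            channel.send(response);
            channel.mute(throttleTimeMs);
        } else {
            // Old clients: hold the response back for the throttle time.
            scheduleDelayedSend(channel, response, throttleTimeMs);
        }
    }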

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Becket Qin <be...@gmail.com>.
Hi Jun,

Hmm, even if a connection is closed by the client while the channel is
muted, it seems Selector.select() will detect this and close the socket
once the channel is unmuted.
It is true that the socket will be in a CLOSE_WAIT state until the channel
is unmuted, though. So having an arbitrarily long muted duration may indeed
cause problems.

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Becket Qin <be...@gmail.com>.
Hi Rajini,

Thanks for the detailed explanation. Please see the reply below:

2. Limiting the throttle time to connection.max.idle.ms on the broker side
is probably fine. However, clients may have a different configuration of
connection.max.idle.ms and still reconnect before the throttle time (which
would be the server-side connection.max.idle.ms) has passed. That seems
like another back door around the quota.

3. I agree we could just mute the server socket for at most
connection.max.idle.ms if a massive number of CLOSE_WAIT sockets is a big
issue. This helps guarantee that at most connection_rate *
connection.max.idle.ms sockets will be in the CLOSE_WAIT state. For
cooperative clients, unmuting the socket will not have a negative impact.
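
For a hypothetical sense of scale: with connection_rate = 10 new
connections/sec and connection.max.idle.ms = 600000 (the default 10
minutes), that bound works out to 10 * 600 = at most 6000 sockets sitting
in CLOSE_WAIT.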

4. My concern with capping the throttle time to metrics.window.ms is that
we will not be able to enforce the quota effectively. It might be useful to
explain this with a real example we are trying to solve. We have a
MapReduce job pushing data to a Kafka cluster. The MapReduce job has
hundreds of producers, and each of them sends a normal-sized ProduceRequest
(~2 MB) to each of the brokers in the cluster. Apparently the client id ran
out of bytes quota pretty quickly, and the broker started to throttle the
producers. The throttle time could actually be pretty long (e.g. a few
minutes). At that point, request queue time on the brokers was around 30
seconds. After that, a bunch of producers hit request.timeout.ms,
reconnected, and sent the next request again, which caused another spike
and a longer queue.

In the above case, unless we set the quota window to be pretty big, we will
not be able to enforce the quota. And if we set the window size to a large
value, the request might be throttled for longer than
connection.max.idle.ms.
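
To put rough numbers on this (illustrative only, not measurements from the
incident above): the byte-rate quota throttles roughly until the windowed
average rate falls back under the quota, i.e. O * W / (W + t) = Q, which
gives t = W * (O / Q - 1). If 200 producers each land a 2 MB request within
a 10-second window, O = 40 MB/s; with Q = 10 MB/s that already gives
t = 10 * (40 / 10 - 1) = 30 seconds, and a larger burst pushes t into
minutes.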

> We need a solution to improve flow control for well-behaved clients
> which currently rely entirely on broker's throttling. The KIP addresses
> this using co-operative clients that sleep for an unbounded throttle time.
> I feel this is not ideal since the result is traffic with a lot of spikes.
> Feedback from brokers to enable flow control in the client is a good idea,
> but clients with excessive throttle times should really have been
> configured with smaller batch sizes.

This is not really about a single producer with a large batch size; it is
a lot of small producers talking to the cluster at the same time. Reducing
the batch size does not help much here. Also note that after the spike in
traffic at the very beginning, the throttle times of the ProduceRequests
processed later are actually going to be increasing (for example, the first
throttled request will be throttled for 1 second, the second throttled
request for 10 seconds, etc.). Due to this throttle time variation, if
every producer honors its throttle time, there will not be another spike
after the first one.
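
A minimal sketch of the cooperative client behavior assumed here
(hypothetical names; a real client would wire this into its send loop):

    // After each response, honor the broker-reported throttle time by not
    // sending anything else to that broker until it has elapsed.
    long throttleTimeMs = response.throttleTimeMs();
    if (throttleTimeMs > 0) {
        long deadline = time.milliseconds() + throttleTimeMs;
        connectionState.disableSendsUntil(node.id(), deadline);
    }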

> We need a solution to enforce smaller quotas to protect the broker
> from misbehaving clients. The KIP addresses this by muting channels for an
> unbounded time. This introduces problems of channels in CLOSE_WAIT. And
> doesn't really solve all issues with misbehaving clients since new
> connections can be created to bypass quotas.

Our current quota is only effective against cooperating clients, because
it really throttles the NEXT request after processing a request, even if
that request itself has already violated the quota. The broker is not
protected at all from misbehaving clients with the current quota mechanism.
Like you mentioned, a connection quota is required. We have been discussing
this at LinkedIn for some time. Doing it right requires some major changes,
such as partially reading a request to identify the client id at the
network level and disconnecting misbehaving clients.

While handling misbehaving clients is important, we are not trying to
address that in this KIP. This KIP is trying to improve the communication
with good clients. Muting the channel is more of a migration plan so that
we don't cause a regression for old clients that are unaware of the new
behavior but well behaved.

Thanks,

Jiangjie (Becket) Qin



Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Jun Rao <ju...@confluent.io>.
Hi, Jiangjie,

3. If a client closes a socket connection, typically the server only finds
this out by reading from the channel and getting a negative size during the
read. So, if a channel is muted by the server, the server won't be able to
detect the closing of the connection by the client until the socket is
unmuted. That's probably what Rajini wants to convey.
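
In plain NIO terms (a standalone sketch, not the actual Selector code):

    // A server only learns that the peer closed when a read returns -1.
    int n = socketChannel.read(buffer);
    if (n < 0) {
        socketChannel.close();  // peer closed; release our end
    }
    // A muted channel has OP_READ deregistered, so this read never runs
    // and the socket sits in CLOSE_WAIT until the channel is unmuted.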

Thanks,

Jun


Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Rajini Sivaram <ra...@gmail.com>.
Hi Becket,

2. Perhaps it is fine, though it may be better to limit the throttle time
to connections.max.idle.ms, which is typically a rather large number.

3. The maximum CLOSE_WAIT time is currently bounded by
connections.max.idle.ms. It is a large value, but we will close client
connections after that even if the connection has been muted. The KIP
proposes to disable the idle check for throttled clients, and that could be
an issue.

4. I agree that it is hard to come up with a limit for throttle time that
works for all cases, particularly since we need a set of timeouts, sizes,
and quota limits which all make sense together. I think Jun's suggestion of
basing this limit on the total metric window size makes sense, because we
currently don't have any way of enforcing quotas beyond that (since you can
always create a new connection, for which we don't have any previous
metrics, to bypass quotas).


4a) For request quotas, we already limit the throttle time to the quota
window size.


4b) For fetch requests, limiting the throttle time to the total metric
window size and returning a smaller amount of data in each response could
work well. This reduces spikes in traffic and would allow clients to
heartbeat, rebalance, etc. The minimum achievable quota would then be based
on the maximum message size rather than the maximum request size. Perhaps
that would be sufficient?
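
A sketch of what that could mean when sizing a fetch response (the names
and the window accessor below are assumptions, not existing broker code):

    // Cap the data returned so that throttling never needs to exceed the
    // total metric window, with a floor of one maximum-size message.
    long windowMs = totalMetricWindowMs();
    long budgetBytes = quotaBytesPerSec * windowMs / 1000;
    long responseBytes =
        Math.min(requestedMaxBytes, Math.max(budgetBytes, maxMessageBytes));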


4c) For produce requests, I agree that just a bound on throttle time is not
sufficient.


   - We need a solution to improve flow control for well-behaved clients,
   which currently rely entirely on the broker's throttling. The KIP
   addresses this using co-operative clients that sleep for an unbounded
   throttle time. I feel this is not ideal, since the result is traffic
   with a lot of spikes. Feedback from brokers to enable flow control in
   the client is a good idea, but clients with excessive throttle times
   should really have been configured with smaller batch sizes.
   - We need a solution to enforce smaller quotas to protect the broker
   from misbehaving clients. The KIP addresses this by muting channels
   for an unbounded time. This introduces the problem of channels stuck
   in CLOSE_WAIT, and doesn't really solve all issues with misbehaving
   clients, since new connections can be created to bypass quotas.

I feel that it is a bit difficult to get this right without addressing the
issue of new connections that bypass quotas (an issue in the current
implementation rather than in the KIP).

Regards,

Rajini


On Sat, Nov 4, 2017 at 3:11 AM, Becket Qin <be...@gmail.com> wrote:

> Thanks Rajini.
>
> 1. Good point. We do need to bump up the protocol version so that the new
> clients do not wait for another throttle time when they are talking to old
> brokers. I'll update the KIP.
>
> 2. That is true. But the client was not supposed to send request to the
> broker during that period anyways. So detecting the broker failure later
> seems fine?
>
> 3. Wouldn't the CLOSE_WAIT handler number be the same as the current state?
> Currently the broker will still mute the socket until it sends the response
> back. If the clients disconnect while they are being throttled, the closed
> socket will not be detected until the throttle time has passed.
>
> Jun also suggested to bound the time by metric.sample.window.ms in the
> ticket. I am not sure about the bound on throttle time. It seems a little
> difficult to come up with a good bound. If the bound is too large, it does
> not really help solve the various timeout issues we may face. If the bound
> is too low, the quota is essentially not honored. We may potentially treat
> different requests differently, but that seems too complicated and error
> prone.
>
> IMO, the key improvement we want to make is to tell the clients how long
> they will be throttled, so the clients know what happened and can act
> accordingly instead of waiting naively. Muting the socket on the broker
> side is just in case of non-cooperating clients. For the existing clients,
> it seems this does not have much impact compared with what we have now.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <ra...@gmail.com>
> wrote:
>
> > Hi Becket,
> >
> > Thank you for the KIP. A few comments:
> >
> > 1. KIP says:  "*No public interface changes are needed. We only propose
> > behavior change on the broker side.*"
> >
> > But from the proposed changes, it sounds like clients will be updated to
> > wait for the throttle time before sending the next request, and also not
> > handle idle disconnections during that time. Doesn't that mean that
> > clients need to know that the broker has sent the response before
> > throttling, requiring a protocol/version change?
> >
> >
> > 2. At the moment, broker failures are detected by clients (and vice
> versa)
> > within connections.max.idle.ms. By removing this check for an unlimited
> > throttle time, failure detection could be delayed.
> >
> >
> > 3. KIP says  "*Since this subsequent request is not actually handled
> > until the broker unmutes the channel, the client can hit
> > request.timeout.ms and reconnect. However, this is no worse than the
> > current state.*"
> >
> > I think this could be worse than the current state because the broker
> > doesn't detect and close the channel for an unlimited throttle time,
> > while new connections will get accepted. As a result, a lot of
> > connections could be in the CLOSE_WAIT state when the throttle time is
> > high.
> >
> >
> > Perhaps it is better to combine this KIP with a bound on throttle time?
> >
> >
> > Regards,
> >
> >
> > Rajini
> >
> >
> > On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <be...@gmail.com> wrote:
> >
> > > Thanks for the comment, Jun,
> > >
> > > 1. Yes, you are right. This could also happen with the current quota
> > > mechanism because we are essentially muting the socket during throttle
> > > time. There might be two ways to solve this.
> > > A) use another socket to send heartbeat.
> > > B) let the GroupCoordinator know that the client will not heartbeat for
> > > some time.
> > > It seems the first solution is cleaner.
> > >
> > > 2. For the consumer, it seems returning an empty response is a better
> > > option. In the producer case, if there is a spike in traffic, the
> > > brokers will see queued-up requests, but that is hard to avoid unless
> > > we have a connection-level quota, which is a bigger change and may be
> > > easier to discuss in a separate KIP.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <ju...@confluent.io> wrote:
> > >
> > > > Hi, Jiangjie,
> > > >
> > > > Thanks for bringing this up. A couple of quick thoughts.
> > > >
> > > > 1. If the throttle time is large, what can happen is that a consumer
> > > > won't be able to heartbeat to the group coordinator frequently
> > > > enough. In that case, even with this KIP, it seems there could be
> > > > frequent consumer group rebalances.
> > > >
> > > > 2. If we return a response immediately, for the consumer, do we
> > > > return the requested data or an empty response? If we do the former,
> > > > it may not protect against the case when there are multiple consumer
> > > > instances associated with the same user/clientid.
> > > >
> > > > Jun
> > > >
> > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <be...@gmail.com>
> > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We would like to start the discussion on KIP-219.
> > > > >
> > > > > The KIP tries to improve quota throttling time communication
> > > > > between brokers and clients to avoid client timeouts in case of
> > > > > long throttling times.
> > > > >
> > > > > The KIP link is the following:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
> > > > >
> > > > > Comments are welcome.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Becket Qin <be...@gmail.com>.
Thanks Rajini.

1. Good point. We do need to bump up the protocol version so that the new
clients do not wait for another throttle time when they are talking to old
brokers. I'll update the KIP.

2. That is true. But the client was not supposed to send requests to the
broker during that period anyway. So detecting the broker failure later
seems fine?

3. Wouldn't the number of connections in CLOSE_WAIT be the same as in the
current state? Currently the broker will still mute the socket until it
sends the response back. If the clients disconnect while they are being
throttled, the closed socket will not be detected until the throttle time
has passed.

Jun also suggested to bound the time by metric.sample.window.ms in the
ticket. I am not sure about the bound on throttle time. It seems a little
difficult to come up with a good bound. If the bound is too large, it does
not really help solve the various timeout issues we may face. If the bound
is too low, the quota is essentially not honored. We may potentially treat
different requests differently, but that seems too complicated and error
prone.

IMO, the key improvement we want to make is to tell the clients how long
they will be throttled, so the clients know what happened and can act
accordingly instead of waiting naively. Muting the socket on the broker
side is just in case of non-cooperating clients. For the existing clients,
it seems this does not have much impact compared with what we have now.
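
To illustrate, the broker-side handling could look roughly like the sketch
below (the names are assumptions for illustration, not the actual
Selector/KafkaChannel code): send the response right away with the throttle
time attached, then mute the channel so a non-cooperating client still
cannot get ahead of its quota.

class BrokerThrottleSketch {
    private final java.util.Map<String, Long> mutedUntilMs = new java.util.HashMap<>();

    // On quota violation: respond immediately (with the throttle time in
    // the response), then mute the connection for that duration.
    void onQuotaViolation(String connectionId, long throttleMs, long nowMs) {
        // sendResponseWithThrottleTime(connectionId, throttleMs);  // assumed helper
        mutedUntilMs.put(connectionId, nowMs + throttleMs);
    }

    // Polled by the network layer before reading from a connection.
    boolean mayRead(String connectionId, long nowMs) {
        return nowMs >= mutedUntilMs.getOrDefault(connectionId, 0L);
    }
}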

Thanks,

Jiangjie (Becket) Qin



On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <ra...@gmail.com>
wrote:

> Hi Becket,
>
> Thank you for the KIP. A few comments:
>
> 1. KIP says:  "*No public interface changes are needed. We only propose
> behavior change on the broker side.*"
>
> But from the proposed changes, it sounds like clients will be updated to
> wait for the throttle time before sending the next request, and also not
> handle idle disconnections during that time. Doesn't that mean that clients
> need to know that the broker has sent the response before throttling,
> requiring a protocol/version change?
>
>
> 2. At the moment, broker failures are detected by clients (and vice versa)
> within connections.max.idle.ms. By removing this check for an unlimited
> throttle time, failure detection could be delayed.
>
>
> 3. KIP says  "*Since this subsequent request is not actually handled until
> the broker unmutes the channel, the client can hit request.timeout.ms
> and reconnect. However, this is no worse than the current state.*"
>
> I think this could be worse than the current state because the broker
> doesn't detect and close the channel for an unlimited throttle time, while
> new connections will get accepted. As a result, a lot of connections could
> be in the CLOSE_WAIT state when the throttle time is high.
>
>
> Perhaps it is better to combine this KIP with a bound on throttle time?
>
>
> Regards,
>
>
> Rajini
>
>
> On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <be...@gmail.com> wrote:
>
> > Thanks for the comment, Jun,
> >
> > 1. Yes, you are right. This could also happen with the current quota
> > mechanism because we are essentially muting the socket during throttle
> > time. There might be two ways to solve this.
> > A) use another socket to send heartbeat.
> > B) let the GroupCoordinator know that the client will not heartbeat for
> > some time.
> > It seems the first solution is cleaner.
> >
> > 2. For the consumer, it seems returning an empty response is a better
> > option. In the producer case, if there is a spike in traffic, the brokers
> > will see queued-up requests, but that is hard to avoid unless we have a
> > connection-level quota, which is a bigger change and may be easier to
> > discuss in a separate KIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Jiangjie,
> > >
> > > Thanks for bringing this up. A couple of quick thoughts.
> > >
> > > 1. If the throttle time is large, what can happen is that a consumer
> > > won't be able to heartbeat to the group coordinator frequently enough.
> > > In that case, even with this KIP, it seems there could be frequent
> > > consumer group rebalances.
> > >
> > > 2. If we return a response immediately, for the consumer, do we return
> > > the requested data or an empty response? If we do the former, it may not
> > > protect against the case when there are multiple consumer instances
> > > associated with the same user/clientid.
> > >
> > > Jun
> > >
> > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <be...@gmail.com>
> wrote:
> > >
> > > > Hi,
> > > >
> > > > We would like to start the discussion on KIP-219.
> > > >
> > > > The KIP tries to improve quota throttling time communication between
> > > > brokers and clients to avoid client timeouts in case of long
> > > > throttling times.
> > > >
> > > > The KIP link is the following:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
> > > >
> > > > Comments are welcome.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Rajini Sivaram <ra...@gmail.com>.
Hi Becket,

Thank you for the KIP. A few comments:

1. KIP says:  "*No public interface changes are needed. We only propose
behavior change on the broker side.*"

But from the proposed changes, it sounds like clients will be updated to
wait for the throttle time before sending the next request, and also not
handle idle disconnections during that time. Doesn't that mean that clients
need to know that the broker has sent the response before throttling,
requiring a protocol/version change?


2. At the moment, broker failures are detected by clients (and vice versa)
within connections.max.idle.ms. By removing this check for an unlimited
throttle time, failure detection could be delayed.


3. KIP says  "*Since this subsequent request is not actually handled until
the broker unmutes the channel, the client can hit request.timeout.ms
and reconnect. However, this is no worse than the current state.*"

I think this could be worse than the current state because the broker
doesn't detect and close the channel for an unlimited throttle time, while
new connections will get accepted. As a result, a lot of connections could
be in the CLOSE_WAIT state when the throttle time is high.


Perhaps it is better to combine this KIP with a bound on throttle time?


Regards,


Rajini


On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <be...@gmail.com> wrote:

> Thanks for the comment, Jun,
>
> 1. Yes, you are right. This could also happen with the current quota
> mechanism because we are essentially muting the socket during throttle
> time. There might be two ways to solve this.
> A) use another socket to send heartbeat.
> B) let the GroupCoordinator know that the client will not heartbeat for
> some time.
> It seems the first solution is cleaner.
>
> 2. For the consumer, it seems returning an empty response is a better
> option. In the producer case, if there is a spike in traffic, the brokers
> will see queued-up requests, but that is hard to avoid unless we have a
> connection-level quota, which is a bigger change and may be easier to
> discuss in a separate KIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Jiangjie,
> >
> > Thanks for bringing this up. A couple of quick thoughts.
> >
> > 1. If the throttle time is large, what can happen is that a consumer
> > won't be able to heartbeat to the group coordinator frequently enough. In
> > that case, even with this KIP, it seems there could be frequent consumer
> > group rebalances.
> >
> > 2. If we return a response immediately, for the consumer, do we return
> > the requested data or an empty response? If we do the former, it may not
> > protect against the case when there are multiple consumer instances
> > associated with the same user/clientid.
> >
> > Jun
> >
> > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <be...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > We would like to start the discussion on KIP-219.
> > >
> > > The KIP tries to improve quota throttling time communication between
> > > brokers and clients to avoid client timeouts in case of long throttling
> > > times.
> > >
> > > The KIP link is the following:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
> > >
> > > Comments are welcome.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> >
>

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Becket Qin <be...@gmail.com>.
Thanks for the comment, Jun,

1. Yes, you are right. This could also happen with the current quota
mechanism because we are essentially muting the socket during throttle
time. There might be two ways to solve this.
A) use another socket to send heartbeat.
B) let the GroupCoordinator know that the client will not heartbeat for
some time.
It seems the first solution is cleaner.

2. For the consumer, it seems returning an empty response is a better
option. In the producer case, if there is a spike in traffic, the brokers
will see queued-up requests, but that is hard to avoid unless we have a
connection-level quota, which is a bigger change and may be easier to
discuss in a separate KIP.
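
As a rough sketch of the empty-response option (the types below are
simplified stand-ins for illustration, not the real FetchResponse API):

import java.util.Collections;
import java.util.List;

class EmptyFetchSketch {
    // Simplified stand-in for a fetch response.
    static final class Fetch {
        final int throttleTimeMs;
        final List<byte[]> records;
        Fetch(int throttleTimeMs, List<byte[]> records) {
            this.throttleTimeMs = throttleTimeMs;
            this.records = records;
        }
    }

    // When the client is throttled, answer immediately with no data but
    // with the remaining throttle time, so the consumer can back off
    // without hitting request.timeout.ms.
    static Fetch onThrottledFetch(long remainingThrottleMs) {
        return new Fetch((int) remainingThrottleMs, Collections.emptyList());
    }
}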

Thanks,

Jiangjie (Becket) Qin


On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <ju...@confluent.io> wrote:

> Hi, Jiangjie,
>
> Thanks for bringing this up. A couple of quick thoughts.
>
> 1. If the throttle time is large, what can happen is that a consumer won't
> be able to heartbeat to the group coordinator frequently enough. In that
> case, even with this KIP, it seems there could be frequent consumer group
> rebalances.
>
> 2. If we return a response immediately, for the consumer, do we return the
> requested data or an empty response? If we do the former, it may not
> protect against the case when there are multiple consumer instances
> associated with the same user/clientid.
>
> Jun
>
> On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <be...@gmail.com> wrote:
>
> > Hi,
> >
> > We would like to start the discussion on KIP-219.
> >
> > The KIP tries to improve quota throttling time communication between
> > brokers and clients to avoid client timeouts in case of long throttling
> > times.
> >
> > The KIP link is the following:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
> >
> > Comments are welcome.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>

Re: [DISCUSS] KIP-219 - Improve Quota Communication

Posted by Jun Rao <ju...@confluent.io>.
Hi, Jiangjie,

Thanks for bringing this up. A couple of quick thoughts.

1. If the throttle time is large, what can happen is that a consumer won't
be able to heartbeat to the group coordinator frequently enough. In that
case, even with this KIP, it seems there could be frequent consumer group
rebalances.

2. If we return a response immediately, for the consumer, do we return the
requested data or an empty response? If we do the former, it may not
protect against the case when there are multiple consumer instances
associated with the same user/clientid.

Jun

On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <be...@gmail.com> wrote:

> Hi,
>
> We would like to start the discussion on KIP-219.
>
> The KIP tries to improve quota throttling time communication between
> brokers and clients to avoid client timeouts in case of long throttling
> times.
>
> The KIP link is the following:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
>
> Comments are welcome.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>