You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Mickael Maison <mi...@gmail.com> on 2017/01/23 17:46:43 UTC

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

I've updated the KIP to address all the comments raised here and from
the "DISCUSS" thread.
See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Now, I'd like to restart the vote.

On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
<ra...@googlemail.com> wrote:
> Hi Mickael,
>
> I am +1 on the overall approach of this KIP, but have a couple of comments
> (sorry, should have brought them up on the discuss thread earlier):
>
> 1. Perhaps it would be better to do this after KAFKA-4137
> <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At the
> moment, coordinator shares the same NetworkClient (and hence the same
> Selector) with consumer connections used for fetching records. Since
> freeing of memory relies on consuming applications invoking poll() after
> processing previous records and potentially after committing offsets, it
> will be good to ensure that coordinator is not blocked for read by fetch
> responses. This may be simpler once coordinator has its own Selector.
>
> 2. The KIP says: *Once messages are returned to the user, messages are
> deleted from the MemoryPool so new messages can be stored.*
> Can you expand that a bit? I am assuming that partial buffers never get
> freed when some messages are returned to the user since the consumer is
> still holding a reference to the buffer. Would buffers be freed when
> fetches for all the partitions in a response are parsed, but perhaps not
> yet returned to the user (i.e., is the memory freed when a reference to the
> response buffer is no longer required)? It will be good to document the
> (approximate) maximum memory requirement for the non-compressed case. There
> is data read from the socket, cached in the Fetcher and (as Radai has
> pointed out), the records still with the user application.
>
>
> On Tue, Dec 6, 2016 at 2:04 AM, radai <ra...@gmail.com> wrote:
>
>> +1 (non-binding).
>>
>> small nit pick - just because you returned a response to user doesnt mean
>> the memory id no longer used. for some cases the actual "point of
>> termination" may be the deserializer (really impl-dependant), but
>> generally, wouldnt it be "nice" to have an explicit dispose() call on
>> responses (with the addition that getting the next batch of data from a
>> consumer automatically disposes the previous results)
>>
>> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <EC...@uk.ibm.com> wrote:
>>
>> > +1 (non binding)
>> > --------------------------------------------------
>> > Edoardo Comar
>> > IBM MessageHub
>> > ecomar@uk.ibm.com
>> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >
>> > IBM United Kingdom Limited Registered in England and Wales with number
>> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
>> PO6
>> > 3AU
>> >
>> >
>> >
>> > From:   Mickael Maison <mi...@gmail.com>
>> > To:     dev@kafka.apache.org
>> > Date:   05/12/2016 14:38
>> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the consumer
>> >
>> >
>> >
>> > Hi all,
>> >
>> > I'd like to start the vote for KIP-81:
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >
>> >
>> > Thank you
>> >
>> >
>> >
>> >
>> > Unless stated otherwise above:
>> > IBM United Kingdom Limited - Registered in England and Wales with number
>> > 741598.
>> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
>> 3AU
>> >
>>
>
>
>
> --
> Regards,
>
> Rajini

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Mickael Maison <mi...@gmail.com>.
Thanks to everybody who voted and reviewed the KIP !

The vote passed with 5 binding +1s (Jason, Guozhang, Jun, Ismael,
Becket) and 3 non-binding +1s (Edoardo, Rajini, Radai)


On Mon, Apr 3, 2017 at 5:59 PM, Becket Qin <be...@gmail.com> wrote:
> +1. Thanks for the KIP.
>
> On Mon, Apr 3, 2017 at 4:29 AM, Rajini Sivaram <ra...@gmail.com>
> wrote:
>
>> +1 (non-binding)
>>
>> On Fri, Mar 31, 2017 at 5:36 PM, radai <ra...@gmail.com> wrote:
>>
>> > possible priorities:
>> >
>> > 1. keepalives/coordination
>> > 2. inter-broker-traffic
>> > 3. produce traffic
>> > 4. consume traffic
>> >
>> > (dont want to start a debate, just to illustrate there may be >2 of them
>> so
>> > int is better than bool)
>> >
>> > On Fri, Mar 31, 2017 at 9:10 AM, Ismael Juma <is...@juma.me.uk> wrote:
>> >
>> > > +1 from me too, thanks for the KIP.
>> > >
>> > > Ismael
>> > >
>> > > On Fri, Mar 31, 2017 at 5:06 PM, Jun Rao <ju...@confluent.io> wrote:
>> > >
>> > > > Hi, Mickael,
>> > > >
>> > > > Thanks for the KIP. +1 from me too.
>> > > >
>> > > > Jun
>> > > >
>> > > > On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <
>> > > mickael.maison@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Thanks for the suggestion.
>> > > > >
>> > > > > Currently, I can't think of a scenario when we would need multiple
>> > > > > priority "levels". If in the future it makes sense to have some, I
>> > > > > think we could just make the change without a new KIP as these APIs
>> > > > > are not public.
>> > > > > So I'd be more inclined to keep the boolean for now.
>> > > > >
>> > > > > On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <EC...@uk.ibm.com>
>> > > > wrote:
>> > > > > > Hi Mickael,
>> > > > > > as discussed we could change the priority parameter to be an int
>> > > rather
>> > > > > > than a boolean.
>> > > > > >
>> > > > > > That's a bit more extensible
>> > > > > > --------------------------------------------------
>> > > > > > Edoardo Comar
>> > > > > > IBM MessageHub
>> > > > > > ecomar@uk.ibm.com
>> > > > > > IBM UK Ltd, Hursley Park, SO21 2JN
>> > > > > >
>> > > > > > IBM United Kingdom Limited Registered in England and Wales with
>> > > number
>> > > > > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
>> > > Hants.
>> > > > > PO6
>> > > > > > 3AU
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > From:   Guozhang Wang <wa...@gmail.com>
>> > > > > > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
>> > > > > > Date:   28/03/2017 19:02
>> > > > > > Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in
>> the
>> > > > > > consumer
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > 1) Makes sense.
>> > > > > > 2) Makes sense. Thanks!
>> > > > > >
>> > > > > > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
>> > > > > > <mi...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > >> Hi Guozhang,
>> > > > > >>
>> > > > > >> Thanks for the feedback.
>> > > > > >>
>> > > > > >> 1) By MemoryPool, I mean the implementation added in KIP-72.
>> That
>> > > will
>> > > > > >> most likely be SimpleMemoryPool, but the PR for KIP-72 has not
>> > been
>> > > > > >> merged yet.
>> > > > > >> I've updated the KIP to make it more obvious.
>> > > > > >>
>> > > > > >> 2) I was thinking to pass in the priority when creating the
>> > > > > >> Coordinator Node (in
>> > > > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
>> > > > > >> main/java/org/apache/kafka/clients/consumer/internals/
>> > > > > >> AbstractCoordinator.java#L582)
>> > > > > >> Then when calling Selector.connect() (in
>> > > > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
>> > > > > >> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
>> > > > > >> retrieve it and pass it in the Selector so it uses it when
>> > building
>> > > > > >> the Channel.
>> > > > > >> The idea was to avoid having to deduce the connection is for the
>> > > > > >> Coordinator from the ID but instead have it explicitly set by
>> > > > > >> AbstractCoordinator (and pass it all the way down to the
>> Channel)
>> > > > > >>
>> > > > > >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <
>> > wangguoz@gmail.com>
>> > > > > > wrote:
>> > > > > >> > Mickael,
>> > > > > >> >
>> > > > > >> > Sorry for the late review of the KIP. I'm +1 on the proposed
>> > > change
>> > > > as
>> > > > > >> > well. Just a few minor comments on the wiki itself:
>> > > > > >> >
>> > > > > >> > 1. By the "MemoryPool" are you referring to a new class impl
>> or
>> > to
>> > > > > >> reusing "
>> > > > > >> > org.apache.kafka.clients.producer.internals.BufferPool"? I
>> > assume
>> > > > it
>> > > > > > was
>> > > > > >> > the latter case, and if yes, could you update the wiki page to
>> > > make
>> > > > it
>> > > > > >> > clear?
>> > > > > >> >
>> > > > > >> > 2. I think it is sufficient to add the priority to
>> KafkaChannel
>> > > > class,
>> > > > > >> but
>> > > > > >> > not needed in Node (but one may need to add this parameter to
>> > > > > > Selector#
>> > > > > >> > connect). Could you point me to which usage of Node needs to
>> > > access
>> > > > > > the
>> > > > > >> > priority?
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > Guozhang
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
>> > > > > >> mickael.maison@gmail.com>
>> > > > > >> > wrote:
>> > > > > >> >
>> > > > > >> >> Thanks Jason for the feedback! Yes it makes sense to always
>> use
>> > > the
>> > > > > >> >> MemoryPool is we can. I've updated the KIP with the
>> suggestion
>> > > > > >> >>
>> > > > > >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <
>> > > > jason@confluent.io
>> > > > > >
>> > > > > >> >> wrote:
>> > > > > >> >> > Just a minor comment. The KIP suggests that coordinator
>> > > responses
>> > > > > > are
>> > > > > >> >> > always allocated outside of the memory pool, but maybe we
>> can
>> > > > > > reserve
>> > > > > >> >> that
>> > > > > >> >> > capability for only when the pool does not have enough
>> space?
>> > > It
>> > > > > >> seems a
>> > > > > >> >> > little nicer to use the pool if we can. If that seems
>> > > reasonable,
>> > > > > > I'm
>> > > > > >> +1
>> > > > > >> >> on
>> > > > > >> >> > the KIP. Thanks for the effort!
>> > > > > >> >> >
>> > > > > >> >> > -Jason
>> > > > > >> >> >
>> > > > > >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
>> > > > > >> >> mickael.maison@gmail.com>
>> > > > > >> >> > wrote:
>> > > > > >> >> >
>> > > > > >> >> >> Yes I agree, having a generic flag is more future proof.
>> > > > > >> >> >> I'll update the KIP in the coming days.
>> > > > > >> >> >>
>> > > > > >> >> >> Thanks
>> > > > > >> >> >>
>> > > > > >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
>> > > > > > <jason@confluent.io
>> > > > > >> >
>> > > > > >> >> >> wrote:
>> > > > > >> >> >> > Hey Mickael,
>> > > > > >> >> >> >
>> > > > > >> >> >> > The suggestion to add something to Node makes sense. I
>> > could
>> > > > > >> imagine
>> > > > > >> >> for
>> > > > > >> >> >> > example adding a flag to indicate that the connection
>> has
>> > a
>> > > > > > higher
>> > > > > >> >> >> > "priority," meaning that we can allocate outside of the
>> > > memory
>> > > > > >> pool if
>> > > > > >> >> >> > necessary. That would still be generic even if the only
>> > use
>> > > > case
>> > > > > > is
>> > > > > >> >> the
>> > > > > >> >> >> > consumer coordinator. We might also face a similar
>> problem
>> > > > when
>> > > > > > the
>> > > > > >> >> >> > producer is sending requests to the transaction
>> > coordinator
>> > > > for
>> > > > > >> >> KIP-98.
>> > > > > >> >> >> > What do you think?
>> > > > > >> >> >> >
>> > > > > >> >> >> > Thanks,
>> > > > > >> >> >> > Jason
>> > > > > >> >> >> >
>> > > > > >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
>> > > > > >> >> >> mickael.maison@gmail.com>
>> > > > > >> >> >> > wrote:
>> > > > > >> >> >> >
>> > > > > >> >> >> >> Apologies for the late response.
>> > > > > >> >> >> >>
>> > > > > >> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
>> > > > > >> Coordinator
>> > > > > >> >> >> >> connection is "tagged" with a different id, so we could
>> > > > > > retrieve
>> > > > > >> it
>> > > > > >> >> in
>> > > > > >> >> >> >> NetworkReceive to make the distinction.
>> > > > > >> >> >> >> However, currently the coordinator connection are made
>> > > > > > different
>> > > > > >> by
>> > > > > >> >> >> using:
>> > > > > >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node(
>> ).id()
>> > > > > >> >> >> >> for the Node id.
>> > > > > >> >> >> >>
>> > > > > >> >> >> >> So to identify Coordinator connections, we'd have to
>> > check
>> > > > that
>> > > > > >> the
>> > > > > >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE
>> > > which
>> > > > > > is a
>> > > > > >> >> bit
>> > > > > >> >> >> >> hacky ...
>> > > > > >> >> >> >>
>> > > > > >> >> >> >> Maybe we could add a constructor to Node that allows to
>> > > pass
>> > > > in
>> > > > > > a
>> > > > > >> >> >> >> sourceId String. That way we could make all the
>> > coordinator
>> > > > > >> >> >> >> connections explicit (by setting it to
>> "Coordinator-[ID]"
>> > > for
>> > > > > >> >> >> >> example).
>> > > > > >> >> >> >> What do you think ?
>> > > > > >> >> >> >>
>> > > > > >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
>> > > > > >> >> jason@confluent.io>
>> > > > > >> >> >> >> wrote:
>> > > > > >> >> >> >> > Good point. The consumer does use a separate
>> connection
>> > > to
>> > > > > > the
>> > > > > >> >> >> >> coordinator,
>> > > > > >> >> >> >> > so perhaps the connection itself could be tagged for
>> > > normal
>> > > > > > heap
>> > > > > >> >> >> >> allocation?
>> > > > > >> >> >> >> >
>> > > > > >> >> >> >> > -Jason
>> > > > > >> >> >> >> >
>> > > > > >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> > > > > >> >> >> >> onurkaraman.apache@gmail.com
>> > > > > >> >> >> >> >> wrote:
>> > > > > >> >> >> >> >
>> > > > > >> >> >> >> >> I only did a quick scan but I wanted to point out
>> > what I
>> > > > > > think
>> > > > > >> is
>> > > > > >> >> an
>> > > > > >> >> >> >> >> incorrect assumption in the KIP's caveats:
>> > > > > >> >> >> >> >> "
>> > > > > >> >> >> >> >> There is a risk using the MemoryPool that, after we
>> > fill
>> > > > up
>> > > > > > the
>> > > > > >> >> >> memory
>> > > > > >> >> >> >> with
>> > > > > >> >> >> >> >> fetch data, we can starve the coordinator's
>> connection
>> > > > > >> >> >> >> >> ...
>> > > > > >> >> >> >> >> To alleviate this issue, only messages larger than
>> 1Kb
>> > > > will
>> > > > > > be
>> > > > > >> >> >> >> allocated in
>> > > > > >> >> >> >> >> the MemoryPool. Smaller messages will be allocated
>> > > > directly
>> > > > > > on
>> > > > > >> the
>> > > > > >> >> >> Heap
>> > > > > >> >> >> >> >> like before. This allows group/heartbeat messages to
>> > > avoid
>> > > > > >> being
>> > > > > >> >> >> >> delayed if
>> > > > > >> >> >> >> >> the MemoryPool fills up.
>> > > > > >> >> >> >> >> "
>> > > > > >> >> >> >> >>
>> > > > > >> >> >> >> >> So it sounds like there's an incorrect assumption
>> that
>> > > > > >> responses
>> > > > > >> >> from
>> > > > > >> >> >> >> the
>> > > > > >> >> >> >> >> coordinator will always be small (< 1Kb as mentioned
>> > in
>> > > > the
>> > > > > >> >> caveat).
>> > > > > >> >> >> >> There
>> > > > > >> >> >> >> >> are now a handful of request types between clients
>> and
>> > > the
>> > > > > >> >> >> coordinator:
>> > > > > >> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
>> > > > OffsetCommit,
>> > > > > >> >> >> OffsetFetch,
>> > > > > >> >> >> >> >> ListGroups, DescribeGroups}. While true (at least
>> > today)
>> > > > for
>> > > > > >> >> >> >> >> HeartbeatResponse and a few others, I don't think we
>> > can
>> > > > > > assume
>> > > > > >> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
>> > > > > > DescribeGroupsResponse,
>> > > > > >> and
>> > > > > >> >> >> >> >> OffsetFetchResponse will be small, as they are
>> > > effectively
>> > > > > >> >> bounded by
>> > > > > >> >> >> >> the
>> > > > > >> >> >> >> >> max message size allowed by the broker for the
>> > > > > >> __consumer_offsets
>> > > > > >> >> >> topic
>> > > > > >> >> >> >> >> which by default is 1MB.
>> > > > > >> >> >> >> >>
>> > > > > >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> > > > > >> >> >> >> mickael.maison@gmail.com>
>> > > > > >> >> >> >> >> wrote:
>> > > > > >> >> >> >> >>
>> > > > > >> >> >> >> >> > I've updated the KIP to address all the comments
>> > > raised
>> > > > > > here
>> > > > > >> and
>> > > > > >> >> >> from
>> > > > > >> >> >> >> >> > the "DISCUSS" thread.
>> > > > > >> >> >> >> >> > See:
>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > >> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> > > > > >> >> >> >> >> >
>> > > > > >> >> >> >> >> > Now, I'd like to restart the vote.
>> > > > > >> >> >> >> >> >
>> > > > > >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> > > > > >> >> >> >> >> > <ra...@googlemail.com> wrote:
>> > > > > >> >> >> >> >> > > Hi Mickael,
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but
>> > > have
>> > > > a
>> > > > > >> >> couple of
>> > > > > >> >> >> >> >> > comments
>> > > > > >> >> >> >> >> > > (sorry, should have brought them up on the
>> discuss
>> > > > > > thread
>> > > > > >> >> >> earlier):
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > > 1. Perhaps it would be better to do this after
>> > > > > > KAFKA-4137
>> > > > > >> >> >> >> >> > > <https://issues.apache.org/
>> jira/browse/KAFKA-4137
>> > >
>> > > is
>> > > > > >> >> >> implemented?
>> > > > > >> >> >> >> At
>> > > > > >> >> >> >> >> > the
>> > > > > >> >> >> >> >> > > moment, coordinator shares the same
>> NetworkClient
>> > > (and
>> > > > > >> hence
>> > > > > >> >> the
>> > > > > >> >> >> >> same
>> > > > > >> >> >> >> >> > > Selector) with consumer connections used for
>> > > fetching
>> > > > > >> records.
>> > > > > >> >> >> Since
>> > > > > >> >> >> >> >> > > freeing of memory relies on consuming
>> applications
>> > > > > > invoking
>> > > > > >> >> >> poll()
>> > > > > >> >> >> >> >> after
>> > > > > >> >> >> >> >> > > processing previous records and potentially
>> after
>> > > > > >> committing
>> > > > > >> >> >> >> offsets,
>> > > > > >> >> >> >> >> it
>> > > > > >> >> >> >> >> > > will be good to ensure that coordinator is not
>> > > blocked
>> > > > > > for
>> > > > > >> >> read
>> > > > > >> >> >> by
>> > > > > >> >> >> >> >> fetch
>> > > > > >> >> >> >> >> > > responses. This may be simpler once coordinator
>> > has
>> > > > its
>> > > > > > own
>> > > > > >> >> >> >> Selector.
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to
>> > the
>> > > > > > user,
>> > > > > >> >> >> messages
>> > > > > >> >> >> >> are
>> > > > > >> >> >> >> >> > > deleted from the MemoryPool so new messages can
>> be
>> > > > > > stored.*
>> > > > > >> >> >> >> >> > > Can you expand that a bit? I am assuming that
>> > > partial
>> > > > > >> buffers
>> > > > > >> >> >> never
>> > > > > >> >> >> >> get
>> > > > > >> >> >> >> >> > > freed when some messages are returned to the
>> user
>> > > > since
>> > > > > > the
>> > > > > >> >> >> >> consumer is
>> > > > > >> >> >> >> >> > > still holding a reference to the buffer. Would
>> > > buffers
>> > > > > > be
>> > > > > >> >> freed
>> > > > > >> >> >> when
>> > > > > >> >> >> >> >> > > fetches for all the partitions in a response are
>> > > > parsed,
>> > > > > >> but
>> > > > > >> >> >> perhaps
>> > > > > >> >> >> >> >> not
>> > > > > >> >> >> >> >> > > yet returned to the user (i.e., is the memory
>> > freed
>> > > > when
>> > > > > > a
>> > > > > >> >> >> >> reference to
>> > > > > >> >> >> >> >> > the
>> > > > > >> >> >> >> >> > > response buffer is no longer required)? It will
>> be
>> > > > good
>> > > > > > to
>> > > > > >> >> >> document
>> > > > > >> >> >> >> the
>> > > > > >> >> >> >> >> > > (approximate) maximum memory requirement for the
>> > > > > >> >> non-compressed
>> > > > > >> >> >> >> case.
>> > > > > >> >> >> >> >> > There
>> > > > > >> >> >> >> >> > > is data read from the socket, cached in the
>> > Fetcher
>> > > > and
>> > > > > > (as
>> > > > > >> >> Radai
>> > > > > >> >> >> >> has
>> > > > > >> >> >> >> >> > > pointed out), the records still with the user
>> > > > > > application.
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
>> > > > > >> >> >> radai.rosenblatt@gmail.com>
>> > > > > >> >> >> >> >> > wrote:
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > >> +1 (non-binding).
>> > > > > >> >> >> >> >> > >>
>> > > > > >> >> >> >> >> > >> small nit pick - just because you returned a
>> > > response
>> > > > > > to
>> > > > > >> user
>> > > > > >> >> >> >> doesnt
>> > > > > >> >> >> >> >> > mean
>> > > > > >> >> >> >> >> > >> the memory id no longer used. for some cases
>> the
>> > > > actual
>> > > > > >> >> "point
>> > > > > >> >> >> of
>> > > > > >> >> >> >> >> > >> termination" may be the deserializer (really
>> > > > > >> impl-dependant),
>> > > > > >> >> >> but
>> > > > > >> >> >> >> >> > >> generally, wouldnt it be "nice" to have an
>> > explicit
>> > > > > >> dispose()
>> > > > > >> >> >> call
>> > > > > >> >> >> >> on
>> > > > > >> >> >> >> >> > >> responses (with the addition that getting the
>> > next
>> > > > > > batch
>> > > > > >> of
>> > > > > >> >> data
>> > > > > >> >> >> >> from
>> > > > > >> >> >> >> >> a
>> > > > > >> >> >> >> >> > >> consumer automatically disposes the previous
>> > > results)
>> > > > > >> >> >> >> >> > >>
>> > > > > >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
>> > > > > >> >> >> ECOMAR@uk.ibm.com>
>> > > > > >> >> >> >> >> > wrote:
>> > > > > >> >> >> >> >> > >>
>> > > > > >> >> >> >> >> > >> > +1 (non binding)
>> > > > > >> >> >> >> >> > >> > ------------------------------
>> > > --------------------
>> > > > > >> >> >> >> >> > >> > Edoardo Comar
>> > > > > >> >> >> >> >> > >> > IBM MessageHub
>> > > > > >> >> >> >> >> > >> > ecomar@uk.ibm.com
>> > > > > >> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> > IBM United Kingdom Limited Registered in
>> > England
>> > > > and
>> > > > > >> Wales
>> > > > > >> >> >> with
>> > > > > >> >> >> >> >> number
>> > > > > >> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North
>> > > Harbour,
>> > > > > >> >> >> Portsmouth,
>> > > > > >> >> >> >> >> Hants.
>> > > > > >> >> >> >> >> > >> PO6
>> > > > > >> >> >> >> >> > >> > 3AU
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> > From:   Mickael Maison <
>> > mickael.maison@gmail.com
>> > > >
>> > > > > >> >> >> >> >> > >> > To:     dev@kafka.apache.org
>> > > > > >> >> >> >> >> > >> > Date:   05/12/2016 14:38
>> > > > > >> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch
>> > memory
>> > > > > > usage
>> > > > > >> in
>> > > > > >> >> the
>> > > > > >> >> >> >> >> > consumer
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> > Hi all,
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
>> > > > > >> >> >> >> >> > >> >
>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > >> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+
>> usage+in+the+consumer
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> > Thank you
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >> > Unless stated otherwise above:
>> > > > > >> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in
>> > > England
>> > > > > > and
>> > > > > >> >> Wales
>> > > > > >> >> >> with
>> > > > > >> >> >> >> >> > number
>> > > > > >> >> >> >> >> > >> > 741598.
>> > > > > >> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
>> > > > > > Portsmouth,
>> > > > > >> >> >> >> Hampshire
>> > > > > >> >> >> >> >> PO6
>> > > > > >> >> >> >> >> > >> 3AU
>> > > > > >> >> >> >> >> > >> >
>> > > > > >> >> >> >> >> > >>
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > > --
>> > > > > >> >> >> >> >> > > Regards,
>> > > > > >> >> >> >> >> > >
>> > > > > >> >> >> >> >> > > Rajini
>> > > > > >> >> >> >> >> >
>> > > > > >> >> >> >> >>
>> > > > > >> >> >> >>
>> > > > > >> >> >>
>> > > > > >> >>
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > --
>> > > > > >> > -- Guozhang
>> > > > > >>
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > Unless stated otherwise above:
>> > > > > > IBM United Kingdom Limited - Registered in England and Wales with
>> > > > number
>> > > > > > 741598.
>> > > > > > Registered office: PO Box 41, North Harbour, Portsmouth,
>> Hampshire
>> > > PO6
>> > > > > 3AU
>> > > > >
>> > > >
>> > >
>> >
>>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Becket Qin <be...@gmail.com>.
+1. Thanks for the KIP.

On Mon, Apr 3, 2017 at 4:29 AM, Rajini Sivaram <ra...@gmail.com>
wrote:

> +1 (non-binding)
>
> On Fri, Mar 31, 2017 at 5:36 PM, radai <ra...@gmail.com> wrote:
>
> > possible priorities:
> >
> > 1. keepalives/coordination
> > 2. inter-broker-traffic
> > 3. produce traffic
> > 4. consume traffic
> >
> > (dont want to start a debate, just to illustrate there may be >2 of them
> so
> > int is better than bool)
> >
> > On Fri, Mar 31, 2017 at 9:10 AM, Ismael Juma <is...@juma.me.uk> wrote:
> >
> > > +1 from me too, thanks for the KIP.
> > >
> > > Ismael
> > >
> > > On Fri, Mar 31, 2017 at 5:06 PM, Jun Rao <ju...@confluent.io> wrote:
> > >
> > > > Hi, Mickael,
> > > >
> > > > Thanks for the KIP. +1 from me too.
> > > >
> > > > Jun
> > > >
> > > > On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <
> > > mickael.maison@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for the suggestion.
> > > > >
> > > > > Currently, I can't think of a scenario when we would need multiple
> > > > > priority "levels". If in the future it makes sense to have some, I
> > > > > think we could just make the change without a new KIP as these APIs
> > > > > are not public.
> > > > > So I'd be more inclined to keep the boolean for now.
> > > > >
> > > > > On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <EC...@uk.ibm.com>
> > > > wrote:
> > > > > > Hi Mickael,
> > > > > > as discussed we could change the priority parameter to be an int
> > > rather
> > > > > > than a boolean.
> > > > > >
> > > > > > That's a bit more extensible
> > > > > > --------------------------------------------------
> > > > > > Edoardo Comar
> > > > > > IBM MessageHub
> > > > > > ecomar@uk.ibm.com
> > > > > > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > > >
> > > > > > IBM United Kingdom Limited Registered in England and Wales with
> > > number
> > > > > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> > > Hants.
> > > > > PO6
> > > > > > 3AU
> > > > > >
> > > > > >
> > > > > >
> > > > > > From:   Guozhang Wang <wa...@gmail.com>
> > > > > > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > > > > Date:   28/03/2017 19:02
> > > > > > Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in
> the
> > > > > > consumer
> > > > > >
> > > > > >
> > > > > >
> > > > > > 1) Makes sense.
> > > > > > 2) Makes sense. Thanks!
> > > > > >
> > > > > > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
> > > > > > <mi...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Guozhang,
> > > > > >>
> > > > > >> Thanks for the feedback.
> > > > > >>
> > > > > >> 1) By MemoryPool, I mean the implementation added in KIP-72.
> That
> > > will
> > > > > >> most likely be SimpleMemoryPool, but the PR for KIP-72 has not
> > been
> > > > > >> merged yet.
> > > > > >> I've updated the KIP to make it more obvious.
> > > > > >>
> > > > > >> 2) I was thinking to pass in the priority when creating the
> > > > > >> Coordinator Node (in
> > > > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > > > > >> main/java/org/apache/kafka/clients/consumer/internals/
> > > > > >> AbstractCoordinator.java#L582)
> > > > > >> Then when calling Selector.connect() (in
> > > > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > > > > >> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> > > > > >> retrieve it and pass it in the Selector so it uses it when
> > building
> > > > > >> the Channel.
> > > > > >> The idea was to avoid having to deduce the connection is for the
> > > > > >> Coordinator from the ID but instead have it explicitly set by
> > > > > >> AbstractCoordinator (and pass it all the way down to the
> Channel)
> > > > > >>
> > > > > >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <
> > wangguoz@gmail.com>
> > > > > > wrote:
> > > > > >> > Mickael,
> > > > > >> >
> > > > > >> > Sorry for the late review of the KIP. I'm +1 on the proposed
> > > change
> > > > as
> > > > > >> > well. Just a few minor comments on the wiki itself:
> > > > > >> >
> > > > > >> > 1. By the "MemoryPool" are you referring to a new class impl
> or
> > to
> > > > > >> reusing "
> > > > > >> > org.apache.kafka.clients.producer.internals.BufferPool"? I
> > assume
> > > > it
> > > > > > was
> > > > > >> > the latter case, and if yes, could you update the wiki page to
> > > make
> > > > it
> > > > > >> > clear?
> > > > > >> >
> > > > > >> > 2. I think it is sufficient to add the priority to
> KafkaChannel
> > > > class,
> > > > > >> but
> > > > > >> > not needed in Node (but one may need to add this parameter to
> > > > > > Selector#
> > > > > >> > connect). Could you point me to which usage of Node needs to
> > > access
> > > > > > the
> > > > > >> > priority?
> > > > > >> >
> > > > > >> >
> > > > > >> > Guozhang
> > > > > >> >
> > > > > >> >
> > > > > >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> > > > > >> mickael.maison@gmail.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> >> Thanks Jason for the feedback! Yes it makes sense to always
> use
> > > the
> > > > > >> >> MemoryPool is we can. I've updated the KIP with the
> suggestion
> > > > > >> >>
> > > > > >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <
> > > > jason@confluent.io
> > > > > >
> > > > > >> >> wrote:
> > > > > >> >> > Just a minor comment. The KIP suggests that coordinator
> > > responses
> > > > > > are
> > > > > >> >> > always allocated outside of the memory pool, but maybe we
> can
> > > > > > reserve
> > > > > >> >> that
> > > > > >> >> > capability for only when the pool does not have enough
> space?
> > > It
> > > > > >> seems a
> > > > > >> >> > little nicer to use the pool if we can. If that seems
> > > reasonable,
> > > > > > I'm
> > > > > >> +1
> > > > > >> >> on
> > > > > >> >> > the KIP. Thanks for the effort!
> > > > > >> >> >
> > > > > >> >> > -Jason
> > > > > >> >> >
> > > > > >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> > > > > >> >> mickael.maison@gmail.com>
> > > > > >> >> > wrote:
> > > > > >> >> >
> > > > > >> >> >> Yes I agree, having a generic flag is more future proof.
> > > > > >> >> >> I'll update the KIP in the coming days.
> > > > > >> >> >>
> > > > > >> >> >> Thanks
> > > > > >> >> >>
> > > > > >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
> > > > > > <jason@confluent.io
> > > > > >> >
> > > > > >> >> >> wrote:
> > > > > >> >> >> > Hey Mickael,
> > > > > >> >> >> >
> > > > > >> >> >> > The suggestion to add something to Node makes sense. I
> > could
> > > > > >> imagine
> > > > > >> >> for
> > > > > >> >> >> > example adding a flag to indicate that the connection
> has
> > a
> > > > > > higher
> > > > > >> >> >> > "priority," meaning that we can allocate outside of the
> > > memory
> > > > > >> pool if
> > > > > >> >> >> > necessary. That would still be generic even if the only
> > use
> > > > case
> > > > > > is
> > > > > >> >> the
> > > > > >> >> >> > consumer coordinator. We might also face a similar
> problem
> > > > when
> > > > > > the
> > > > > >> >> >> > producer is sending requests to the transaction
> > coordinator
> > > > for
> > > > > >> >> KIP-98.
> > > > > >> >> >> > What do you think?
> > > > > >> >> >> >
> > > > > >> >> >> > Thanks,
> > > > > >> >> >> > Jason
> > > > > >> >> >> >
> > > > > >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> > > > > >> >> >> mickael.maison@gmail.com>
> > > > > >> >> >> > wrote:
> > > > > >> >> >> >
> > > > > >> >> >> >> Apologies for the late response.
> > > > > >> >> >> >>
> > > > > >> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> > > > > >> Coordinator
> > > > > >> >> >> >> connection is "tagged" with a different id, so we could
> > > > > > retrieve
> > > > > >> it
> > > > > >> >> in
> > > > > >> >> >> >> NetworkReceive to make the distinction.
> > > > > >> >> >> >> However, currently the coordinator connection are made
> > > > > > different
> > > > > >> by
> > > > > >> >> >> using:
> > > > > >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node(
> ).id()
> > > > > >> >> >> >> for the Node id.
> > > > > >> >> >> >>
> > > > > >> >> >> >> So to identify Coordinator connections, we'd have to
> > check
> > > > that
> > > > > >> the
> > > > > >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE
> > > which
> > > > > > is a
> > > > > >> >> bit
> > > > > >> >> >> >> hacky ...
> > > > > >> >> >> >>
> > > > > >> >> >> >> Maybe we could add a constructor to Node that allows to
> > > pass
> > > > in
> > > > > > a
> > > > > >> >> >> >> sourceId String. That way we could make all the
> > coordinator
> > > > > >> >> >> >> connections explicit (by setting it to
> "Coordinator-[ID]"
> > > for
> > > > > >> >> >> >> example).
> > > > > >> >> >> >> What do you think ?
> > > > > >> >> >> >>
> > > > > >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> > > > > >> >> jason@confluent.io>
> > > > > >> >> >> >> wrote:
> > > > > >> >> >> >> > Good point. The consumer does use a separate
> connection
> > > to
> > > > > > the
> > > > > >> >> >> >> coordinator,
> > > > > >> >> >> >> > so perhaps the connection itself could be tagged for
> > > normal
> > > > > > heap
> > > > > >> >> >> >> allocation?
> > > > > >> >> >> >> >
> > > > > >> >> >> >> > -Jason
> > > > > >> >> >> >> >
> > > > > >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> > > > > >> >> >> >> onurkaraman.apache@gmail.com
> > > > > >> >> >> >> >> wrote:
> > > > > >> >> >> >> >
> > > > > >> >> >> >> >> I only did a quick scan but I wanted to point out
> > what I
> > > > > > think
> > > > > >> is
> > > > > >> >> an
> > > > > >> >> >> >> >> incorrect assumption in the KIP's caveats:
> > > > > >> >> >> >> >> "
> > > > > >> >> >> >> >> There is a risk using the MemoryPool that, after we
> > fill
> > > > up
> > > > > > the
> > > > > >> >> >> memory
> > > > > >> >> >> >> with
> > > > > >> >> >> >> >> fetch data, we can starve the coordinator's
> connection
> > > > > >> >> >> >> >> ...
> > > > > >> >> >> >> >> To alleviate this issue, only messages larger than
> 1Kb
> > > > will
> > > > > > be
> > > > > >> >> >> >> allocated in
> > > > > >> >> >> >> >> the MemoryPool. Smaller messages will be allocated
> > > > directly
> > > > > > on
> > > > > >> the
> > > > > >> >> >> Heap
> > > > > >> >> >> >> >> like before. This allows group/heartbeat messages to
> > > avoid
> > > > > >> being
> > > > > >> >> >> >> delayed if
> > > > > >> >> >> >> >> the MemoryPool fills up.
> > > > > >> >> >> >> >> "
> > > > > >> >> >> >> >>
> > > > > >> >> >> >> >> So it sounds like there's an incorrect assumption
> that
> > > > > >> responses
> > > > > >> >> from
> > > > > >> >> >> >> the
> > > > > >> >> >> >> >> coordinator will always be small (< 1Kb as mentioned
> > in
> > > > the
> > > > > >> >> caveat).
> > > > > >> >> >> >> There
> > > > > >> >> >> >> >> are now a handful of request types between clients
> and
> > > the
> > > > > >> >> >> coordinator:
> > > > > >> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
> > > > OffsetCommit,
> > > > > >> >> >> OffsetFetch,
> > > > > >> >> >> >> >> ListGroups, DescribeGroups}. While true (at least
> > today)
> > > > for
> > > > > >> >> >> >> >> HeartbeatResponse and a few others, I don't think we
> > can
> > > > > > assume
> > > > > >> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
> > > > > > DescribeGroupsResponse,
> > > > > >> and
> > > > > >> >> >> >> >> OffsetFetchResponse will be small, as they are
> > > effectively
> > > > > >> >> bounded by
> > > > > >> >> >> >> the
> > > > > >> >> >> >> >> max message size allowed by the broker for the
> > > > > >> __consumer_offsets
> > > > > >> >> >> topic
> > > > > >> >> >> >> >> which by default is 1MB.
> > > > > >> >> >> >> >>
> > > > > >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> > > > > >> >> >> >> mickael.maison@gmail.com>
> > > > > >> >> >> >> >> wrote:
> > > > > >> >> >> >> >>
> > > > > >> >> >> >> >> > I've updated the KIP to address all the comments
> > > raised
> > > > > > here
> > > > > >> and
> > > > > >> >> >> from
> > > > > >> >> >> >> >> > the "DISCUSS" thread.
> > > > > >> >> >> >> >> > See:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > > > >> >> >> >> >> >
> > > > > >> >> >> >> >> > Now, I'd like to restart the vote.
> > > > > >> >> >> >> >> >
> > > > > >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> > > > > >> >> >> >> >> > <ra...@googlemail.com> wrote:
> > > > > >> >> >> >> >> > > Hi Mickael,
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but
> > > have
> > > > a
> > > > > >> >> couple of
> > > > > >> >> >> >> >> > comments
> > > > > >> >> >> >> >> > > (sorry, should have brought them up on the
> discuss
> > > > > > thread
> > > > > >> >> >> earlier):
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > > 1. Perhaps it would be better to do this after
> > > > > > KAFKA-4137
> > > > > >> >> >> >> >> > > <https://issues.apache.org/
> jira/browse/KAFKA-4137
> > >
> > > is
> > > > > >> >> >> implemented?
> > > > > >> >> >> >> At
> > > > > >> >> >> >> >> > the
> > > > > >> >> >> >> >> > > moment, coordinator shares the same
> NetworkClient
> > > (and
> > > > > >> hence
> > > > > >> >> the
> > > > > >> >> >> >> same
> > > > > >> >> >> >> >> > > Selector) with consumer connections used for
> > > fetching
> > > > > >> records.
> > > > > >> >> >> Since
> > > > > >> >> >> >> >> > > freeing of memory relies on consuming
> applications
> > > > > > invoking
> > > > > >> >> >> poll()
> > > > > >> >> >> >> >> after
> > > > > >> >> >> >> >> > > processing previous records and potentially
> after
> > > > > >> committing
> > > > > >> >> >> >> offsets,
> > > > > >> >> >> >> >> it
> > > > > >> >> >> >> >> > > will be good to ensure that coordinator is not
> > > blocked
> > > > > > for
> > > > > >> >> read
> > > > > >> >> >> by
> > > > > >> >> >> >> >> fetch
> > > > > >> >> >> >> >> > > responses. This may be simpler once coordinator
> > has
> > > > its
> > > > > > own
> > > > > >> >> >> >> Selector.
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to
> > the
> > > > > > user,
> > > > > >> >> >> messages
> > > > > >> >> >> >> are
> > > > > >> >> >> >> >> > > deleted from the MemoryPool so new messages can
> be
> > > > > > stored.*
> > > > > >> >> >> >> >> > > Can you expand that a bit? I am assuming that
> > > partial
> > > > > >> buffers
> > > > > >> >> >> never
> > > > > >> >> >> >> get
> > > > > >> >> >> >> >> > > freed when some messages are returned to the
> user
> > > > since
> > > > > > the
> > > > > >> >> >> >> consumer is
> > > > > >> >> >> >> >> > > still holding a reference to the buffer. Would
> > > buffers
> > > > > > be
> > > > > >> >> freed
> > > > > >> >> >> when
> > > > > >> >> >> >> >> > > fetches for all the partitions in a response are
> > > > parsed,
> > > > > >> but
> > > > > >> >> >> perhaps
> > > > > >> >> >> >> >> not
> > > > > >> >> >> >> >> > > yet returned to the user (i.e., is the memory
> > freed
> > > > when
> > > > > > a
> > > > > >> >> >> >> reference to
> > > > > >> >> >> >> >> > the
> > > > > >> >> >> >> >> > > response buffer is no longer required)? It will
> be
> > > > good
> > > > > > to
> > > > > >> >> >> document
> > > > > >> >> >> >> the
> > > > > >> >> >> >> >> > > (approximate) maximum memory requirement for the
> > > > > >> >> non-compressed
> > > > > >> >> >> >> case.
> > > > > >> >> >> >> >> > There
> > > > > >> >> >> >> >> > > is data read from the socket, cached in the
> > Fetcher
> > > > and
> > > > > > (as
> > > > > >> >> Radai
> > > > > >> >> >> >> has
> > > > > >> >> >> >> >> > > pointed out), the records still with the user
> > > > > > application.
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> > > > > >> >> >> radai.rosenblatt@gmail.com>
> > > > > >> >> >> >> >> > wrote:
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > >> +1 (non-binding).
> > > > > >> >> >> >> >> > >>
> > > > > >> >> >> >> >> > >> small nit pick - just because you returned a
> > > response
> > > > > > to
> > > > > >> user
> > > > > >> >> >> >> doesnt
> > > > > >> >> >> >> >> > mean
> > > > > >> >> >> >> >> > >> the memory id no longer used. for some cases
> the
> > > > actual
> > > > > >> >> "point
> > > > > >> >> >> of
> > > > > >> >> >> >> >> > >> termination" may be the deserializer (really
> > > > > >> impl-dependant),
> > > > > >> >> >> but
> > > > > >> >> >> >> >> > >> generally, wouldnt it be "nice" to have an
> > explicit
> > > > > >> dispose()
> > > > > >> >> >> call
> > > > > >> >> >> >> on
> > > > > >> >> >> >> >> > >> responses (with the addition that getting the
> > next
> > > > > > batch
> > > > > >> of
> > > > > >> >> data
> > > > > >> >> >> >> from
> > > > > >> >> >> >> >> a
> > > > > >> >> >> >> >> > >> consumer automatically disposes the previous
> > > results)
> > > > > >> >> >> >> >> > >>
> > > > > >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> > > > > >> >> >> ECOMAR@uk.ibm.com>
> > > > > >> >> >> >> >> > wrote:
> > > > > >> >> >> >> >> > >>
> > > > > >> >> >> >> >> > >> > +1 (non binding)
> > > > > >> >> >> >> >> > >> > ------------------------------
> > > --------------------
> > > > > >> >> >> >> >> > >> > Edoardo Comar
> > > > > >> >> >> >> >> > >> > IBM MessageHub
> > > > > >> >> >> >> >> > >> > ecomar@uk.ibm.com
> > > > > >> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> > IBM United Kingdom Limited Registered in
> > England
> > > > and
> > > > > >> Wales
> > > > > >> >> >> with
> > > > > >> >> >> >> >> number
> > > > > >> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North
> > > Harbour,
> > > > > >> >> >> Portsmouth,
> > > > > >> >> >> >> >> Hants.
> > > > > >> >> >> >> >> > >> PO6
> > > > > >> >> >> >> >> > >> > 3AU
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> > From:   Mickael Maison <
> > mickael.maison@gmail.com
> > > >
> > > > > >> >> >> >> >> > >> > To:     dev@kafka.apache.org
> > > > > >> >> >> >> >> > >> > Date:   05/12/2016 14:38
> > > > > >> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch
> > memory
> > > > > > usage
> > > > > >> in
> > > > > >> >> the
> > > > > >> >> >> >> >> > consumer
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> > Hi all,
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> > > > > >> >> >> >> >> > >> >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+
> usage+in+the+consumer
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> > Thank you
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >> > Unless stated otherwise above:
> > > > > >> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in
> > > England
> > > > > > and
> > > > > >> >> Wales
> > > > > >> >> >> with
> > > > > >> >> >> >> >> > number
> > > > > >> >> >> >> >> > >> > 741598.
> > > > > >> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
> > > > > > Portsmouth,
> > > > > >> >> >> >> Hampshire
> > > > > >> >> >> >> >> PO6
> > > > > >> >> >> >> >> > >> 3AU
> > > > > >> >> >> >> >> > >> >
> > > > > >> >> >> >> >> > >>
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > > --
> > > > > >> >> >> >> >> > > Regards,
> > > > > >> >> >> >> >> > >
> > > > > >> >> >> >> >> > > Rajini
> > > > > >> >> >> >> >> >
> > > > > >> >> >> >> >>
> > > > > >> >> >> >>
> > > > > >> >> >>
> > > > > >> >>
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > --
> > > > > >> > -- Guozhang
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > > Unless stated otherwise above:
> > > > > > IBM United Kingdom Limited - Registered in England and Wales with
> > > > number
> > > > > > 741598.
> > > > > > Registered office: PO Box 41, North Harbour, Portsmouth,
> Hampshire
> > > PO6
> > > > > 3AU
> > > > >
> > > >
> > >
> >
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Rajini Sivaram <ra...@gmail.com>.
+1 (non-binding)

On Fri, Mar 31, 2017 at 5:36 PM, radai <ra...@gmail.com> wrote:

> possible priorities:
>
> 1. keepalives/coordination
> 2. inter-broker-traffic
> 3. produce traffic
> 4. consume traffic
>
> (dont want to start a debate, just to illustrate there may be >2 of them so
> int is better than bool)
>
> On Fri, Mar 31, 2017 at 9:10 AM, Ismael Juma <is...@juma.me.uk> wrote:
>
> > +1 from me too, thanks for the KIP.
> >
> > Ismael
> >
> > On Fri, Mar 31, 2017 at 5:06 PM, Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Mickael,
> > >
> > > Thanks for the KIP. +1 from me too.
> > >
> > > Jun
> > >
> > > On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <
> > mickael.maison@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the suggestion.
> > > >
> > > > Currently, I can't think of a scenario when we would need multiple
> > > > priority "levels". If in the future it makes sense to have some, I
> > > > think we could just make the change without a new KIP as these APIs
> > > > are not public.
> > > > So I'd be more inclined to keep the boolean for now.
> > > >
> > > > On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <EC...@uk.ibm.com>
> > > wrote:
> > > > > Hi Mickael,
> > > > > as discussed we could change the priority parameter to be an int
> > rather
> > > > > than a boolean.
> > > > >
> > > > > That's a bit more extensible
> > > > > --------------------------------------------------
> > > > > Edoardo Comar
> > > > > IBM MessageHub
> > > > > ecomar@uk.ibm.com
> > > > > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > >
> > > > > IBM United Kingdom Limited Registered in England and Wales with
> > number
> > > > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> > Hants.
> > > > PO6
> > > > > 3AU
> > > > >
> > > > >
> > > > >
> > > > > From:   Guozhang Wang <wa...@gmail.com>
> > > > > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > > > Date:   28/03/2017 19:02
> > > > > Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the
> > > > > consumer
> > > > >
> > > > >
> > > > >
> > > > > 1) Makes sense.
> > > > > 2) Makes sense. Thanks!
> > > > >
> > > > > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
> > > > > <mi...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Guozhang,
> > > > >>
> > > > >> Thanks for the feedback.
> > > > >>
> > > > >> 1) By MemoryPool, I mean the implementation added in KIP-72. That
> > will
> > > > >> most likely be SimpleMemoryPool, but the PR for KIP-72 has not
> been
> > > > >> merged yet.
> > > > >> I've updated the KIP to make it more obvious.
> > > > >>
> > > > >> 2) I was thinking to pass in the priority when creating the
> > > > >> Coordinator Node (in
> > > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > > > >> main/java/org/apache/kafka/clients/consumer/internals/
> > > > >> AbstractCoordinator.java#L582)
> > > > >> Then when calling Selector.connect() (in
> > > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > > > >> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> > > > >> retrieve it and pass it in the Selector so it uses it when
> building
> > > > >> the Channel.
> > > > >> The idea was to avoid having to deduce the connection is for the
> > > > >> Coordinator from the ID but instead have it explicitly set by
> > > > >> AbstractCoordinator (and pass it all the way down to the Channel)
> > > > >>
> > > > >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <
> wangguoz@gmail.com>
> > > > > wrote:
> > > > >> > Mickael,
> > > > >> >
> > > > >> > Sorry for the late review of the KIP. I'm +1 on the proposed
> > change
> > > as
> > > > >> > well. Just a few minor comments on the wiki itself:
> > > > >> >
> > > > >> > 1. By the "MemoryPool" are you referring to a new class impl or
> to
> > > > >> reusing "
> > > > >> > org.apache.kafka.clients.producer.internals.BufferPool"? I
> assume
> > > it
> > > > > was
> > > > >> > the latter case, and if yes, could you update the wiki page to
> > make
> > > it
> > > > >> > clear?
> > > > >> >
> > > > >> > 2. I think it is sufficient to add the priority to KafkaChannel
> > > class,
> > > > >> but
> > > > >> > not needed in Node (but one may need to add this parameter to
> > > > > Selector#
> > > > >> > connect). Could you point me to which usage of Node needs to
> > access
> > > > > the
> > > > >> > priority?
> > > > >> >
> > > > >> >
> > > > >> > Guozhang
> > > > >> >
> > > > >> >
> > > > >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> > > > >> mickael.maison@gmail.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> Thanks Jason for the feedback! Yes it makes sense to always use
> > the
> > > > >> >> MemoryPool is we can. I've updated the KIP with the suggestion
> > > > >> >>
> > > > >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <
> > > jason@confluent.io
> > > > >
> > > > >> >> wrote:
> > > > >> >> > Just a minor comment. The KIP suggests that coordinator
> > responses
> > > > > are
> > > > >> >> > always allocated outside of the memory pool, but maybe we can
> > > > > reserve
> > > > >> >> that
> > > > >> >> > capability for only when the pool does not have enough space?
> > It
> > > > >> seems a
> > > > >> >> > little nicer to use the pool if we can. If that seems
> > reasonable,
> > > > > I'm
> > > > >> +1
> > > > >> >> on
> > > > >> >> > the KIP. Thanks for the effort!
> > > > >> >> >
> > > > >> >> > -Jason
> > > > >> >> >
> > > > >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> > > > >> >> mickael.maison@gmail.com>
> > > > >> >> > wrote:
> > > > >> >> >
> > > > >> >> >> Yes I agree, having a generic flag is more future proof.
> > > > >> >> >> I'll update the KIP in the coming days.
> > > > >> >> >>
> > > > >> >> >> Thanks
> > > > >> >> >>
> > > > >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
> > > > > <jason@confluent.io
> > > > >> >
> > > > >> >> >> wrote:
> > > > >> >> >> > Hey Mickael,
> > > > >> >> >> >
> > > > >> >> >> > The suggestion to add something to Node makes sense. I
> could
> > > > >> imagine
> > > > >> >> for
> > > > >> >> >> > example adding a flag to indicate that the connection has
> a
> > > > > higher
> > > > >> >> >> > "priority," meaning that we can allocate outside of the
> > memory
> > > > >> pool if
> > > > >> >> >> > necessary. That would still be generic even if the only
> use
> > > case
> > > > > is
> > > > >> >> the
> > > > >> >> >> > consumer coordinator. We might also face a similar problem
> > > when
> > > > > the
> > > > >> >> >> > producer is sending requests to the transaction
> coordinator
> > > for
> > > > >> >> KIP-98.
> > > > >> >> >> > What do you think?
> > > > >> >> >> >
> > > > >> >> >> > Thanks,
> > > > >> >> >> > Jason
> > > > >> >> >> >
> > > > >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> > > > >> >> >> mickael.maison@gmail.com>
> > > > >> >> >> > wrote:
> > > > >> >> >> >
> > > > >> >> >> >> Apologies for the late response.
> > > > >> >> >> >>
> > > > >> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> > > > >> Coordinator
> > > > >> >> >> >> connection is "tagged" with a different id, so we could
> > > > > retrieve
> > > > >> it
> > > > >> >> in
> > > > >> >> >> >> NetworkReceive to make the distinction.
> > > > >> >> >> >> However, currently the coordinator connection are made
> > > > > different
> > > > >> by
> > > > >> >> >> using:
> > > > >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> > > > >> >> >> >> for the Node id.
> > > > >> >> >> >>
> > > > >> >> >> >> So to identify Coordinator connections, we'd have to
> check
> > > that
> > > > >> the
> > > > >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE
> > which
> > > > > is a
> > > > >> >> bit
> > > > >> >> >> >> hacky ...
> > > > >> >> >> >>
> > > > >> >> >> >> Maybe we could add a constructor to Node that allows to
> > pass
> > > in
> > > > > a
> > > > >> >> >> >> sourceId String. That way we could make all the
> coordinator
> > > > >> >> >> >> connections explicit (by setting it to "Coordinator-[ID]"
> > for
> > > > >> >> >> >> example).
> > > > >> >> >> >> What do you think ?
> > > > >> >> >> >>
> > > > >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> > > > >> >> jason@confluent.io>
> > > > >> >> >> >> wrote:
> > > > >> >> >> >> > Good point. The consumer does use a separate connection
> > to
> > > > > the
> > > > >> >> >> >> coordinator,
> > > > >> >> >> >> > so perhaps the connection itself could be tagged for
> > normal
> > > > > heap
> > > > >> >> >> >> allocation?
> > > > >> >> >> >> >
> > > > >> >> >> >> > -Jason
> > > > >> >> >> >> >
> > > > >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> > > > >> >> >> >> onurkaraman.apache@gmail.com
> > > > >> >> >> >> >> wrote:
> > > > >> >> >> >> >
> > > > >> >> >> >> >> I only did a quick scan but I wanted to point out
> what I
> > > > > think
> > > > >> is
> > > > >> >> an
> > > > >> >> >> >> >> incorrect assumption in the KIP's caveats:
> > > > >> >> >> >> >> "
> > > > >> >> >> >> >> There is a risk using the MemoryPool that, after we
> fill
> > > up
> > > > > the
> > > > >> >> >> memory
> > > > >> >> >> >> with
> > > > >> >> >> >> >> fetch data, we can starve the coordinator's connection
> > > > >> >> >> >> >> ...
> > > > >> >> >> >> >> To alleviate this issue, only messages larger than 1Kb
> > > will
> > > > > be
> > > > >> >> >> >> allocated in
> > > > >> >> >> >> >> the MemoryPool. Smaller messages will be allocated
> > > directly
> > > > > on
> > > > >> the
> > > > >> >> >> Heap
> > > > >> >> >> >> >> like before. This allows group/heartbeat messages to
> > avoid
> > > > >> being
> > > > >> >> >> >> delayed if
> > > > >> >> >> >> >> the MemoryPool fills up.
> > > > >> >> >> >> >> "
> > > > >> >> >> >> >>
> > > > >> >> >> >> >> So it sounds like there's an incorrect assumption that
> > > > >> responses
> > > > >> >> from
> > > > >> >> >> >> the
> > > > >> >> >> >> >> coordinator will always be small (< 1Kb as mentioned
> in
> > > the
> > > > >> >> caveat).
> > > > >> >> >> >> There
> > > > >> >> >> >> >> are now a handful of request types between clients and
> > the
> > > > >> >> >> coordinator:
> > > > >> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
> > > OffsetCommit,
> > > > >> >> >> OffsetFetch,
> > > > >> >> >> >> >> ListGroups, DescribeGroups}. While true (at least
> today)
> > > for
> > > > >> >> >> >> >> HeartbeatResponse and a few others, I don't think we
> can
> > > > > assume
> > > > >> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
> > > > > DescribeGroupsResponse,
> > > > >> and
> > > > >> >> >> >> >> OffsetFetchResponse will be small, as they are
> > effectively
> > > > >> >> bounded by
> > > > >> >> >> >> the
> > > > >> >> >> >> >> max message size allowed by the broker for the
> > > > >> __consumer_offsets
> > > > >> >> >> topic
> > > > >> >> >> >> >> which by default is 1MB.
> > > > >> >> >> >> >>
> > > > >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> > > > >> >> >> >> mickael.maison@gmail.com>
> > > > >> >> >> >> >> wrote:
> > > > >> >> >> >> >>
> > > > >> >> >> >> >> > I've updated the KIP to address all the comments
> > raised
> > > > > here
> > > > >> and
> > > > >> >> >> from
> > > > >> >> >> >> >> > the "DISCUSS" thread.
> > > > >> >> >> >> >> > See:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > >> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > > >> >> >> >> >> >
> > > > >> >> >> >> >> > Now, I'd like to restart the vote.
> > > > >> >> >> >> >> >
> > > > >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> > > > >> >> >> >> >> > <ra...@googlemail.com> wrote:
> > > > >> >> >> >> >> > > Hi Mickael,
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but
> > have
> > > a
> > > > >> >> couple of
> > > > >> >> >> >> >> > comments
> > > > >> >> >> >> >> > > (sorry, should have brought them up on the discuss
> > > > > thread
> > > > >> >> >> earlier):
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > 1. Perhaps it would be better to do this after
> > > > > KAFKA-4137
> > > > >> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137
> >
> > is
> > > > >> >> >> implemented?
> > > > >> >> >> >> At
> > > > >> >> >> >> >> > the
> > > > >> >> >> >> >> > > moment, coordinator shares the same NetworkClient
> > (and
> > > > >> hence
> > > > >> >> the
> > > > >> >> >> >> same
> > > > >> >> >> >> >> > > Selector) with consumer connections used for
> > fetching
> > > > >> records.
> > > > >> >> >> Since
> > > > >> >> >> >> >> > > freeing of memory relies on consuming applications
> > > > > invoking
> > > > >> >> >> poll()
> > > > >> >> >> >> >> after
> > > > >> >> >> >> >> > > processing previous records and potentially after
> > > > >> committing
> > > > >> >> >> >> offsets,
> > > > >> >> >> >> >> it
> > > > >> >> >> >> >> > > will be good to ensure that coordinator is not
> > blocked
> > > > > for
> > > > >> >> read
> > > > >> >> >> by
> > > > >> >> >> >> >> fetch
> > > > >> >> >> >> >> > > responses. This may be simpler once coordinator
> has
> > > its
> > > > > own
> > > > >> >> >> >> Selector.
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to
> the
> > > > > user,
> > > > >> >> >> messages
> > > > >> >> >> >> are
> > > > >> >> >> >> >> > > deleted from the MemoryPool so new messages can be
> > > > > stored.*
> > > > >> >> >> >> >> > > Can you expand that a bit? I am assuming that
> > partial
> > > > >> buffers
> > > > >> >> >> never
> > > > >> >> >> >> get
> > > > >> >> >> >> >> > > freed when some messages are returned to the user
> > > since
> > > > > the
> > > > >> >> >> >> consumer is
> > > > >> >> >> >> >> > > still holding a reference to the buffer. Would
> > buffers
> > > > > be
> > > > >> >> freed
> > > > >> >> >> when
> > > > >> >> >> >> >> > > fetches for all the partitions in a response are
> > > parsed,
> > > > >> but
> > > > >> >> >> perhaps
> > > > >> >> >> >> >> not
> > > > >> >> >> >> >> > > yet returned to the user (i.e., is the memory
> freed
> > > when
> > > > > a
> > > > >> >> >> >> reference to
> > > > >> >> >> >> >> > the
> > > > >> >> >> >> >> > > response buffer is no longer required)? It will be
> > > good
> > > > > to
> > > > >> >> >> document
> > > > >> >> >> >> the
> > > > >> >> >> >> >> > > (approximate) maximum memory requirement for the
> > > > >> >> non-compressed
> > > > >> >> >> >> case.
> > > > >> >> >> >> >> > There
> > > > >> >> >> >> >> > > is data read from the socket, cached in the
> Fetcher
> > > and
> > > > > (as
> > > > >> >> Radai
> > > > >> >> >> >> has
> > > > >> >> >> >> >> > > pointed out), the records still with the user
> > > > > application.
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> > > > >> >> >> radai.rosenblatt@gmail.com>
> > > > >> >> >> >> >> > wrote:
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > >> +1 (non-binding).
> > > > >> >> >> >> >> > >>
> > > > >> >> >> >> >> > >> small nit pick - just because you returned a
> > response
> > > > > to
> > > > >> user
> > > > >> >> >> >> doesnt
> > > > >> >> >> >> >> > mean
> > > > >> >> >> >> >> > >> the memory id no longer used. for some cases the
> > > actual
> > > > >> >> "point
> > > > >> >> >> of
> > > > >> >> >> >> >> > >> termination" may be the deserializer (really
> > > > >> impl-dependant),
> > > > >> >> >> but
> > > > >> >> >> >> >> > >> generally, wouldnt it be "nice" to have an
> explicit
> > > > >> dispose()
> > > > >> >> >> call
> > > > >> >> >> >> on
> > > > >> >> >> >> >> > >> responses (with the addition that getting the
> next
> > > > > batch
> > > > >> of
> > > > >> >> data
> > > > >> >> >> >> from
> > > > >> >> >> >> >> a
> > > > >> >> >> >> >> > >> consumer automatically disposes the previous
> > results)
> > > > >> >> >> >> >> > >>
> > > > >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> > > > >> >> >> ECOMAR@uk.ibm.com>
> > > > >> >> >> >> >> > wrote:
> > > > >> >> >> >> >> > >>
> > > > >> >> >> >> >> > >> > +1 (non binding)
> > > > >> >> >> >> >> > >> > ------------------------------
> > --------------------
> > > > >> >> >> >> >> > >> > Edoardo Comar
> > > > >> >> >> >> >> > >> > IBM MessageHub
> > > > >> >> >> >> >> > >> > ecomar@uk.ibm.com
> > > > >> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > IBM United Kingdom Limited Registered in
> England
> > > and
> > > > >> Wales
> > > > >> >> >> with
> > > > >> >> >> >> >> number
> > > > >> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North
> > Harbour,
> > > > >> >> >> Portsmouth,
> > > > >> >> >> >> >> Hants.
> > > > >> >> >> >> >> > >> PO6
> > > > >> >> >> >> >> > >> > 3AU
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > From:   Mickael Maison <
> mickael.maison@gmail.com
> > >
> > > > >> >> >> >> >> > >> > To:     dev@kafka.apache.org
> > > > >> >> >> >> >> > >> > Date:   05/12/2016 14:38
> > > > >> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch
> memory
> > > > > usage
> > > > >> in
> > > > >> >> the
> > > > >> >> >> >> >> > consumer
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > Hi all,
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> > > > >> >> >> >> >> > >> >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > >> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > Thank you
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > Unless stated otherwise above:
> > > > >> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in
> > England
> > > > > and
> > > > >> >> Wales
> > > > >> >> >> with
> > > > >> >> >> >> >> > number
> > > > >> >> >> >> >> > >> > 741598.
> > > > >> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
> > > > > Portsmouth,
> > > > >> >> >> >> Hampshire
> > > > >> >> >> >> >> PO6
> > > > >> >> >> >> >> > >> 3AU
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >>
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > --
> > > > >> >> >> >> >> > > Regards,
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > Rajini
> > > > >> >> >> >> >> >
> > > > >> >> >> >> >>
> > > > >> >> >> >>
> > > > >> >> >>
> > > > >> >>
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >>
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > > >
> > > > >
> > > > > Unless stated otherwise above:
> > > > > IBM United Kingdom Limited - Registered in England and Wales with
> > > number
> > > > > 741598.
> > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> > PO6
> > > > 3AU
> > > >
> > >
> >
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by radai <ra...@gmail.com>.
possible priorities:

1. keepalives/coordination
2. inter-broker-traffic
3. produce traffic
4. consume traffic

(dont want to start a debate, just to illustrate there may be >2 of them so
int is better than bool)

On Fri, Mar 31, 2017 at 9:10 AM, Ismael Juma <is...@juma.me.uk> wrote:

> +1 from me too, thanks for the KIP.
>
> Ismael
>
> On Fri, Mar 31, 2017 at 5:06 PM, Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Mickael,
> >
> > Thanks for the KIP. +1 from me too.
> >
> > Jun
> >
> > On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <
> mickael.maison@gmail.com>
> > wrote:
> >
> > > Thanks for the suggestion.
> > >
> > > Currently, I can't think of a scenario when we would need multiple
> > > priority "levels". If in the future it makes sense to have some, I
> > > think we could just make the change without a new KIP as these APIs
> > > are not public.
> > > So I'd be more inclined to keep the boolean for now.
> > >
> > > On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <EC...@uk.ibm.com>
> > wrote:
> > > > Hi Mickael,
> > > > as discussed we could change the priority parameter to be an int
> rather
> > > > than a boolean.
> > > >
> > > > That's a bit more extensible
> > > > --------------------------------------------------
> > > > Edoardo Comar
> > > > IBM MessageHub
> > > > ecomar@uk.ibm.com
> > > > IBM UK Ltd, Hursley Park, SO21 2JN
> > > >
> > > > IBM United Kingdom Limited Registered in England and Wales with
> number
> > > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> Hants.
> > > PO6
> > > > 3AU
> > > >
> > > >
> > > >
> > > > From:   Guozhang Wang <wa...@gmail.com>
> > > > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > > Date:   28/03/2017 19:02
> > > > Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the
> > > > consumer
> > > >
> > > >
> > > >
> > > > 1) Makes sense.
> > > > 2) Makes sense. Thanks!
> > > >
> > > > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
> > > > <mi...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Guozhang,
> > > >>
> > > >> Thanks for the feedback.
> > > >>
> > > >> 1) By MemoryPool, I mean the implementation added in KIP-72. That
> will
> > > >> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> > > >> merged yet.
> > > >> I've updated the KIP to make it more obvious.
> > > >>
> > > >> 2) I was thinking to pass in the priority when creating the
> > > >> Coordinator Node (in
> > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > > >> main/java/org/apache/kafka/clients/consumer/internals/
> > > >> AbstractCoordinator.java#L582)
> > > >> Then when calling Selector.connect() (in
> > > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > > >> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> > > >> retrieve it and pass it in the Selector so it uses it when building
> > > >> the Channel.
> > > >> The idea was to avoid having to deduce the connection is for the
> > > >> Coordinator from the ID but instead have it explicitly set by
> > > >> AbstractCoordinator (and pass it all the way down to the Channel)
> > > >>
> > > >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > >> > Mickael,
> > > >> >
> > > >> > Sorry for the late review of the KIP. I'm +1 on the proposed
> change
> > as
> > > >> > well. Just a few minor comments on the wiki itself:
> > > >> >
> > > >> > 1. By the "MemoryPool" are you referring to a new class impl or to
> > > >> reusing "
> > > >> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume
> > it
> > > > was
> > > >> > the latter case, and if yes, could you update the wiki page to
> make
> > it
> > > >> > clear?
> > > >> >
> > > >> > 2. I think it is sufficient to add the priority to KafkaChannel
> > class,
> > > >> but
> > > >> > not needed in Node (but one may need to add this parameter to
> > > > Selector#
> > > >> > connect). Could you point me to which usage of Node needs to
> access
> > > > the
> > > >> > priority?
> > > >> >
> > > >> >
> > > >> > Guozhang
> > > >> >
> > > >> >
> > > >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> > > >> mickael.maison@gmail.com>
> > > >> > wrote:
> > > >> >
> > > >> >> Thanks Jason for the feedback! Yes it makes sense to always use
> the
> > > >> >> MemoryPool is we can. I've updated the KIP with the suggestion
> > > >> >>
> > > >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <
> > jason@confluent.io
> > > >
> > > >> >> wrote:
> > > >> >> > Just a minor comment. The KIP suggests that coordinator
> responses
> > > > are
> > > >> >> > always allocated outside of the memory pool, but maybe we can
> > > > reserve
> > > >> >> that
> > > >> >> > capability for only when the pool does not have enough space?
> It
> > > >> seems a
> > > >> >> > little nicer to use the pool if we can. If that seems
> reasonable,
> > > > I'm
> > > >> +1
> > > >> >> on
> > > >> >> > the KIP. Thanks for the effort!
> > > >> >> >
> > > >> >> > -Jason
> > > >> >> >
> > > >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> > > >> >> mickael.maison@gmail.com>
> > > >> >> > wrote:
> > > >> >> >
> > > >> >> >> Yes I agree, having a generic flag is more future proof.
> > > >> >> >> I'll update the KIP in the coming days.
> > > >> >> >>
> > > >> >> >> Thanks
> > > >> >> >>
> > > >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
> > > > <jason@confluent.io
> > > >> >
> > > >> >> >> wrote:
> > > >> >> >> > Hey Mickael,
> > > >> >> >> >
> > > >> >> >> > The suggestion to add something to Node makes sense. I could
> > > >> imagine
> > > >> >> for
> > > >> >> >> > example adding a flag to indicate that the connection has a
> > > > higher
> > > >> >> >> > "priority," meaning that we can allocate outside of the
> memory
> > > >> pool if
> > > >> >> >> > necessary. That would still be generic even if the only use
> > case
> > > > is
> > > >> >> the
> > > >> >> >> > consumer coordinator. We might also face a similar problem
> > when
> > > > the
> > > >> >> >> > producer is sending requests to the transaction coordinator
> > for
> > > >> >> KIP-98.
> > > >> >> >> > What do you think?
> > > >> >> >> >
> > > >> >> >> > Thanks,
> > > >> >> >> > Jason
> > > >> >> >> >
> > > >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> > > >> >> >> mickael.maison@gmail.com>
> > > >> >> >> > wrote:
> > > >> >> >> >
> > > >> >> >> >> Apologies for the late response.
> > > >> >> >> >>
> > > >> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> > > >> Coordinator
> > > >> >> >> >> connection is "tagged" with a different id, so we could
> > > > retrieve
> > > >> it
> > > >> >> in
> > > >> >> >> >> NetworkReceive to make the distinction.
> > > >> >> >> >> However, currently the coordinator connection are made
> > > > different
> > > >> by
> > > >> >> >> using:
> > > >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> > > >> >> >> >> for the Node id.
> > > >> >> >> >>
> > > >> >> >> >> So to identify Coordinator connections, we'd have to check
> > that
> > > >> the
> > > >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE
> which
> > > > is a
> > > >> >> bit
> > > >> >> >> >> hacky ...
> > > >> >> >> >>
> > > >> >> >> >> Maybe we could add a constructor to Node that allows to
> pass
> > in
> > > > a
> > > >> >> >> >> sourceId String. That way we could make all the coordinator
> > > >> >> >> >> connections explicit (by setting it to "Coordinator-[ID]"
> for
> > > >> >> >> >> example).
> > > >> >> >> >> What do you think ?
> > > >> >> >> >>
> > > >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> > > >> >> jason@confluent.io>
> > > >> >> >> >> wrote:
> > > >> >> >> >> > Good point. The consumer does use a separate connection
> to
> > > > the
> > > >> >> >> >> coordinator,
> > > >> >> >> >> > so perhaps the connection itself could be tagged for
> normal
> > > > heap
> > > >> >> >> >> allocation?
> > > >> >> >> >> >
> > > >> >> >> >> > -Jason
> > > >> >> >> >> >
> > > >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> > > >> >> >> >> onurkaraman.apache@gmail.com
> > > >> >> >> >> >> wrote:
> > > >> >> >> >> >
> > > >> >> >> >> >> I only did a quick scan but I wanted to point out what I
> > > > think
> > > >> is
> > > >> >> an
> > > >> >> >> >> >> incorrect assumption in the KIP's caveats:
> > > >> >> >> >> >> "
> > > >> >> >> >> >> There is a risk using the MemoryPool that, after we fill
> > up
> > > > the
> > > >> >> >> memory
> > > >> >> >> >> with
> > > >> >> >> >> >> fetch data, we can starve the coordinator's connection
> > > >> >> >> >> >> ...
> > > >> >> >> >> >> To alleviate this issue, only messages larger than 1Kb
> > will
> > > > be
> > > >> >> >> >> allocated in
> > > >> >> >> >> >> the MemoryPool. Smaller messages will be allocated
> > directly
> > > > on
> > > >> the
> > > >> >> >> Heap
> > > >> >> >> >> >> like before. This allows group/heartbeat messages to
> avoid
> > > >> being
> > > >> >> >> >> delayed if
> > > >> >> >> >> >> the MemoryPool fills up.
> > > >> >> >> >> >> "
> > > >> >> >> >> >>
> > > >> >> >> >> >> So it sounds like there's an incorrect assumption that
> > > >> responses
> > > >> >> from
> > > >> >> >> >> the
> > > >> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in
> > the
> > > >> >> caveat).
> > > >> >> >> >> There
> > > >> >> >> >> >> are now a handful of request types between clients and
> the
> > > >> >> >> coordinator:
> > > >> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
> > OffsetCommit,
> > > >> >> >> OffsetFetch,
> > > >> >> >> >> >> ListGroups, DescribeGroups}. While true (at least today)
> > for
> > > >> >> >> >> >> HeartbeatResponse and a few others, I don't think we can
> > > > assume
> > > >> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
> > > > DescribeGroupsResponse,
> > > >> and
> > > >> >> >> >> >> OffsetFetchResponse will be small, as they are
> effectively
> > > >> >> bounded by
> > > >> >> >> >> the
> > > >> >> >> >> >> max message size allowed by the broker for the
> > > >> __consumer_offsets
> > > >> >> >> topic
> > > >> >> >> >> >> which by default is 1MB.
> > > >> >> >> >> >>
> > > >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> > > >> >> >> >> mickael.maison@gmail.com>
> > > >> >> >> >> >> wrote:
> > > >> >> >> >> >>
> > > >> >> >> >> >> > I've updated the KIP to address all the comments
> raised
> > > > here
> > > >> and
> > > >> >> >> from
> > > >> >> >> >> >> > the "DISCUSS" thread.
> > > >> >> >> >> >> > See:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > >> >> >> >> >> >
> > > >> >> >> >> >> > Now, I'd like to restart the vote.
> > > >> >> >> >> >> >
> > > >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> > > >> >> >> >> >> > <ra...@googlemail.com> wrote:
> > > >> >> >> >> >> > > Hi Mickael,
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but
> have
> > a
> > > >> >> couple of
> > > >> >> >> >> >> > comments
> > > >> >> >> >> >> > > (sorry, should have brought them up on the discuss
> > > > thread
> > > >> >> >> earlier):
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > > 1. Perhaps it would be better to do this after
> > > > KAFKA-4137
> > > >> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137>
> is
> > > >> >> >> implemented?
> > > >> >> >> >> At
> > > >> >> >> >> >> > the
> > > >> >> >> >> >> > > moment, coordinator shares the same NetworkClient
> (and
> > > >> hence
> > > >> >> the
> > > >> >> >> >> same
> > > >> >> >> >> >> > > Selector) with consumer connections used for
> fetching
> > > >> records.
> > > >> >> >> Since
> > > >> >> >> >> >> > > freeing of memory relies on consuming applications
> > > > invoking
> > > >> >> >> poll()
> > > >> >> >> >> >> after
> > > >> >> >> >> >> > > processing previous records and potentially after
> > > >> committing
> > > >> >> >> >> offsets,
> > > >> >> >> >> >> it
> > > >> >> >> >> >> > > will be good to ensure that coordinator is not
> blocked
> > > > for
> > > >> >> read
> > > >> >> >> by
> > > >> >> >> >> >> fetch
> > > >> >> >> >> >> > > responses. This may be simpler once coordinator has
> > its
> > > > own
> > > >> >> >> >> Selector.
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the
> > > > user,
> > > >> >> >> messages
> > > >> >> >> >> are
> > > >> >> >> >> >> > > deleted from the MemoryPool so new messages can be
> > > > stored.*
> > > >> >> >> >> >> > > Can you expand that a bit? I am assuming that
> partial
> > > >> buffers
> > > >> >> >> never
> > > >> >> >> >> get
> > > >> >> >> >> >> > > freed when some messages are returned to the user
> > since
> > > > the
> > > >> >> >> >> consumer is
> > > >> >> >> >> >> > > still holding a reference to the buffer. Would
> buffers
> > > > be
> > > >> >> freed
> > > >> >> >> when
> > > >> >> >> >> >> > > fetches for all the partitions in a response are
> > parsed,
> > > >> but
> > > >> >> >> perhaps
> > > >> >> >> >> >> not
> > > >> >> >> >> >> > > yet returned to the user (i.e., is the memory freed
> > when
> > > > a
> > > >> >> >> >> reference to
> > > >> >> >> >> >> > the
> > > >> >> >> >> >> > > response buffer is no longer required)? It will be
> > good
> > > > to
> > > >> >> >> document
> > > >> >> >> >> the
> > > >> >> >> >> >> > > (approximate) maximum memory requirement for the
> > > >> >> non-compressed
> > > >> >> >> >> case.
> > > >> >> >> >> >> > There
> > > >> >> >> >> >> > > is data read from the socket, cached in the Fetcher
> > and
> > > > (as
> > > >> >> Radai
> > > >> >> >> >> has
> > > >> >> >> >> >> > > pointed out), the records still with the user
> > > > application.
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> > > >> >> >> radai.rosenblatt@gmail.com>
> > > >> >> >> >> >> > wrote:
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > >> +1 (non-binding).
> > > >> >> >> >> >> > >>
> > > >> >> >> >> >> > >> small nit pick - just because you returned a
> response
> > > > to
> > > >> user
> > > >> >> >> >> doesnt
> > > >> >> >> >> >> > mean
> > > >> >> >> >> >> > >> the memory id no longer used. for some cases the
> > actual
> > > >> >> "point
> > > >> >> >> of
> > > >> >> >> >> >> > >> termination" may be the deserializer (really
> > > >> impl-dependant),
> > > >> >> >> but
> > > >> >> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
> > > >> dispose()
> > > >> >> >> call
> > > >> >> >> >> on
> > > >> >> >> >> >> > >> responses (with the addition that getting the next
> > > > batch
> > > >> of
> > > >> >> data
> > > >> >> >> >> from
> > > >> >> >> >> >> a
> > > >> >> >> >> >> > >> consumer automatically disposes the previous
> results)
> > > >> >> >> >> >> > >>
> > > >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> > > >> >> >> ECOMAR@uk.ibm.com>
> > > >> >> >> >> >> > wrote:
> > > >> >> >> >> >> > >>
> > > >> >> >> >> >> > >> > +1 (non binding)
> > > >> >> >> >> >> > >> > ------------------------------
> --------------------
> > > >> >> >> >> >> > >> > Edoardo Comar
> > > >> >> >> >> >> > >> > IBM MessageHub
> > > >> >> >> >> >> > >> > ecomar@uk.ibm.com
> > > >> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England
> > and
> > > >> Wales
> > > >> >> >> with
> > > >> >> >> >> >> number
> > > >> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North
> Harbour,
> > > >> >> >> Portsmouth,
> > > >> >> >> >> >> Hants.
> > > >> >> >> >> >> > >> PO6
> > > >> >> >> >> >> > >> > 3AU
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> > From:   Mickael Maison <mickael.maison@gmail.com
> >
> > > >> >> >> >> >> > >> > To:     dev@kafka.apache.org
> > > >> >> >> >> >> > >> > Date:   05/12/2016 14:38
> > > >> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory
> > > > usage
> > > >> in
> > > >> >> the
> > > >> >> >> >> >> > consumer
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> > Hi all,
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> > > >> >> >> >> >> > >> >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> > Thank you
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >> > Unless stated otherwise above:
> > > >> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in
> England
> > > > and
> > > >> >> Wales
> > > >> >> >> with
> > > >> >> >> >> >> > number
> > > >> >> >> >> >> > >> > 741598.
> > > >> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
> > > > Portsmouth,
> > > >> >> >> >> Hampshire
> > > >> >> >> >> >> PO6
> > > >> >> >> >> >> > >> 3AU
> > > >> >> >> >> >> > >> >
> > > >> >> >> >> >> > >>
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > > --
> > > >> >> >> >> >> > > Regards,
> > > >> >> >> >> >> > >
> > > >> >> >> >> >> > > Rajini
> > > >> >> >> >> >> >
> > > >> >> >> >> >>
> > > >> >> >> >>
> > > >> >> >>
> > > >> >>
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > > >
> > > >
> > > > Unless stated otherwise above:
> > > > IBM United Kingdom Limited - Registered in England and Wales with
> > number
> > > > 741598.
> > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6
> > > 3AU
> > >
> >
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Ismael Juma <is...@juma.me.uk>.
+1 from me too, thanks for the KIP.

Ismael

On Fri, Mar 31, 2017 at 5:06 PM, Jun Rao <ju...@confluent.io> wrote:

> Hi, Mickael,
>
> Thanks for the KIP. +1 from me too.
>
> Jun
>
> On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <mi...@gmail.com>
> wrote:
>
> > Thanks for the suggestion.
> >
> > Currently, I can't think of a scenario when we would need multiple
> > priority "levels". If in the future it makes sense to have some, I
> > think we could just make the change without a new KIP as these APIs
> > are not public.
> > So I'd be more inclined to keep the boolean for now.
> >
> > On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <EC...@uk.ibm.com>
> wrote:
> > > Hi Mickael,
> > > as discussed we could change the priority parameter to be an int rather
> > > than a boolean.
> > >
> > > That's a bit more extensible
> > > --------------------------------------------------
> > > Edoardo Comar
> > > IBM MessageHub
> > > ecomar@uk.ibm.com
> > > IBM UK Ltd, Hursley Park, SO21 2JN
> > >
> > > IBM United Kingdom Limited Registered in England and Wales with number
> > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> > PO6
> > > 3AU
> > >
> > >
> > >
> > > From:   Guozhang Wang <wa...@gmail.com>
> > > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > > Date:   28/03/2017 19:02
> > > Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the
> > > consumer
> > >
> > >
> > >
> > > 1) Makes sense.
> > > 2) Makes sense. Thanks!
> > >
> > > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
> > > <mi...@gmail.com>
> > > wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> Thanks for the feedback.
> > >>
> > >> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> > >> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> > >> merged yet.
> > >> I've updated the KIP to make it more obvious.
> > >>
> > >> 2) I was thinking to pass in the priority when creating the
> > >> Coordinator Node (in
> > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > >> main/java/org/apache/kafka/clients/consumer/internals/
> > >> AbstractCoordinator.java#L582)
> > >> Then when calling Selector.connect() (in
> > >> https://github.com/apache/kafka/blob/trunk/clients/src/
> > >> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> > >> retrieve it and pass it in the Selector so it uses it when building
> > >> the Channel.
> > >> The idea was to avoid having to deduce the connection is for the
> > >> Coordinator from the ID but instead have it explicitly set by
> > >> AbstractCoordinator (and pass it all the way down to the Channel)
> > >>
> > >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >> > Mickael,
> > >> >
> > >> > Sorry for the late review of the KIP. I'm +1 on the proposed change
> as
> > >> > well. Just a few minor comments on the wiki itself:
> > >> >
> > >> > 1. By the "MemoryPool" are you referring to a new class impl or to
> > >> reusing "
> > >> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume
> it
> > > was
> > >> > the latter case, and if yes, could you update the wiki page to make
> it
> > >> > clear?
> > >> >
> > >> > 2. I think it is sufficient to add the priority to KafkaChannel
> class,
> > >> but
> > >> > not needed in Node (but one may need to add this parameter to
> > > Selector#
> > >> > connect). Could you point me to which usage of Node needs to access
> > > the
> > >> > priority?
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> >
> > >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> > >> mickael.maison@gmail.com>
> > >> > wrote:
> > >> >
> > >> >> Thanks Jason for the feedback! Yes it makes sense to always use the
> > >> >> MemoryPool is we can. I've updated the KIP with the suggestion
> > >> >>
> > >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <
> jason@confluent.io
> > >
> > >> >> wrote:
> > >> >> > Just a minor comment. The KIP suggests that coordinator responses
> > > are
> > >> >> > always allocated outside of the memory pool, but maybe we can
> > > reserve
> > >> >> that
> > >> >> > capability for only when the pool does not have enough space? It
> > >> seems a
> > >> >> > little nicer to use the pool if we can. If that seems reasonable,
> > > I'm
> > >> +1
> > >> >> on
> > >> >> > the KIP. Thanks for the effort!
> > >> >> >
> > >> >> > -Jason
> > >> >> >
> > >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> > >> >> mickael.maison@gmail.com>
> > >> >> > wrote:
> > >> >> >
> > >> >> >> Yes I agree, having a generic flag is more future proof.
> > >> >> >> I'll update the KIP in the coming days.
> > >> >> >>
> > >> >> >> Thanks
> > >> >> >>
> > >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
> > > <jason@confluent.io
> > >> >
> > >> >> >> wrote:
> > >> >> >> > Hey Mickael,
> > >> >> >> >
> > >> >> >> > The suggestion to add something to Node makes sense. I could
> > >> imagine
> > >> >> for
> > >> >> >> > example adding a flag to indicate that the connection has a
> > > higher
> > >> >> >> > "priority," meaning that we can allocate outside of the memory
> > >> pool if
> > >> >> >> > necessary. That would still be generic even if the only use
> case
> > > is
> > >> >> the
> > >> >> >> > consumer coordinator. We might also face a similar problem
> when
> > > the
> > >> >> >> > producer is sending requests to the transaction coordinator
> for
> > >> >> KIP-98.
> > >> >> >> > What do you think?
> > >> >> >> >
> > >> >> >> > Thanks,
> > >> >> >> > Jason
> > >> >> >> >
> > >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> > >> >> >> mickael.maison@gmail.com>
> > >> >> >> > wrote:
> > >> >> >> >
> > >> >> >> >> Apologies for the late response.
> > >> >> >> >>
> > >> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> > >> Coordinator
> > >> >> >> >> connection is "tagged" with a different id, so we could
> > > retrieve
> > >> it
> > >> >> in
> > >> >> >> >> NetworkReceive to make the distinction.
> > >> >> >> >> However, currently the coordinator connection are made
> > > different
> > >> by
> > >> >> >> using:
> > >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> > >> >> >> >> for the Node id.
> > >> >> >> >>
> > >> >> >> >> So to identify Coordinator connections, we'd have to check
> that
> > >> the
> > >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which
> > > is a
> > >> >> bit
> > >> >> >> >> hacky ...
> > >> >> >> >>
> > >> >> >> >> Maybe we could add a constructor to Node that allows to pass
> in
> > > a
> > >> >> >> >> sourceId String. That way we could make all the coordinator
> > >> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> > >> >> >> >> example).
> > >> >> >> >> What do you think ?
> > >> >> >> >>
> > >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> > >> >> jason@confluent.io>
> > >> >> >> >> wrote:
> > >> >> >> >> > Good point. The consumer does use a separate connection to
> > > the
> > >> >> >> >> coordinator,
> > >> >> >> >> > so perhaps the connection itself could be tagged for normal
> > > heap
> > >> >> >> >> allocation?
> > >> >> >> >> >
> > >> >> >> >> > -Jason
> > >> >> >> >> >
> > >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> > >> >> >> >> onurkaraman.apache@gmail.com
> > >> >> >> >> >> wrote:
> > >> >> >> >> >
> > >> >> >> >> >> I only did a quick scan but I wanted to point out what I
> > > think
> > >> is
> > >> >> an
> > >> >> >> >> >> incorrect assumption in the KIP's caveats:
> > >> >> >> >> >> "
> > >> >> >> >> >> There is a risk using the MemoryPool that, after we fill
> up
> > > the
> > >> >> >> memory
> > >> >> >> >> with
> > >> >> >> >> >> fetch data, we can starve the coordinator's connection
> > >> >> >> >> >> ...
> > >> >> >> >> >> To alleviate this issue, only messages larger than 1Kb
> will
> > > be
> > >> >> >> >> allocated in
> > >> >> >> >> >> the MemoryPool. Smaller messages will be allocated
> directly
> > > on
> > >> the
> > >> >> >> Heap
> > >> >> >> >> >> like before. This allows group/heartbeat messages to avoid
> > >> being
> > >> >> >> >> delayed if
> > >> >> >> >> >> the MemoryPool fills up.
> > >> >> >> >> >> "
> > >> >> >> >> >>
> > >> >> >> >> >> So it sounds like there's an incorrect assumption that
> > >> responses
> > >> >> from
> > >> >> >> >> the
> > >> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in
> the
> > >> >> caveat).
> > >> >> >> >> There
> > >> >> >> >> >> are now a handful of request types between clients and the
> > >> >> >> coordinator:
> > >> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
> OffsetCommit,
> > >> >> >> OffsetFetch,
> > >> >> >> >> >> ListGroups, DescribeGroups}. While true (at least today)
> for
> > >> >> >> >> >> HeartbeatResponse and a few others, I don't think we can
> > > assume
> > >> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
> > > DescribeGroupsResponse,
> > >> and
> > >> >> >> >> >> OffsetFetchResponse will be small, as they are effectively
> > >> >> bounded by
> > >> >> >> >> the
> > >> >> >> >> >> max message size allowed by the broker for the
> > >> __consumer_offsets
> > >> >> >> topic
> > >> >> >> >> >> which by default is 1MB.
> > >> >> >> >> >>
> > >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> > >> >> >> >> mickael.maison@gmail.com>
> > >> >> >> >> >> wrote:
> > >> >> >> >> >>
> > >> >> >> >> >> > I've updated the KIP to address all the comments raised
> > > here
> > >> and
> > >> >> >> from
> > >> >> >> >> >> > the "DISCUSS" thread.
> > >> >> >> >> >> > See:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > >> >> >> >> >> >
> > >> >> >> >> >> > Now, I'd like to restart the vote.
> > >> >> >> >> >> >
> > >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> > >> >> >> >> >> > <ra...@googlemail.com> wrote:
> > >> >> >> >> >> > > Hi Mickael,
> > >> >> >> >> >> > >
> > >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have
> a
> > >> >> couple of
> > >> >> >> >> >> > comments
> > >> >> >> >> >> > > (sorry, should have brought them up on the discuss
> > > thread
> > >> >> >> earlier):
> > >> >> >> >> >> > >
> > >> >> >> >> >> > > 1. Perhaps it would be better to do this after
> > > KAFKA-4137
> > >> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
> > >> >> >> implemented?
> > >> >> >> >> At
> > >> >> >> >> >> > the
> > >> >> >> >> >> > > moment, coordinator shares the same NetworkClient (and
> > >> hence
> > >> >> the
> > >> >> >> >> same
> > >> >> >> >> >> > > Selector) with consumer connections used for fetching
> > >> records.
> > >> >> >> Since
> > >> >> >> >> >> > > freeing of memory relies on consuming applications
> > > invoking
> > >> >> >> poll()
> > >> >> >> >> >> after
> > >> >> >> >> >> > > processing previous records and potentially after
> > >> committing
> > >> >> >> >> offsets,
> > >> >> >> >> >> it
> > >> >> >> >> >> > > will be good to ensure that coordinator is not blocked
> > > for
> > >> >> read
> > >> >> >> by
> > >> >> >> >> >> fetch
> > >> >> >> >> >> > > responses. This may be simpler once coordinator has
> its
> > > own
> > >> >> >> >> Selector.
> > >> >> >> >> >> > >
> > >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the
> > > user,
> > >> >> >> messages
> > >> >> >> >> are
> > >> >> >> >> >> > > deleted from the MemoryPool so new messages can be
> > > stored.*
> > >> >> >> >> >> > > Can you expand that a bit? I am assuming that partial
> > >> buffers
> > >> >> >> never
> > >> >> >> >> get
> > >> >> >> >> >> > > freed when some messages are returned to the user
> since
> > > the
> > >> >> >> >> consumer is
> > >> >> >> >> >> > > still holding a reference to the buffer. Would buffers
> > > be
> > >> >> freed
> > >> >> >> when
> > >> >> >> >> >> > > fetches for all the partitions in a response are
> parsed,
> > >> but
> > >> >> >> perhaps
> > >> >> >> >> >> not
> > >> >> >> >> >> > > yet returned to the user (i.e., is the memory freed
> when
> > > a
> > >> >> >> >> reference to
> > >> >> >> >> >> > the
> > >> >> >> >> >> > > response buffer is no longer required)? It will be
> good
> > > to
> > >> >> >> document
> > >> >> >> >> the
> > >> >> >> >> >> > > (approximate) maximum memory requirement for the
> > >> >> non-compressed
> > >> >> >> >> case.
> > >> >> >> >> >> > There
> > >> >> >> >> >> > > is data read from the socket, cached in the Fetcher
> and
> > > (as
> > >> >> Radai
> > >> >> >> >> has
> > >> >> >> >> >> > > pointed out), the records still with the user
> > > application.
> > >> >> >> >> >> > >
> > >> >> >> >> >> > >
> > >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> > >> >> >> radai.rosenblatt@gmail.com>
> > >> >> >> >> >> > wrote:
> > >> >> >> >> >> > >
> > >> >> >> >> >> > >> +1 (non-binding).
> > >> >> >> >> >> > >>
> > >> >> >> >> >> > >> small nit pick - just because you returned a response
> > > to
> > >> user
> > >> >> >> >> doesnt
> > >> >> >> >> >> > mean
> > >> >> >> >> >> > >> the memory id no longer used. for some cases the
> actual
> > >> >> "point
> > >> >> >> of
> > >> >> >> >> >> > >> termination" may be the deserializer (really
> > >> impl-dependant),
> > >> >> >> but
> > >> >> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
> > >> dispose()
> > >> >> >> call
> > >> >> >> >> on
> > >> >> >> >> >> > >> responses (with the addition that getting the next
> > > batch
> > >> of
> > >> >> data
> > >> >> >> >> from
> > >> >> >> >> >> a
> > >> >> >> >> >> > >> consumer automatically disposes the previous results)
> > >> >> >> >> >> > >>
> > >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> > >> >> >> ECOMAR@uk.ibm.com>
> > >> >> >> >> >> > wrote:
> > >> >> >> >> >> > >>
> > >> >> >> >> >> > >> > +1 (non binding)
> > >> >> >> >> >> > >> > --------------------------------------------------
> > >> >> >> >> >> > >> > Edoardo Comar
> > >> >> >> >> >> > >> > IBM MessageHub
> > >> >> >> >> >> > >> > ecomar@uk.ibm.com
> > >> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England
> and
> > >> Wales
> > >> >> >> with
> > >> >> >> >> >> number
> > >> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
> > >> >> >> Portsmouth,
> > >> >> >> >> >> Hants.
> > >> >> >> >> >> > >> PO6
> > >> >> >> >> >> > >> > 3AU
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
> > >> >> >> >> >> > >> > To:     dev@kafka.apache.org
> > >> >> >> >> >> > >> > Date:   05/12/2016 14:38
> > >> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory
> > > usage
> > >> in
> > >> >> the
> > >> >> >> >> >> > consumer
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> > Hi all,
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> > >> >> >> >> >> > >> >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> > Thank you
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >> > Unless stated otherwise above:
> > >> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in England
> > > and
> > >> >> Wales
> > >> >> >> with
> > >> >> >> >> >> > number
> > >> >> >> >> >> > >> > 741598.
> > >> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
> > > Portsmouth,
> > >> >> >> >> Hampshire
> > >> >> >> >> >> PO6
> > >> >> >> >> >> > >> 3AU
> > >> >> >> >> >> > >> >
> > >> >> >> >> >> > >>
> > >> >> >> >> >> > >
> > >> >> >> >> >> > >
> > >> >> >> >> >> > >
> > >> >> >> >> >> > > --
> > >> >> >> >> >> > > Regards,
> > >> >> >> >> >> > >
> > >> >> >> >> >> > > Rajini
> > >> >> >> >> >> >
> > >> >> >> >> >>
> > >> >> >> >>
> > >> >> >>
> > >> >>
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> > >
> > >
> > > Unless stated otherwise above:
> > > IBM United Kingdom Limited - Registered in England and Wales with
> number
> > > 741598.
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> > 3AU
> >
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Jun Rao <ju...@confluent.io>.
Hi, Mickael,

Thanks for the KIP. +1 from me too.

Jun

On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <mi...@gmail.com>
wrote:

> Thanks for the suggestion.
>
> Currently, I can't think of a scenario when we would need multiple
> priority "levels". If in the future it makes sense to have some, I
> think we could just make the change without a new KIP as these APIs
> are not public.
> So I'd be more inclined to keep the boolean for now.
>
> On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <EC...@uk.ibm.com> wrote:
> > Hi Mickael,
> > as discussed we could change the priority parameter to be an int rather
> > than a boolean.
> >
> > That's a bit more extensible
> > --------------------------------------------------
> > Edoardo Comar
> > IBM MessageHub
> > ecomar@uk.ibm.com
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> > IBM United Kingdom Limited Registered in England and Wales with number
> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> > 3AU
> >
> >
> >
> > From:   Guozhang Wang <wa...@gmail.com>
> > To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> > Date:   28/03/2017 19:02
> > Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the
> > consumer
> >
> >
> >
> > 1) Makes sense.
> > 2) Makes sense. Thanks!
> >
> > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
> > <mi...@gmail.com>
> > wrote:
> >
> >> Hi Guozhang,
> >>
> >> Thanks for the feedback.
> >>
> >> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> >> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> >> merged yet.
> >> I've updated the KIP to make it more obvious.
> >>
> >> 2) I was thinking to pass in the priority when creating the
> >> Coordinator Node (in
> >> https://github.com/apache/kafka/blob/trunk/clients/src/
> >> main/java/org/apache/kafka/clients/consumer/internals/
> >> AbstractCoordinator.java#L582)
> >> Then when calling Selector.connect() (in
> >> https://github.com/apache/kafka/blob/trunk/clients/src/
> >> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> >> retrieve it and pass it in the Selector so it uses it when building
> >> the Channel.
> >> The idea was to avoid having to deduce the connection is for the
> >> Coordinator from the ID but instead have it explicitly set by
> >> AbstractCoordinator (and pass it all the way down to the Channel)
> >>
> >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >> > Mickael,
> >> >
> >> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
> >> > well. Just a few minor comments on the wiki itself:
> >> >
> >> > 1. By the "MemoryPool" are you referring to a new class impl or to
> >> reusing "
> >> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume it
> > was
> >> > the latter case, and if yes, could you update the wiki page to make it
> >> > clear?
> >> >
> >> > 2. I think it is sufficient to add the priority to KafkaChannel class,
> >> but
> >> > not needed in Node (but one may need to add this parameter to
> > Selector#
> >> > connect). Could you point me to which usage of Node needs to access
> > the
> >> > priority?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> >> mickael.maison@gmail.com>
> >> > wrote:
> >> >
> >> >> Thanks Jason for the feedback! Yes it makes sense to always use the
> >> >> MemoryPool is we can. I've updated the KIP with the suggestion
> >> >>
> >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <jason@confluent.io
> >
> >> >> wrote:
> >> >> > Just a minor comment. The KIP suggests that coordinator responses
> > are
> >> >> > always allocated outside of the memory pool, but maybe we can
> > reserve
> >> >> that
> >> >> > capability for only when the pool does not have enough space? It
> >> seems a
> >> >> > little nicer to use the pool if we can. If that seems reasonable,
> > I'm
> >> +1
> >> >> on
> >> >> > the KIP. Thanks for the effort!
> >> >> >
> >> >> > -Jason
> >> >> >
> >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> >> >> mickael.maison@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Yes I agree, having a generic flag is more future proof.
> >> >> >> I'll update the KIP in the coming days.
> >> >> >>
> >> >> >> Thanks
> >> >> >>
> >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
> > <jason@confluent.io
> >> >
> >> >> >> wrote:
> >> >> >> > Hey Mickael,
> >> >> >> >
> >> >> >> > The suggestion to add something to Node makes sense. I could
> >> imagine
> >> >> for
> >> >> >> > example adding a flag to indicate that the connection has a
> > higher
> >> >> >> > "priority," meaning that we can allocate outside of the memory
> >> pool if
> >> >> >> > necessary. That would still be generic even if the only use case
> > is
> >> >> the
> >> >> >> > consumer coordinator. We might also face a similar problem when
> > the
> >> >> >> > producer is sending requests to the transaction coordinator for
> >> >> KIP-98.
> >> >> >> > What do you think?
> >> >> >> >
> >> >> >> > Thanks,
> >> >> >> > Jason
> >> >> >> >
> >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> >> >> >> mickael.maison@gmail.com>
> >> >> >> > wrote:
> >> >> >> >
> >> >> >> >> Apologies for the late response.
> >> >> >> >>
> >> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> >> Coordinator
> >> >> >> >> connection is "tagged" with a different id, so we could
> > retrieve
> >> it
> >> >> in
> >> >> >> >> NetworkReceive to make the distinction.
> >> >> >> >> However, currently the coordinator connection are made
> > different
> >> by
> >> >> >> using:
> >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> >> >> for the Node id.
> >> >> >> >>
> >> >> >> >> So to identify Coordinator connections, we'd have to check that
> >> the
> >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which
> > is a
> >> >> bit
> >> >> >> >> hacky ...
> >> >> >> >>
> >> >> >> >> Maybe we could add a constructor to Node that allows to pass in
> > a
> >> >> >> >> sourceId String. That way we could make all the coordinator
> >> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> >> >> example).
> >> >> >> >> What do you think ?
> >> >> >> >>
> >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> >> >> jason@confluent.io>
> >> >> >> >> wrote:
> >> >> >> >> > Good point. The consumer does use a separate connection to
> > the
> >> >> >> >> coordinator,
> >> >> >> >> > so perhaps the connection itself could be tagged for normal
> > heap
> >> >> >> >> allocation?
> >> >> >> >> >
> >> >> >> >> > -Jason
> >> >> >> >> >
> >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> >> >> >> onurkaraman.apache@gmail.com
> >> >> >> >> >> wrote:
> >> >> >> >> >
> >> >> >> >> >> I only did a quick scan but I wanted to point out what I
> > think
> >> is
> >> >> an
> >> >> >> >> >> incorrect assumption in the KIP's caveats:
> >> >> >> >> >> "
> >> >> >> >> >> There is a risk using the MemoryPool that, after we fill up
> > the
> >> >> >> memory
> >> >> >> >> with
> >> >> >> >> >> fetch data, we can starve the coordinator's connection
> >> >> >> >> >> ...
> >> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will
> > be
> >> >> >> >> allocated in
> >> >> >> >> >> the MemoryPool. Smaller messages will be allocated directly
> > on
> >> the
> >> >> >> Heap
> >> >> >> >> >> like before. This allows group/heartbeat messages to avoid
> >> being
> >> >> >> >> delayed if
> >> >> >> >> >> the MemoryPool fills up.
> >> >> >> >> >> "
> >> >> >> >> >>
> >> >> >> >> >> So it sounds like there's an incorrect assumption that
> >> responses
> >> >> from
> >> >> >> >> the
> >> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in the
> >> >> caveat).
> >> >> >> >> There
> >> >> >> >> >> are now a handful of request types between clients and the
> >> >> >> coordinator:
> >> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
> >> >> >> OffsetFetch,
> >> >> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
> >> >> >> >> >> HeartbeatResponse and a few others, I don't think we can
> > assume
> >> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
> > DescribeGroupsResponse,
> >> and
> >> >> >> >> >> OffsetFetchResponse will be small, as they are effectively
> >> >> bounded by
> >> >> >> >> the
> >> >> >> >> >> max message size allowed by the broker for the
> >> __consumer_offsets
> >> >> >> topic
> >> >> >> >> >> which by default is 1MB.
> >> >> >> >> >>
> >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> >> >> >> >> mickael.maison@gmail.com>
> >> >> >> >> >> wrote:
> >> >> >> >> >>
> >> >> >> >> >> > I've updated the KIP to address all the comments raised
> > here
> >> and
> >> >> >> from
> >> >> >> >> >> > the "DISCUSS" thread.
> >> >> >> >> >> > See:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >> >
> >> >> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >> >> >
> >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> >> >> >> > <ra...@googlemail.com> wrote:
> >> >> >> >> >> > > Hi Mickael,
> >> >> >> >> >> > >
> >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a
> >> >> couple of
> >> >> >> >> >> > comments
> >> >> >> >> >> > > (sorry, should have brought them up on the discuss
> > thread
> >> >> >> earlier):
> >> >> >> >> >> > >
> >> >> >> >> >> > > 1. Perhaps it would be better to do this after
> > KAFKA-4137
> >> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
> >> >> >> implemented?
> >> >> >> >> At
> >> >> >> >> >> > the
> >> >> >> >> >> > > moment, coordinator shares the same NetworkClient (and
> >> hence
> >> >> the
> >> >> >> >> same
> >> >> >> >> >> > > Selector) with consumer connections used for fetching
> >> records.
> >> >> >> Since
> >> >> >> >> >> > > freeing of memory relies on consuming applications
> > invoking
> >> >> >> poll()
> >> >> >> >> >> after
> >> >> >> >> >> > > processing previous records and potentially after
> >> committing
> >> >> >> >> offsets,
> >> >> >> >> >> it
> >> >> >> >> >> > > will be good to ensure that coordinator is not blocked
> > for
> >> >> read
> >> >> >> by
> >> >> >> >> >> fetch
> >> >> >> >> >> > > responses. This may be simpler once coordinator has its
> > own
> >> >> >> >> Selector.
> >> >> >> >> >> > >
> >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the
> > user,
> >> >> >> messages
> >> >> >> >> are
> >> >> >> >> >> > > deleted from the MemoryPool so new messages can be
> > stored.*
> >> >> >> >> >> > > Can you expand that a bit? I am assuming that partial
> >> buffers
> >> >> >> never
> >> >> >> >> get
> >> >> >> >> >> > > freed when some messages are returned to the user since
> > the
> >> >> >> >> consumer is
> >> >> >> >> >> > > still holding a reference to the buffer. Would buffers
> > be
> >> >> freed
> >> >> >> when
> >> >> >> >> >> > > fetches for all the partitions in a response are parsed,
> >> but
> >> >> >> perhaps
> >> >> >> >> >> not
> >> >> >> >> >> > > yet returned to the user (i.e., is the memory freed when
> > a
> >> >> >> >> reference to
> >> >> >> >> >> > the
> >> >> >> >> >> > > response buffer is no longer required)? It will be good
> > to
> >> >> >> document
> >> >> >> >> the
> >> >> >> >> >> > > (approximate) maximum memory requirement for the
> >> >> non-compressed
> >> >> >> >> case.
> >> >> >> >> >> > There
> >> >> >> >> >> > > is data read from the socket, cached in the Fetcher and
> > (as
> >> >> Radai
> >> >> >> >> has
> >> >> >> >> >> > > pointed out), the records still with the user
> > application.
> >> >> >> >> >> > >
> >> >> >> >> >> > >
> >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> >> >> >> radai.rosenblatt@gmail.com>
> >> >> >> >> >> > wrote:
> >> >> >> >> >> > >
> >> >> >> >> >> > >> +1 (non-binding).
> >> >> >> >> >> > >>
> >> >> >> >> >> > >> small nit pick - just because you returned a response
> > to
> >> user
> >> >> >> >> doesnt
> >> >> >> >> >> > mean
> >> >> >> >> >> > >> the memory id no longer used. for some cases the actual
> >> >> "point
> >> >> >> of
> >> >> >> >> >> > >> termination" may be the deserializer (really
> >> impl-dependant),
> >> >> >> but
> >> >> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
> >> dispose()
> >> >> >> call
> >> >> >> >> on
> >> >> >> >> >> > >> responses (with the addition that getting the next
> > batch
> >> of
> >> >> data
> >> >> >> >> from
> >> >> >> >> >> a
> >> >> >> >> >> > >> consumer automatically disposes the previous results)
> >> >> >> >> >> > >>
> >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> >> >> >> ECOMAR@uk.ibm.com>
> >> >> >> >> >> > wrote:
> >> >> >> >> >> > >>
> >> >> >> >> >> > >> > +1 (non binding)
> >> >> >> >> >> > >> > --------------------------------------------------
> >> >> >> >> >> > >> > Edoardo Comar
> >> >> >> >> >> > >> > IBM MessageHub
> >> >> >> >> >> > >> > ecomar@uk.ibm.com
> >> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England and
> >> Wales
> >> >> >> with
> >> >> >> >> >> number
> >> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
> >> >> >> Portsmouth,
> >> >> >> >> >> Hants.
> >> >> >> >> >> > >> PO6
> >> >> >> >> >> > >> > 3AU
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
> >> >> >> >> >> > >> > To:     dev@kafka.apache.org
> >> >> >> >> >> > >> > Date:   05/12/2016 14:38
> >> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory
> > usage
> >> in
> >> >> the
> >> >> >> >> >> > consumer
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > Hi all,
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> >> >> > >> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > Thank you
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > Unless stated otherwise above:
> >> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in England
> > and
> >> >> Wales
> >> >> >> with
> >> >> >> >> >> > number
> >> >> >> >> >> > >> > 741598.
> >> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
> > Portsmouth,
> >> >> >> >> Hampshire
> >> >> >> >> >> PO6
> >> >> >> >> >> > >> 3AU
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >>
> >> >> >> >> >> > >
> >> >> >> >> >> > >
> >> >> >> >> >> > >
> >> >> >> >> >> > > --
> >> >> >> >> >> > > Regards,
> >> >> >> >> >> > >
> >> >> >> >> >> > > Rajini
> >> >> >> >> >> >
> >> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> > Unless stated otherwise above:
> > IBM United Kingdom Limited - Registered in England and Wales with number
> > 741598.
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Mickael Maison <mi...@gmail.com>.
Thanks for the suggestion.

Currently, I can't think of a scenario when we would need multiple
priority "levels". If in the future it makes sense to have some, I
think we could just make the change without a new KIP as these APIs
are not public.
So I'd be more inclined to keep the boolean for now.

On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <EC...@uk.ibm.com> wrote:
> Hi Mickael,
> as discussed we could change the priority parameter to be an int rather
> than a boolean.
>
> That's a bit more extensible
> --------------------------------------------------
> Edoardo Comar
> IBM MessageHub
> ecomar@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
>
>
>
> From:   Guozhang Wang <wa...@gmail.com>
> To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
> Date:   28/03/2017 19:02
> Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the
> consumer
>
>
>
> 1) Makes sense.
> 2) Makes sense. Thanks!
>
> On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
> <mi...@gmail.com>
> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for the feedback.
>>
>> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
>> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
>> merged yet.
>> I've updated the KIP to make it more obvious.
>>
>> 2) I was thinking to pass in the priority when creating the
>> Coordinator Node (in
>> https://github.com/apache/kafka/blob/trunk/clients/src/
>> main/java/org/apache/kafka/clients/consumer/internals/
>> AbstractCoordinator.java#L582)
>> Then when calling Selector.connect() (in
>> https://github.com/apache/kafka/blob/trunk/clients/src/
>> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
>> retrieve it and pass it in the Selector so it uses it when building
>> the Channel.
>> The idea was to avoid having to deduce the connection is for the
>> Coordinator from the ID but instead have it explicitly set by
>> AbstractCoordinator (and pass it all the way down to the Channel)
>>
>> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
>> > Mickael,
>> >
>> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
>> > well. Just a few minor comments on the wiki itself:
>> >
>> > 1. By the "MemoryPool" are you referring to a new class impl or to
>> reusing "
>> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume it
> was
>> > the latter case, and if yes, could you update the wiki page to make it
>> > clear?
>> >
>> > 2. I think it is sufficient to add the priority to KafkaChannel class,
>> but
>> > not needed in Node (but one may need to add this parameter to
> Selector#
>> > connect). Could you point me to which usage of Node needs to access
> the
>> > priority?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
>> mickael.maison@gmail.com>
>> > wrote:
>> >
>> >> Thanks Jason for the feedback! Yes it makes sense to always use the
>> >> MemoryPool is we can. I've updated the KIP with the suggestion
>> >>
>> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
>> >> wrote:
>> >> > Just a minor comment. The KIP suggests that coordinator responses
> are
>> >> > always allocated outside of the memory pool, but maybe we can
> reserve
>> >> that
>> >> > capability for only when the pool does not have enough space? It
>> seems a
>> >> > little nicer to use the pool if we can. If that seems reasonable,
> I'm
>> +1
>> >> on
>> >> > the KIP. Thanks for the effort!
>> >> >
>> >> > -Jason
>> >> >
>> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
>> >> mickael.maison@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Yes I agree, having a generic flag is more future proof.
>> >> >> I'll update the KIP in the coming days.
>> >> >>
>> >> >> Thanks
>> >> >>
>> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
> <jason@confluent.io
>> >
>> >> >> wrote:
>> >> >> > Hey Mickael,
>> >> >> >
>> >> >> > The suggestion to add something to Node makes sense. I could
>> imagine
>> >> for
>> >> >> > example adding a flag to indicate that the connection has a
> higher
>> >> >> > "priority," meaning that we can allocate outside of the memory
>> pool if
>> >> >> > necessary. That would still be generic even if the only use case
> is
>> >> the
>> >> >> > consumer coordinator. We might also face a similar problem when
> the
>> >> >> > producer is sending requests to the transaction coordinator for
>> >> KIP-98.
>> >> >> > What do you think?
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Jason
>> >> >> >
>> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
>> >> >> mickael.maison@gmail.com>
>> >> >> > wrote:
>> >> >> >
>> >> >> >> Apologies for the late response.
>> >> >> >>
>> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
>> Coordinator
>> >> >> >> connection is "tagged" with a different id, so we could
> retrieve
>> it
>> >> in
>> >> >> >> NetworkReceive to make the distinction.
>> >> >> >> However, currently the coordinator connection are made
> different
>> by
>> >> >> using:
>> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> >> >> >> for the Node id.
>> >> >> >>
>> >> >> >> So to identify Coordinator connections, we'd have to check that
>> the
>> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which
> is a
>> >> bit
>> >> >> >> hacky ...
>> >> >> >>
>> >> >> >> Maybe we could add a constructor to Node that allows to pass in
> a
>> >> >> >> sourceId String. That way we could make all the coordinator
>> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
>> >> >> >> example).
>> >> >> >> What do you think ?
>> >> >> >>
>> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
>> >> jason@confluent.io>
>> >> >> >> wrote:
>> >> >> >> > Good point. The consumer does use a separate connection to
> the
>> >> >> >> coordinator,
>> >> >> >> > so perhaps the connection itself could be tagged for normal
> heap
>> >> >> >> allocation?
>> >> >> >> >
>> >> >> >> > -Jason
>> >> >> >> >
>> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> >> >> >> onurkaraman.apache@gmail.com
>> >> >> >> >> wrote:
>> >> >> >> >
>> >> >> >> >> I only did a quick scan but I wanted to point out what I
> think
>> is
>> >> an
>> >> >> >> >> incorrect assumption in the KIP's caveats:
>> >> >> >> >> "
>> >> >> >> >> There is a risk using the MemoryPool that, after we fill up
> the
>> >> >> memory
>> >> >> >> with
>> >> >> >> >> fetch data, we can starve the coordinator's connection
>> >> >> >> >> ...
>> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will
> be
>> >> >> >> allocated in
>> >> >> >> >> the MemoryPool. Smaller messages will be allocated directly
> on
>> the
>> >> >> Heap
>> >> >> >> >> like before. This allows group/heartbeat messages to avoid
>> being
>> >> >> >> delayed if
>> >> >> >> >> the MemoryPool fills up.
>> >> >> >> >> "
>> >> >> >> >>
>> >> >> >> >> So it sounds like there's an incorrect assumption that
>> responses
>> >> from
>> >> >> >> the
>> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in the
>> >> caveat).
>> >> >> >> There
>> >> >> >> >> are now a handful of request types between clients and the
>> >> >> coordinator:
>> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
>> >> >> OffsetFetch,
>> >> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
>> >> >> >> >> HeartbeatResponse and a few others, I don't think we can
> assume
>> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
> DescribeGroupsResponse,
>> and
>> >> >> >> >> OffsetFetchResponse will be small, as they are effectively
>> >> bounded by
>> >> >> >> the
>> >> >> >> >> max message size allowed by the broker for the
>> __consumer_offsets
>> >> >> topic
>> >> >> >> >> which by default is 1MB.
>> >> >> >> >>
>> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> >> >> >> mickael.maison@gmail.com>
>> >> >> >> >> wrote:
>> >> >> >> >>
>> >> >> >> >> > I've updated the KIP to address all the comments raised
> here
>> and
>> >> >> from
>> >> >> >> >> > the "DISCUSS" thread.
>> >> >> >> >> > See:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> >> >
>> >> >> >> >> > Now, I'd like to restart the vote.
>> >> >> >> >> >
>> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> >> >> >> >> > <ra...@googlemail.com> wrote:
>> >> >> >> >> > > Hi Mickael,
>> >> >> >> >> > >
>> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a
>> >> couple of
>> >> >> >> >> > comments
>> >> >> >> >> > > (sorry, should have brought them up on the discuss
> thread
>> >> >> earlier):
>> >> >> >> >> > >
>> >> >> >> >> > > 1. Perhaps it would be better to do this after
> KAFKA-4137
>> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
>> >> >> implemented?
>> >> >> >> At
>> >> >> >> >> > the
>> >> >> >> >> > > moment, coordinator shares the same NetworkClient (and
>> hence
>> >> the
>> >> >> >> same
>> >> >> >> >> > > Selector) with consumer connections used for fetching
>> records.
>> >> >> Since
>> >> >> >> >> > > freeing of memory relies on consuming applications
> invoking
>> >> >> poll()
>> >> >> >> >> after
>> >> >> >> >> > > processing previous records and potentially after
>> committing
>> >> >> >> offsets,
>> >> >> >> >> it
>> >> >> >> >> > > will be good to ensure that coordinator is not blocked
> for
>> >> read
>> >> >> by
>> >> >> >> >> fetch
>> >> >> >> >> > > responses. This may be simpler once coordinator has its
> own
>> >> >> >> Selector.
>> >> >> >> >> > >
>> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the
> user,
>> >> >> messages
>> >> >> >> are
>> >> >> >> >> > > deleted from the MemoryPool so new messages can be
> stored.*
>> >> >> >> >> > > Can you expand that a bit? I am assuming that partial
>> buffers
>> >> >> never
>> >> >> >> get
>> >> >> >> >> > > freed when some messages are returned to the user since
> the
>> >> >> >> consumer is
>> >> >> >> >> > > still holding a reference to the buffer. Would buffers
> be
>> >> freed
>> >> >> when
>> >> >> >> >> > > fetches for all the partitions in a response are parsed,
>> but
>> >> >> perhaps
>> >> >> >> >> not
>> >> >> >> >> > > yet returned to the user (i.e., is the memory freed when
> a
>> >> >> >> reference to
>> >> >> >> >> > the
>> >> >> >> >> > > response buffer is no longer required)? It will be good
> to
>> >> >> document
>> >> >> >> the
>> >> >> >> >> > > (approximate) maximum memory requirement for the
>> >> non-compressed
>> >> >> >> case.
>> >> >> >> >> > There
>> >> >> >> >> > > is data read from the socket, cached in the Fetcher and
> (as
>> >> Radai
>> >> >> >> has
>> >> >> >> >> > > pointed out), the records still with the user
> application.
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
>> >> >> radai.rosenblatt@gmail.com>
>> >> >> >> >> > wrote:
>> >> >> >> >> > >
>> >> >> >> >> > >> +1 (non-binding).
>> >> >> >> >> > >>
>> >> >> >> >> > >> small nit pick - just because you returned a response
> to
>> user
>> >> >> >> doesnt
>> >> >> >> >> > mean
>> >> >> >> >> > >> the memory id no longer used. for some cases the actual
>> >> "point
>> >> >> of
>> >> >> >> >> > >> termination" may be the deserializer (really
>> impl-dependant),
>> >> >> but
>> >> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
>> dispose()
>> >> >> call
>> >> >> >> on
>> >> >> >> >> > >> responses (with the addition that getting the next
> batch
>> of
>> >> data
>> >> >> >> from
>> >> >> >> >> a
>> >> >> >> >> > >> consumer automatically disposes the previous results)
>> >> >> >> >> > >>
>> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
>> >> >> ECOMAR@uk.ibm.com>
>> >> >> >> >> > wrote:
>> >> >> >> >> > >>
>> >> >> >> >> > >> > +1 (non binding)
>> >> >> >> >> > >> > --------------------------------------------------
>> >> >> >> >> > >> > Edoardo Comar
>> >> >> >> >> > >> > IBM MessageHub
>> >> >> >> >> > >> > ecomar@uk.ibm.com
>> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England and
>> Wales
>> >> >> with
>> >> >> >> >> number
>> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
>> >> >> Portsmouth,
>> >> >> >> >> Hants.
>> >> >> >> >> > >> PO6
>> >> >> >> >> > >> > 3AU
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
>> >> >> >> >> > >> > To:     dev@kafka.apache.org
>> >> >> >> >> > >> > Date:   05/12/2016 14:38
>> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory
> usage
>> in
>> >> the
>> >> >> >> >> > consumer
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Hi all,
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
>> >> >> >> >> > >> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Thank you
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Unless stated otherwise above:
>> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in England
> and
>> >> Wales
>> >> >> with
>> >> >> >> >> > number
>> >> >> >> >> > >> > 741598.
>> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
> Portsmouth,
>> >> >> >> Hampshire
>> >> >> >> >> PO6
>> >> >> >> >> > >> 3AU
>> >> >> >> >> > >> >
>> >> >> >> >> > >>
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > > --
>> >> >> >> >> > > Regards,
>> >> >> >> >> > >
>> >> >> >> >> > > Rajini
>> >> >> >> >> >
>> >> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>
>
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Edoardo Comar <EC...@uk.ibm.com>.
Hi Mickael,
as discussed we could change the priority parameter to be an int rather 
than a boolean.

That's a bit more extensible 
--------------------------------------------------
Edoardo Comar
IBM MessageHub
ecomar@uk.ibm.com
IBM UK Ltd, Hursley Park, SO21 2JN

IBM United Kingdom Limited Registered in England and Wales with number 
741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 
3AU



From:   Guozhang Wang <wa...@gmail.com>
To:     "dev@kafka.apache.org" <de...@kafka.apache.org>
Date:   28/03/2017 19:02
Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the 
consumer



1) Makes sense.
2) Makes sense. Thanks!

On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison 
<mi...@gmail.com>
wrote:

> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> merged yet.
> I've updated the KIP to make it more obvious.
>
> 2) I was thinking to pass in the priority when creating the
> Coordinator Node (in
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/clients/consumer/internals/
> AbstractCoordinator.java#L582)
> Then when calling Selector.connect() (in
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> retrieve it and pass it in the Selector so it uses it when building
> the Channel.
> The idea was to avoid having to deduce the connection is for the
> Coordinator from the ID but instead have it explicitly set by
> AbstractCoordinator (and pass it all the way down to the Channel)
>
> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wa...@gmail.com> 
wrote:
> > Mickael,
> >
> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
> > well. Just a few minor comments on the wiki itself:
> >
> > 1. By the "MemoryPool" are you referring to a new class impl or to
> reusing "
> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume it 
was
> > the latter case, and if yes, could you update the wiki page to make it
> > clear?
> >
> > 2. I think it is sufficient to add the priority to KafkaChannel class,
> but
> > not needed in Node (but one may need to add this parameter to 
Selector#
> > connect). Could you point me to which usage of Node needs to access 
the
> > priority?
> >
> >
> > Guozhang
> >
> >
> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> mickael.maison@gmail.com>
> > wrote:
> >
> >> Thanks Jason for the feedback! Yes it makes sense to always use the
> >> MemoryPool is we can. I've updated the KIP with the suggestion
> >>
> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > Just a minor comment. The KIP suggests that coordinator responses 
are
> >> > always allocated outside of the memory pool, but maybe we can 
reserve
> >> that
> >> > capability for only when the pool does not have enough space? It
> seems a
> >> > little nicer to use the pool if we can. If that seems reasonable, 
I'm
> +1
> >> on
> >> > the KIP. Thanks for the effort!
> >> >
> >> > -Jason
> >> >
> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> >> mickael.maison@gmail.com>
> >> > wrote:
> >> >
> >> >> Yes I agree, having a generic flag is more future proof.
> >> >> I'll update the KIP in the coming days.
> >> >>
> >> >> Thanks
> >> >>
> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson 
<jason@confluent.io
> >
> >> >> wrote:
> >> >> > Hey Mickael,
> >> >> >
> >> >> > The suggestion to add something to Node makes sense. I could
> imagine
> >> for
> >> >> > example adding a flag to indicate that the connection has a 
higher
> >> >> > "priority," meaning that we can allocate outside of the memory
> pool if
> >> >> > necessary. That would still be generic even if the only use case 
is
> >> the
> >> >> > consumer coordinator. We might also face a similar problem when 
the
> >> >> > producer is sending requests to the transaction coordinator for
> >> KIP-98.
> >> >> > What do you think?
> >> >> >
> >> >> > Thanks,
> >> >> > Jason
> >> >> >
> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> >> >> mickael.maison@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Apologies for the late response.
> >> >> >>
> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> Coordinator
> >> >> >> connection is "tagged" with a different id, so we could 
retrieve
> it
> >> in
> >> >> >> NetworkReceive to make the distinction.
> >> >> >> However, currently the coordinator connection are made 
different
> by
> >> >> using:
> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> >> for the Node id.
> >> >> >>
> >> >> >> So to identify Coordinator connections, we'd have to check that
> the
> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which 
is a
> >> bit
> >> >> >> hacky ...
> >> >> >>
> >> >> >> Maybe we could add a constructor to Node that allows to pass in 
a
> >> >> >> sourceId String. That way we could make all the coordinator
> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> >> example).
> >> >> >> What do you think ?
> >> >> >>
> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> >> jason@confluent.io>
> >> >> >> wrote:
> >> >> >> > Good point. The consumer does use a separate connection to 
the
> >> >> >> coordinator,
> >> >> >> > so perhaps the connection itself could be tagged for normal 
heap
> >> >> >> allocation?
> >> >> >> >
> >> >> >> > -Jason
> >> >> >> >
> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> >> >> onurkaraman.apache@gmail.com
> >> >> >> >> wrote:
> >> >> >> >
> >> >> >> >> I only did a quick scan but I wanted to point out what I 
think
> is
> >> an
> >> >> >> >> incorrect assumption in the KIP's caveats:
> >> >> >> >> "
> >> >> >> >> There is a risk using the MemoryPool that, after we fill up 
the
> >> >> memory
> >> >> >> with
> >> >> >> >> fetch data, we can starve the coordinator's connection
> >> >> >> >> ...
> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will 
be
> >> >> >> allocated in
> >> >> >> >> the MemoryPool. Smaller messages will be allocated directly 
on
> the
> >> >> Heap
> >> >> >> >> like before. This allows group/heartbeat messages to avoid
> being
> >> >> >> delayed if
> >> >> >> >> the MemoryPool fills up.
> >> >> >> >> "
> >> >> >> >>
> >> >> >> >> So it sounds like there's an incorrect assumption that
> responses
> >> from
> >> >> >> the
> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in the
> >> caveat).
> >> >> >> There
> >> >> >> >> are now a handful of request types between clients and the
> >> >> coordinator:
> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
> >> >> OffsetFetch,
> >> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
> >> >> >> >> HeartbeatResponse and a few others, I don't think we can 
assume
> >> >> >> >> JoinGroupResponse, SyncGroupResponse, 
DescribeGroupsResponse,
> and
> >> >> >> >> OffsetFetchResponse will be small, as they are effectively
> >> bounded by
> >> >> >> the
> >> >> >> >> max message size allowed by the broker for the
> __consumer_offsets
> >> >> topic
> >> >> >> >> which by default is 1MB.
> >> >> >> >>
> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> >> >> >> mickael.maison@gmail.com>
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >> > I've updated the KIP to address all the comments raised 
here
> and
> >> >> from
> >> >> >> >> > the "DISCUSS" thread.
> >> >> >> >> > See: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >
> >> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >> >
> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> >> >> > <ra...@googlemail.com> wrote:
> >> >> >> >> > > Hi Mickael,
> >> >> >> >> > >
> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a
> >> couple of
> >> >> >> >> > comments
> >> >> >> >> > > (sorry, should have brought them up on the discuss 
thread
> >> >> earlier):
> >> >> >> >> > >
> >> >> >> >> > > 1. Perhaps it would be better to do this after 
KAFKA-4137
> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
> >> >> implemented?
> >> >> >> At
> >> >> >> >> > the
> >> >> >> >> > > moment, coordinator shares the same NetworkClient (and
> hence
> >> the
> >> >> >> same
> >> >> >> >> > > Selector) with consumer connections used for fetching
> records.
> >> >> Since
> >> >> >> >> > > freeing of memory relies on consuming applications 
invoking
> >> >> poll()
> >> >> >> >> after
> >> >> >> >> > > processing previous records and potentially after
> committing
> >> >> >> offsets,
> >> >> >> >> it
> >> >> >> >> > > will be good to ensure that coordinator is not blocked 
for
> >> read
> >> >> by
> >> >> >> >> fetch
> >> >> >> >> > > responses. This may be simpler once coordinator has its 
own
> >> >> >> Selector.
> >> >> >> >> > >
> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the 
user,
> >> >> messages
> >> >> >> are
> >> >> >> >> > > deleted from the MemoryPool so new messages can be 
stored.*
> >> >> >> >> > > Can you expand that a bit? I am assuming that partial
> buffers
> >> >> never
> >> >> >> get
> >> >> >> >> > > freed when some messages are returned to the user since 
the
> >> >> >> consumer is
> >> >> >> >> > > still holding a reference to the buffer. Would buffers 
be
> >> freed
> >> >> when
> >> >> >> >> > > fetches for all the partitions in a response are parsed,
> but
> >> >> perhaps
> >> >> >> >> not
> >> >> >> >> > > yet returned to the user (i.e., is the memory freed when 
a
> >> >> >> reference to
> >> >> >> >> > the
> >> >> >> >> > > response buffer is no longer required)? It will be good 
to
> >> >> document
> >> >> >> the
> >> >> >> >> > > (approximate) maximum memory requirement for the
> >> non-compressed
> >> >> >> case.
> >> >> >> >> > There
> >> >> >> >> > > is data read from the socket, cached in the Fetcher and 
(as
> >> Radai
> >> >> >> has
> >> >> >> >> > > pointed out), the records still with the user 
application.
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> >> >> radai.rosenblatt@gmail.com>
> >> >> >> >> > wrote:
> >> >> >> >> > >
> >> >> >> >> > >> +1 (non-binding).
> >> >> >> >> > >>
> >> >> >> >> > >> small nit pick - just because you returned a response 
to
> user
> >> >> >> doesnt
> >> >> >> >> > mean
> >> >> >> >> > >> the memory id no longer used. for some cases the actual
> >> "point
> >> >> of
> >> >> >> >> > >> termination" may be the deserializer (really
> impl-dependant),
> >> >> but
> >> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
> dispose()
> >> >> call
> >> >> >> on
> >> >> >> >> > >> responses (with the addition that getting the next 
batch
> of
> >> data
> >> >> >> from
> >> >> >> >> a
> >> >> >> >> > >> consumer automatically disposes the previous results)
> >> >> >> >> > >>
> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> >> >> ECOMAR@uk.ibm.com>
> >> >> >> >> > wrote:
> >> >> >> >> > >>
> >> >> >> >> > >> > +1 (non binding)
> >> >> >> >> > >> > --------------------------------------------------
> >> >> >> >> > >> > Edoardo Comar
> >> >> >> >> > >> > IBM MessageHub
> >> >> >> >> > >> > ecomar@uk.ibm.com
> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >> >> > >> >
> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England and
> Wales
> >> >> with
> >> >> >> >> number
> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
> >> >> Portsmouth,
> >> >> >> >> Hants.
> >> >> >> >> > >> PO6
> >> >> >> >> > >> > 3AU
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
> >> >> >> >> > >> > To:     dev@kafka.apache.org
> >> >> >> >> > >> > Date:   05/12/2016 14:38
> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory 
usage
> in
> >> the
> >> >> >> >> > consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Hi all,
> >> >> >> >> > >> >
> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> >> > >> > 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Thank you
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Unless stated otherwise above:
> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in England 
and
> >> Wales
> >> >> with
> >> >> >> >> > number
> >> >> >> >> > >> > 741598.
> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour, 
Portsmouth,
> >> >> >> Hampshire
> >> >> >> >> PO6
> >> >> >> >> > >> 3AU
> >> >> >> >> > >> >
> >> >> >> >> > >>
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > --
> >> >> >> >> > > Regards,
> >> >> >> >> > >
> >> >> >> >> > > Rajini
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang



Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Guozhang Wang <wa...@gmail.com>.
1) Makes sense.
2) Makes sense. Thanks!

On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison <mi...@gmail.com>
wrote:

> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> merged yet.
> I've updated the KIP to make it more obvious.
>
> 2) I was thinking to pass in the priority when creating the
> Coordinator Node (in
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/clients/consumer/internals/
> AbstractCoordinator.java#L582)
> Then when calling Selector.connect() (in
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> retrieve it and pass it in the Selector so it uses it when building
> the Channel.
> The idea was to avoid having to deduce the connection is for the
> Coordinator from the ID but instead have it explicitly set by
> AbstractCoordinator (and pass it all the way down to the Channel)
>
> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wa...@gmail.com> wrote:
> > Mickael,
> >
> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
> > well. Just a few minor comments on the wiki itself:
> >
> > 1. By the "MemoryPool" are you referring to a new class impl or to
> reusing "
> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume it was
> > the latter case, and if yes, could you update the wiki page to make it
> > clear?
> >
> > 2. I think it is sufficient to add the priority to KafkaChannel class,
> but
> > not needed in Node (but one may need to add this parameter to Selector#
> > connect). Could you point me to which usage of Node needs to access the
> > priority?
> >
> >
> > Guozhang
> >
> >
> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> mickael.maison@gmail.com>
> > wrote:
> >
> >> Thanks Jason for the feedback! Yes it makes sense to always use the
> >> MemoryPool is we can. I've updated the KIP with the suggestion
> >>
> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > Just a minor comment. The KIP suggests that coordinator responses are
> >> > always allocated outside of the memory pool, but maybe we can reserve
> >> that
> >> > capability for only when the pool does not have enough space? It
> seems a
> >> > little nicer to use the pool if we can. If that seems reasonable, I'm
> +1
> >> on
> >> > the KIP. Thanks for the effort!
> >> >
> >> > -Jason
> >> >
> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> >> mickael.maison@gmail.com>
> >> > wrote:
> >> >
> >> >> Yes I agree, having a generic flag is more future proof.
> >> >> I'll update the KIP in the coming days.
> >> >>
> >> >> Thanks
> >> >>
> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <jason@confluent.io
> >
> >> >> wrote:
> >> >> > Hey Mickael,
> >> >> >
> >> >> > The suggestion to add something to Node makes sense. I could
> imagine
> >> for
> >> >> > example adding a flag to indicate that the connection has a higher
> >> >> > "priority," meaning that we can allocate outside of the memory
> pool if
> >> >> > necessary. That would still be generic even if the only use case is
> >> the
> >> >> > consumer coordinator. We might also face a similar problem when the
> >> >> > producer is sending requests to the transaction coordinator for
> >> KIP-98.
> >> >> > What do you think?
> >> >> >
> >> >> > Thanks,
> >> >> > Jason
> >> >> >
> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> >> >> mickael.maison@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Apologies for the late response.
> >> >> >>
> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> Coordinator
> >> >> >> connection is "tagged" with a different id, so we could retrieve
> it
> >> in
> >> >> >> NetworkReceive to make the distinction.
> >> >> >> However, currently the coordinator connection are made different
> by
> >> >> using:
> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> >> for the Node id.
> >> >> >>
> >> >> >> So to identify Coordinator connections, we'd have to check that
> the
> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a
> >> bit
> >> >> >> hacky ...
> >> >> >>
> >> >> >> Maybe we could add a constructor to Node that allows to pass in a
> >> >> >> sourceId String. That way we could make all the coordinator
> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> >> example).
> >> >> >> What do you think ?
> >> >> >>
> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> >> jason@confluent.io>
> >> >> >> wrote:
> >> >> >> > Good point. The consumer does use a separate connection to the
> >> >> >> coordinator,
> >> >> >> > so perhaps the connection itself could be tagged for normal heap
> >> >> >> allocation?
> >> >> >> >
> >> >> >> > -Jason
> >> >> >> >
> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> >> >> onurkaraman.apache@gmail.com
> >> >> >> >> wrote:
> >> >> >> >
> >> >> >> >> I only did a quick scan but I wanted to point out what I think
> is
> >> an
> >> >> >> >> incorrect assumption in the KIP's caveats:
> >> >> >> >> "
> >> >> >> >> There is a risk using the MemoryPool that, after we fill up the
> >> >> memory
> >> >> >> with
> >> >> >> >> fetch data, we can starve the coordinator's connection
> >> >> >> >> ...
> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will be
> >> >> >> allocated in
> >> >> >> >> the MemoryPool. Smaller messages will be allocated directly on
> the
> >> >> Heap
> >> >> >> >> like before. This allows group/heartbeat messages to avoid
> being
> >> >> >> delayed if
> >> >> >> >> the MemoryPool fills up.
> >> >> >> >> "
> >> >> >> >>
> >> >> >> >> So it sounds like there's an incorrect assumption that
> responses
> >> from
> >> >> >> the
> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in the
> >> caveat).
> >> >> >> There
> >> >> >> >> are now a handful of request types between clients and the
> >> >> coordinator:
> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
> >> >> OffsetFetch,
> >> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
> >> >> >> >> HeartbeatResponse and a few others, I don't think we can assume
> >> >> >> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse,
> and
> >> >> >> >> OffsetFetchResponse will be small, as they are effectively
> >> bounded by
> >> >> >> the
> >> >> >> >> max message size allowed by the broker for the
> __consumer_offsets
> >> >> topic
> >> >> >> >> which by default is 1MB.
> >> >> >> >>
> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> >> >> >> mickael.maison@gmail.com>
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >> > I've updated the KIP to address all the comments raised here
> and
> >> >> from
> >> >> >> >> > the "DISCUSS" thread.
> >> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >
> >> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >> >
> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> >> >> > <ra...@googlemail.com> wrote:
> >> >> >> >> > > Hi Mickael,
> >> >> >> >> > >
> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a
> >> couple of
> >> >> >> >> > comments
> >> >> >> >> > > (sorry, should have brought them up on the discuss thread
> >> >> earlier):
> >> >> >> >> > >
> >> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
> >> >> implemented?
> >> >> >> At
> >> >> >> >> > the
> >> >> >> >> > > moment, coordinator shares the same NetworkClient (and
> hence
> >> the
> >> >> >> same
> >> >> >> >> > > Selector) with consumer connections used for fetching
> records.
> >> >> Since
> >> >> >> >> > > freeing of memory relies on consuming applications invoking
> >> >> poll()
> >> >> >> >> after
> >> >> >> >> > > processing previous records and potentially after
> committing
> >> >> >> offsets,
> >> >> >> >> it
> >> >> >> >> > > will be good to ensure that coordinator is not blocked for
> >> read
> >> >> by
> >> >> >> >> fetch
> >> >> >> >> > > responses. This may be simpler once coordinator has its own
> >> >> >> Selector.
> >> >> >> >> > >
> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the user,
> >> >> messages
> >> >> >> are
> >> >> >> >> > > deleted from the MemoryPool so new messages can be stored.*
> >> >> >> >> > > Can you expand that a bit? I am assuming that partial
> buffers
> >> >> never
> >> >> >> get
> >> >> >> >> > > freed when some messages are returned to the user since the
> >> >> >> consumer is
> >> >> >> >> > > still holding a reference to the buffer. Would buffers be
> >> freed
> >> >> when
> >> >> >> >> > > fetches for all the partitions in a response are parsed,
> but
> >> >> perhaps
> >> >> >> >> not
> >> >> >> >> > > yet returned to the user (i.e., is the memory freed when a
> >> >> >> reference to
> >> >> >> >> > the
> >> >> >> >> > > response buffer is no longer required)? It will be good to
> >> >> document
> >> >> >> the
> >> >> >> >> > > (approximate) maximum memory requirement for the
> >> non-compressed
> >> >> >> case.
> >> >> >> >> > There
> >> >> >> >> > > is data read from the socket, cached in the Fetcher and (as
> >> Radai
> >> >> >> has
> >> >> >> >> > > pointed out), the records still with the user application.
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> >> >> radai.rosenblatt@gmail.com>
> >> >> >> >> > wrote:
> >> >> >> >> > >
> >> >> >> >> > >> +1 (non-binding).
> >> >> >> >> > >>
> >> >> >> >> > >> small nit pick - just because you returned a response to
> user
> >> >> >> doesnt
> >> >> >> >> > mean
> >> >> >> >> > >> the memory id no longer used. for some cases the actual
> >> "point
> >> >> of
> >> >> >> >> > >> termination" may be the deserializer (really
> impl-dependant),
> >> >> but
> >> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
> dispose()
> >> >> call
> >> >> >> on
> >> >> >> >> > >> responses (with the addition that getting the next batch
> of
> >> data
> >> >> >> from
> >> >> >> >> a
> >> >> >> >> > >> consumer automatically disposes the previous results)
> >> >> >> >> > >>
> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> >> >> ECOMAR@uk.ibm.com>
> >> >> >> >> > wrote:
> >> >> >> >> > >>
> >> >> >> >> > >> > +1 (non binding)
> >> >> >> >> > >> > --------------------------------------------------
> >> >> >> >> > >> > Edoardo Comar
> >> >> >> >> > >> > IBM MessageHub
> >> >> >> >> > >> > ecomar@uk.ibm.com
> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >> >> > >> >
> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England and
> Wales
> >> >> with
> >> >> >> >> number
> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
> >> >> Portsmouth,
> >> >> >> >> Hants.
> >> >> >> >> > >> PO6
> >> >> >> >> > >> > 3AU
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
> >> >> >> >> > >> > To:     dev@kafka.apache.org
> >> >> >> >> > >> > Date:   05/12/2016 14:38
> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage
> in
> >> the
> >> >> >> >> > consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Hi all,
> >> >> >> >> > >> >
> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Thank you
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Unless stated otherwise above:
> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in England and
> >> Wales
> >> >> with
> >> >> >> >> > number
> >> >> >> >> > >> > 741598.
> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
> >> >> >> Hampshire
> >> >> >> >> PO6
> >> >> >> >> > >> 3AU
> >> >> >> >> > >> >
> >> >> >> >> > >>
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > --
> >> >> >> >> > > Regards,
> >> >> >> >> > >
> >> >> >> >> > > Rajini
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Mickael Maison <mi...@gmail.com>.
Hi Guozhang,

Thanks for the feedback.

1) By MemoryPool, I mean the implementation added in KIP-72. That will
most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
merged yet.
I've updated the KIP to make it more obvious.

2) I was thinking to pass in the priority when creating the
Coordinator Node (in
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L582)
Then when calling Selector.connect() (in
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643)
retrieve it and pass it in the Selector so it uses it when building
the Channel.
The idea was to avoid having to deduce the connection is for the
Coordinator from the ID but instead have it explicitly set by
AbstractCoordinator (and pass it all the way down to the Channel)

On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wa...@gmail.com> wrote:
> Mickael,
>
> Sorry for the late review of the KIP. I'm +1 on the proposed change as
> well. Just a few minor comments on the wiki itself:
>
> 1. By the "MemoryPool" are you referring to a new class impl or to reusing "
> org.apache.kafka.clients.producer.internals.BufferPool"? I assume it was
> the latter case, and if yes, could you update the wiki page to make it
> clear?
>
> 2. I think it is sufficient to add the priority to KafkaChannel class, but
> not needed in Node (but one may need to add this parameter to Selector#
> connect). Could you point me to which usage of Node needs to access the
> priority?
>
>
> Guozhang
>
>
> On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <mi...@gmail.com>
> wrote:
>
>> Thanks Jason for the feedback! Yes it makes sense to always use the
>> MemoryPool is we can. I've updated the KIP with the suggestion
>>
>> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>> > Just a minor comment. The KIP suggests that coordinator responses are
>> > always allocated outside of the memory pool, but maybe we can reserve
>> that
>> > capability for only when the pool does not have enough space? It seems a
>> > little nicer to use the pool if we can. If that seems reasonable, I'm +1
>> on
>> > the KIP. Thanks for the effort!
>> >
>> > -Jason
>> >
>> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
>> mickael.maison@gmail.com>
>> > wrote:
>> >
>> >> Yes I agree, having a generic flag is more future proof.
>> >> I'll update the KIP in the coming days.
>> >>
>> >> Thanks
>> >>
>> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io>
>> >> wrote:
>> >> > Hey Mickael,
>> >> >
>> >> > The suggestion to add something to Node makes sense. I could imagine
>> for
>> >> > example adding a flag to indicate that the connection has a higher
>> >> > "priority," meaning that we can allocate outside of the memory pool if
>> >> > necessary. That would still be generic even if the only use case is
>> the
>> >> > consumer coordinator. We might also face a similar problem when the
>> >> > producer is sending requests to the transaction coordinator for
>> KIP-98.
>> >> > What do you think?
>> >> >
>> >> > Thanks,
>> >> > Jason
>> >> >
>> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
>> >> mickael.maison@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Apologies for the late response.
>> >> >>
>> >> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
>> >> >> connection is "tagged" with a different id, so we could retrieve it
>> in
>> >> >> NetworkReceive to make the distinction.
>> >> >> However, currently the coordinator connection are made different by
>> >> using:
>> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> >> >> for the Node id.
>> >> >>
>> >> >> So to identify Coordinator connections, we'd have to check that the
>> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a
>> bit
>> >> >> hacky ...
>> >> >>
>> >> >> Maybe we could add a constructor to Node that allows to pass in a
>> >> >> sourceId String. That way we could make all the coordinator
>> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
>> >> >> example).
>> >> >> What do you think ?
>> >> >>
>> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
>> jason@confluent.io>
>> >> >> wrote:
>> >> >> > Good point. The consumer does use a separate connection to the
>> >> >> coordinator,
>> >> >> > so perhaps the connection itself could be tagged for normal heap
>> >> >> allocation?
>> >> >> >
>> >> >> > -Jason
>> >> >> >
>> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> >> >> onurkaraman.apache@gmail.com
>> >> >> >> wrote:
>> >> >> >
>> >> >> >> I only did a quick scan but I wanted to point out what I think is
>> an
>> >> >> >> incorrect assumption in the KIP's caveats:
>> >> >> >> "
>> >> >> >> There is a risk using the MemoryPool that, after we fill up the
>> >> memory
>> >> >> with
>> >> >> >> fetch data, we can starve the coordinator's connection
>> >> >> >> ...
>> >> >> >> To alleviate this issue, only messages larger than 1Kb will be
>> >> >> allocated in
>> >> >> >> the MemoryPool. Smaller messages will be allocated directly on the
>> >> Heap
>> >> >> >> like before. This allows group/heartbeat messages to avoid being
>> >> >> delayed if
>> >> >> >> the MemoryPool fills up.
>> >> >> >> "
>> >> >> >>
>> >> >> >> So it sounds like there's an incorrect assumption that responses
>> from
>> >> >> the
>> >> >> >> coordinator will always be small (< 1Kb as mentioned in the
>> caveat).
>> >> >> There
>> >> >> >> are now a handful of request types between clients and the
>> >> coordinator:
>> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
>> >> OffsetFetch,
>> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
>> >> >> >> HeartbeatResponse and a few others, I don't think we can assume
>> >> >> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> >> >> >> OffsetFetchResponse will be small, as they are effectively
>> bounded by
>> >> >> the
>> >> >> >> max message size allowed by the broker for the __consumer_offsets
>> >> topic
>> >> >> >> which by default is 1MB.
>> >> >> >>
>> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> >> >> mickael.maison@gmail.com>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >> > I've updated the KIP to address all the comments raised here and
>> >> from
>> >> >> >> > the "DISCUSS" thread.
>> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> >
>> >> >> >> > Now, I'd like to restart the vote.
>> >> >> >> >
>> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> >> >> >> > <ra...@googlemail.com> wrote:
>> >> >> >> > > Hi Mickael,
>> >> >> >> > >
>> >> >> >> > > I am +1 on the overall approach of this KIP, but have a
>> couple of
>> >> >> >> > comments
>> >> >> >> > > (sorry, should have brought them up on the discuss thread
>> >> earlier):
>> >> >> >> > >
>> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
>> >> implemented?
>> >> >> At
>> >> >> >> > the
>> >> >> >> > > moment, coordinator shares the same NetworkClient (and hence
>> the
>> >> >> same
>> >> >> >> > > Selector) with consumer connections used for fetching records.
>> >> Since
>> >> >> >> > > freeing of memory relies on consuming applications invoking
>> >> poll()
>> >> >> >> after
>> >> >> >> > > processing previous records and potentially after committing
>> >> >> offsets,
>> >> >> >> it
>> >> >> >> > > will be good to ensure that coordinator is not blocked for
>> read
>> >> by
>> >> >> >> fetch
>> >> >> >> > > responses. This may be simpler once coordinator has its own
>> >> >> Selector.
>> >> >> >> > >
>> >> >> >> > > 2. The KIP says: *Once messages are returned to the user,
>> >> messages
>> >> >> are
>> >> >> >> > > deleted from the MemoryPool so new messages can be stored.*
>> >> >> >> > > Can you expand that a bit? I am assuming that partial buffers
>> >> never
>> >> >> get
>> >> >> >> > > freed when some messages are returned to the user since the
>> >> >> consumer is
>> >> >> >> > > still holding a reference to the buffer. Would buffers be
>> freed
>> >> when
>> >> >> >> > > fetches for all the partitions in a response are parsed, but
>> >> perhaps
>> >> >> >> not
>> >> >> >> > > yet returned to the user (i.e., is the memory freed when a
>> >> >> reference to
>> >> >> >> > the
>> >> >> >> > > response buffer is no longer required)? It will be good to
>> >> document
>> >> >> the
>> >> >> >> > > (approximate) maximum memory requirement for the
>> non-compressed
>> >> >> case.
>> >> >> >> > There
>> >> >> >> > > is data read from the socket, cached in the Fetcher and (as
>> Radai
>> >> >> has
>> >> >> >> > > pointed out), the records still with the user application.
>> >> >> >> > >
>> >> >> >> > >
>> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
>> >> radai.rosenblatt@gmail.com>
>> >> >> >> > wrote:
>> >> >> >> > >
>> >> >> >> > >> +1 (non-binding).
>> >> >> >> > >>
>> >> >> >> > >> small nit pick - just because you returned a response to user
>> >> >> doesnt
>> >> >> >> > mean
>> >> >> >> > >> the memory id no longer used. for some cases the actual
>> "point
>> >> of
>> >> >> >> > >> termination" may be the deserializer (really impl-dependant),
>> >> but
>> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit dispose()
>> >> call
>> >> >> on
>> >> >> >> > >> responses (with the addition that getting the next batch of
>> data
>> >> >> from
>> >> >> >> a
>> >> >> >> > >> consumer automatically disposes the previous results)
>> >> >> >> > >>
>> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
>> >> ECOMAR@uk.ibm.com>
>> >> >> >> > wrote:
>> >> >> >> > >>
>> >> >> >> > >> > +1 (non binding)
>> >> >> >> > >> > --------------------------------------------------
>> >> >> >> > >> > Edoardo Comar
>> >> >> >> > >> > IBM MessageHub
>> >> >> >> > >> > ecomar@uk.ibm.com
>> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >> >> >> > >> >
>> >> >> >> > >> > IBM United Kingdom Limited Registered in England and Wales
>> >> with
>> >> >> >> number
>> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
>> >> Portsmouth,
>> >> >> >> Hants.
>> >> >> >> > >> PO6
>> >> >> >> > >> > 3AU
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
>> >> >> >> > >> > To:     dev@kafka.apache.org
>> >> >> >> > >> > Date:   05/12/2016 14:38
>> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in
>> the
>> >> >> >> > consumer
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> > Hi all,
>> >> >> >> > >> >
>> >> >> >> > >> > I'd like to start the vote for KIP-81:
>> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> > Thank you
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> >
>> >> >> >> > >> > Unless stated otherwise above:
>> >> >> >> > >> > IBM United Kingdom Limited - Registered in England and
>> Wales
>> >> with
>> >> >> >> > number
>> >> >> >> > >> > 741598.
>> >> >> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
>> >> >> Hampshire
>> >> >> >> PO6
>> >> >> >> > >> 3AU
>> >> >> >> > >> >
>> >> >> >> > >>
>> >> >> >> > >
>> >> >> >> > >
>> >> >> >> > >
>> >> >> >> > > --
>> >> >> >> > > Regards,
>> >> >> >> > >
>> >> >> >> > > Rajini
>> >> >> >> >
>> >> >> >>
>> >> >>
>> >>
>>
>
>
>
> --
> -- Guozhang

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Guozhang Wang <wa...@gmail.com>.
Mickael,

Sorry for the late review of the KIP. I'm +1 on the proposed change as
well. Just a few minor comments on the wiki itself:

1. By the "MemoryPool" are you referring to a new class impl or to reusing "
org.apache.kafka.clients.producer.internals.BufferPool"? I assume it was
the latter case, and if yes, could you update the wiki page to make it
clear?

2. I think it is sufficient to add the priority to KafkaChannel class, but
not needed in Node (but one may need to add this parameter to Selector#
connect). Could you point me to which usage of Node needs to access the
priority?


Guozhang


On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <mi...@gmail.com>
wrote:

> Thanks Jason for the feedback! Yes it makes sense to always use the
> MemoryPool is we can. I've updated the KIP with the suggestion
>
> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
> > Just a minor comment. The KIP suggests that coordinator responses are
> > always allocated outside of the memory pool, but maybe we can reserve
> that
> > capability for only when the pool does not have enough space? It seems a
> > little nicer to use the pool if we can. If that seems reasonable, I'm +1
> on
> > the KIP. Thanks for the effort!
> >
> > -Jason
> >
> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> mickael.maison@gmail.com>
> > wrote:
> >
> >> Yes I agree, having a generic flag is more future proof.
> >> I'll update the KIP in the coming days.
> >>
> >> Thanks
> >>
> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > Hey Mickael,
> >> >
> >> > The suggestion to add something to Node makes sense. I could imagine
> for
> >> > example adding a flag to indicate that the connection has a higher
> >> > "priority," meaning that we can allocate outside of the memory pool if
> >> > necessary. That would still be generic even if the only use case is
> the
> >> > consumer coordinator. We might also face a similar problem when the
> >> > producer is sending requests to the transaction coordinator for
> KIP-98.
> >> > What do you think?
> >> >
> >> > Thanks,
> >> > Jason
> >> >
> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> >> mickael.maison@gmail.com>
> >> > wrote:
> >> >
> >> >> Apologies for the late response.
> >> >>
> >> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
> >> >> connection is "tagged" with a different id, so we could retrieve it
> in
> >> >> NetworkReceive to make the distinction.
> >> >> However, currently the coordinator connection are made different by
> >> using:
> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> for the Node id.
> >> >>
> >> >> So to identify Coordinator connections, we'd have to check that the
> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a
> bit
> >> >> hacky ...
> >> >>
> >> >> Maybe we could add a constructor to Node that allows to pass in a
> >> >> sourceId String. That way we could make all the coordinator
> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> example).
> >> >> What do you think ?
> >> >>
> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> jason@confluent.io>
> >> >> wrote:
> >> >> > Good point. The consumer does use a separate connection to the
> >> >> coordinator,
> >> >> > so perhaps the connection itself could be tagged for normal heap
> >> >> allocation?
> >> >> >
> >> >> > -Jason
> >> >> >
> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> >> onurkaraman.apache@gmail.com
> >> >> >> wrote:
> >> >> >
> >> >> >> I only did a quick scan but I wanted to point out what I think is
> an
> >> >> >> incorrect assumption in the KIP's caveats:
> >> >> >> "
> >> >> >> There is a risk using the MemoryPool that, after we fill up the
> >> memory
> >> >> with
> >> >> >> fetch data, we can starve the coordinator's connection
> >> >> >> ...
> >> >> >> To alleviate this issue, only messages larger than 1Kb will be
> >> >> allocated in
> >> >> >> the MemoryPool. Smaller messages will be allocated directly on the
> >> Heap
> >> >> >> like before. This allows group/heartbeat messages to avoid being
> >> >> delayed if
> >> >> >> the MemoryPool fills up.
> >> >> >> "
> >> >> >>
> >> >> >> So it sounds like there's an incorrect assumption that responses
> from
> >> >> the
> >> >> >> coordinator will always be small (< 1Kb as mentioned in the
> caveat).
> >> >> There
> >> >> >> are now a handful of request types between clients and the
> >> coordinator:
> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
> >> OffsetFetch,
> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
> >> >> >> HeartbeatResponse and a few others, I don't think we can assume
> >> >> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
> >> >> >> OffsetFetchResponse will be small, as they are effectively
> bounded by
> >> >> the
> >> >> >> max message size allowed by the broker for the __consumer_offsets
> >> topic
> >> >> >> which by default is 1MB.
> >> >> >>
> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> >> >> mickael.maison@gmail.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >> > I've updated the KIP to address all the comments raised here and
> >> from
> >> >> >> > the "DISCUSS" thread.
> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >
> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >
> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> >> > <ra...@googlemail.com> wrote:
> >> >> >> > > Hi Mickael,
> >> >> >> > >
> >> >> >> > > I am +1 on the overall approach of this KIP, but have a
> couple of
> >> >> >> > comments
> >> >> >> > > (sorry, should have brought them up on the discuss thread
> >> earlier):
> >> >> >> > >
> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
> >> implemented?
> >> >> At
> >> >> >> > the
> >> >> >> > > moment, coordinator shares the same NetworkClient (and hence
> the
> >> >> same
> >> >> >> > > Selector) with consumer connections used for fetching records.
> >> Since
> >> >> >> > > freeing of memory relies on consuming applications invoking
> >> poll()
> >> >> >> after
> >> >> >> > > processing previous records and potentially after committing
> >> >> offsets,
> >> >> >> it
> >> >> >> > > will be good to ensure that coordinator is not blocked for
> read
> >> by
> >> >> >> fetch
> >> >> >> > > responses. This may be simpler once coordinator has its own
> >> >> Selector.
> >> >> >> > >
> >> >> >> > > 2. The KIP says: *Once messages are returned to the user,
> >> messages
> >> >> are
> >> >> >> > > deleted from the MemoryPool so new messages can be stored.*
> >> >> >> > > Can you expand that a bit? I am assuming that partial buffers
> >> never
> >> >> get
> >> >> >> > > freed when some messages are returned to the user since the
> >> >> consumer is
> >> >> >> > > still holding a reference to the buffer. Would buffers be
> freed
> >> when
> >> >> >> > > fetches for all the partitions in a response are parsed, but
> >> perhaps
> >> >> >> not
> >> >> >> > > yet returned to the user (i.e., is the memory freed when a
> >> >> reference to
> >> >> >> > the
> >> >> >> > > response buffer is no longer required)? It will be good to
> >> document
> >> >> the
> >> >> >> > > (approximate) maximum memory requirement for the
> non-compressed
> >> >> case.
> >> >> >> > There
> >> >> >> > > is data read from the socket, cached in the Fetcher and (as
> Radai
> >> >> has
> >> >> >> > > pointed out), the records still with the user application.
> >> >> >> > >
> >> >> >> > >
> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> >> radai.rosenblatt@gmail.com>
> >> >> >> > wrote:
> >> >> >> > >
> >> >> >> > >> +1 (non-binding).
> >> >> >> > >>
> >> >> >> > >> small nit pick - just because you returned a response to user
> >> >> doesnt
> >> >> >> > mean
> >> >> >> > >> the memory id no longer used. for some cases the actual
> "point
> >> of
> >> >> >> > >> termination" may be the deserializer (really impl-dependant),
> >> but
> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit dispose()
> >> call
> >> >> on
> >> >> >> > >> responses (with the addition that getting the next batch of
> data
> >> >> from
> >> >> >> a
> >> >> >> > >> consumer automatically disposes the previous results)
> >> >> >> > >>
> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> >> ECOMAR@uk.ibm.com>
> >> >> >> > wrote:
> >> >> >> > >>
> >> >> >> > >> > +1 (non binding)
> >> >> >> > >> > --------------------------------------------------
> >> >> >> > >> > Edoardo Comar
> >> >> >> > >> > IBM MessageHub
> >> >> >> > >> > ecomar@uk.ibm.com
> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >> > >> >
> >> >> >> > >> > IBM United Kingdom Limited Registered in England and Wales
> >> with
> >> >> >> number
> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
> >> Portsmouth,
> >> >> >> Hants.
> >> >> >> > >> PO6
> >> >> >> > >> > 3AU
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
> >> >> >> > >> > To:     dev@kafka.apache.org
> >> >> >> > >> > Date:   05/12/2016 14:38
> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in
> the
> >> >> >> > consumer
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > Hi all,
> >> >> >> > >> >
> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > Thank you
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> >
> >> >> >> > >> > Unless stated otherwise above:
> >> >> >> > >> > IBM United Kingdom Limited - Registered in England and
> Wales
> >> with
> >> >> >> > number
> >> >> >> > >> > 741598.
> >> >> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
> >> >> Hampshire
> >> >> >> PO6
> >> >> >> > >> 3AU
> >> >> >> > >> >
> >> >> >> > >>
> >> >> >> > >
> >> >> >> > >
> >> >> >> > >
> >> >> >> > > --
> >> >> >> > > Regards,
> >> >> >> > >
> >> >> >> > > Rajini
> >> >> >> >
> >> >> >>
> >> >>
> >>
>



-- 
-- Guozhang

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Mickael Maison <mi...@gmail.com>.
Thanks Jason for the feedback! Yes it makes sense to always use the
MemoryPool is we can. I've updated the KIP with the suggestion

On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io> wrote:
> Just a minor comment. The KIP suggests that coordinator responses are
> always allocated outside of the memory pool, but maybe we can reserve that
> capability for only when the pool does not have enough space? It seems a
> little nicer to use the pool if we can. If that seems reasonable, I'm +1 on
> the KIP. Thanks for the effort!
>
> -Jason
>
> On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <mi...@gmail.com>
> wrote:
>
>> Yes I agree, having a generic flag is more future proof.
>> I'll update the KIP in the coming days.
>>
>> Thanks
>>
>> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>> > Hey Mickael,
>> >
>> > The suggestion to add something to Node makes sense. I could imagine for
>> > example adding a flag to indicate that the connection has a higher
>> > "priority," meaning that we can allocate outside of the memory pool if
>> > necessary. That would still be generic even if the only use case is the
>> > consumer coordinator. We might also face a similar problem when the
>> > producer is sending requests to the transaction coordinator for KIP-98.
>> > What do you think?
>> >
>> > Thanks,
>> > Jason
>> >
>> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
>> mickael.maison@gmail.com>
>> > wrote:
>> >
>> >> Apologies for the late response.
>> >>
>> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
>> >> connection is "tagged" with a different id, so we could retrieve it in
>> >> NetworkReceive to make the distinction.
>> >> However, currently the coordinator connection are made different by
>> using:
>> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> >> for the Node id.
>> >>
>> >> So to identify Coordinator connections, we'd have to check that the
>> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
>> >> hacky ...
>> >>
>> >> Maybe we could add a constructor to Node that allows to pass in a
>> >> sourceId String. That way we could make all the coordinator
>> >> connections explicit (by setting it to "Coordinator-[ID]" for
>> >> example).
>> >> What do you think ?
>> >>
>> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io>
>> >> wrote:
>> >> > Good point. The consumer does use a separate connection to the
>> >> coordinator,
>> >> > so perhaps the connection itself could be tagged for normal heap
>> >> allocation?
>> >> >
>> >> > -Jason
>> >> >
>> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> >> onurkaraman.apache@gmail.com
>> >> >> wrote:
>> >> >
>> >> >> I only did a quick scan but I wanted to point out what I think is an
>> >> >> incorrect assumption in the KIP's caveats:
>> >> >> "
>> >> >> There is a risk using the MemoryPool that, after we fill up the
>> memory
>> >> with
>> >> >> fetch data, we can starve the coordinator's connection
>> >> >> ...
>> >> >> To alleviate this issue, only messages larger than 1Kb will be
>> >> allocated in
>> >> >> the MemoryPool. Smaller messages will be allocated directly on the
>> Heap
>> >> >> like before. This allows group/heartbeat messages to avoid being
>> >> delayed if
>> >> >> the MemoryPool fills up.
>> >> >> "
>> >> >>
>> >> >> So it sounds like there's an incorrect assumption that responses from
>> >> the
>> >> >> coordinator will always be small (< 1Kb as mentioned in the caveat).
>> >> There
>> >> >> are now a handful of request types between clients and the
>> coordinator:
>> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
>> OffsetFetch,
>> >> >> ListGroups, DescribeGroups}. While true (at least today) for
>> >> >> HeartbeatResponse and a few others, I don't think we can assume
>> >> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> >> >> OffsetFetchResponse will be small, as they are effectively bounded by
>> >> the
>> >> >> max message size allowed by the broker for the __consumer_offsets
>> topic
>> >> >> which by default is 1MB.
>> >> >>
>> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> >> mickael.maison@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >> > I've updated the KIP to address all the comments raised here and
>> from
>> >> >> > the "DISCUSS" thread.
>> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >
>> >> >> > Now, I'd like to restart the vote.
>> >> >> >
>> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> >> >> > <ra...@googlemail.com> wrote:
>> >> >> > > Hi Mickael,
>> >> >> > >
>> >> >> > > I am +1 on the overall approach of this KIP, but have a couple of
>> >> >> > comments
>> >> >> > > (sorry, should have brought them up on the discuss thread
>> earlier):
>> >> >> > >
>> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
>> implemented?
>> >> At
>> >> >> > the
>> >> >> > > moment, coordinator shares the same NetworkClient (and hence the
>> >> same
>> >> >> > > Selector) with consumer connections used for fetching records.
>> Since
>> >> >> > > freeing of memory relies on consuming applications invoking
>> poll()
>> >> >> after
>> >> >> > > processing previous records and potentially after committing
>> >> offsets,
>> >> >> it
>> >> >> > > will be good to ensure that coordinator is not blocked for read
>> by
>> >> >> fetch
>> >> >> > > responses. This may be simpler once coordinator has its own
>> >> Selector.
>> >> >> > >
>> >> >> > > 2. The KIP says: *Once messages are returned to the user,
>> messages
>> >> are
>> >> >> > > deleted from the MemoryPool so new messages can be stored.*
>> >> >> > > Can you expand that a bit? I am assuming that partial buffers
>> never
>> >> get
>> >> >> > > freed when some messages are returned to the user since the
>> >> consumer is
>> >> >> > > still holding a reference to the buffer. Would buffers be freed
>> when
>> >> >> > > fetches for all the partitions in a response are parsed, but
>> perhaps
>> >> >> not
>> >> >> > > yet returned to the user (i.e., is the memory freed when a
>> >> reference to
>> >> >> > the
>> >> >> > > response buffer is no longer required)? It will be good to
>> document
>> >> the
>> >> >> > > (approximate) maximum memory requirement for the non-compressed
>> >> case.
>> >> >> > There
>> >> >> > > is data read from the socket, cached in the Fetcher and (as Radai
>> >> has
>> >> >> > > pointed out), the records still with the user application.
>> >> >> > >
>> >> >> > >
>> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
>> radai.rosenblatt@gmail.com>
>> >> >> > wrote:
>> >> >> > >
>> >> >> > >> +1 (non-binding).
>> >> >> > >>
>> >> >> > >> small nit pick - just because you returned a response to user
>> >> doesnt
>> >> >> > mean
>> >> >> > >> the memory id no longer used. for some cases the actual "point
>> of
>> >> >> > >> termination" may be the deserializer (really impl-dependant),
>> but
>> >> >> > >> generally, wouldnt it be "nice" to have an explicit dispose()
>> call
>> >> on
>> >> >> > >> responses (with the addition that getting the next batch of data
>> >> from
>> >> >> a
>> >> >> > >> consumer automatically disposes the previous results)
>> >> >> > >>
>> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
>> ECOMAR@uk.ibm.com>
>> >> >> > wrote:
>> >> >> > >>
>> >> >> > >> > +1 (non binding)
>> >> >> > >> > --------------------------------------------------
>> >> >> > >> > Edoardo Comar
>> >> >> > >> > IBM MessageHub
>> >> >> > >> > ecomar@uk.ibm.com
>> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >> >> > >> >
>> >> >> > >> > IBM United Kingdom Limited Registered in England and Wales
>> with
>> >> >> number
>> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
>> Portsmouth,
>> >> >> Hants.
>> >> >> > >> PO6
>> >> >> > >> > 3AU
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
>> >> >> > >> > To:     dev@kafka.apache.org
>> >> >> > >> > Date:   05/12/2016 14:38
>> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the
>> >> >> > consumer
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> > Hi all,
>> >> >> > >> >
>> >> >> > >> > I'd like to start the vote for KIP-81:
>> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> > Thank you
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> >
>> >> >> > >> > Unless stated otherwise above:
>> >> >> > >> > IBM United Kingdom Limited - Registered in England and Wales
>> with
>> >> >> > number
>> >> >> > >> > 741598.
>> >> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
>> >> Hampshire
>> >> >> PO6
>> >> >> > >> 3AU
>> >> >> > >> >
>> >> >> > >>
>> >> >> > >
>> >> >> > >
>> >> >> > >
>> >> >> > > --
>> >> >> > > Regards,
>> >> >> > >
>> >> >> > > Rajini
>> >> >> >
>> >> >>
>> >>
>>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Jason Gustafson <ja...@confluent.io>.
Just a minor comment. The KIP suggests that coordinator responses are
always allocated outside of the memory pool, but maybe we can reserve that
capability for only when the pool does not have enough space? It seems a
little nicer to use the pool if we can. If that seems reasonable, I'm +1 on
the KIP. Thanks for the effort!

-Jason

On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <mi...@gmail.com>
wrote:

> Yes I agree, having a generic flag is more future proof.
> I'll update the KIP in the coming days.
>
> Thanks
>
> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
> > Hey Mickael,
> >
> > The suggestion to add something to Node makes sense. I could imagine for
> > example adding a flag to indicate that the connection has a higher
> > "priority," meaning that we can allocate outside of the memory pool if
> > necessary. That would still be generic even if the only use case is the
> > consumer coordinator. We might also face a similar problem when the
> > producer is sending requests to the transaction coordinator for KIP-98.
> > What do you think?
> >
> > Thanks,
> > Jason
> >
> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> mickael.maison@gmail.com>
> > wrote:
> >
> >> Apologies for the late response.
> >>
> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
> >> connection is "tagged" with a different id, so we could retrieve it in
> >> NetworkReceive to make the distinction.
> >> However, currently the coordinator connection are made different by
> using:
> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> for the Node id.
> >>
> >> So to identify Coordinator connections, we'd have to check that the
> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
> >> hacky ...
> >>
> >> Maybe we could add a constructor to Node that allows to pass in a
> >> sourceId String. That way we could make all the coordinator
> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> example).
> >> What do you think ?
> >>
> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > Good point. The consumer does use a separate connection to the
> >> coordinator,
> >> > so perhaps the connection itself could be tagged for normal heap
> >> allocation?
> >> >
> >> > -Jason
> >> >
> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> onurkaraman.apache@gmail.com
> >> >> wrote:
> >> >
> >> >> I only did a quick scan but I wanted to point out what I think is an
> >> >> incorrect assumption in the KIP's caveats:
> >> >> "
> >> >> There is a risk using the MemoryPool that, after we fill up the
> memory
> >> with
> >> >> fetch data, we can starve the coordinator's connection
> >> >> ...
> >> >> To alleviate this issue, only messages larger than 1Kb will be
> >> allocated in
> >> >> the MemoryPool. Smaller messages will be allocated directly on the
> Heap
> >> >> like before. This allows group/heartbeat messages to avoid being
> >> delayed if
> >> >> the MemoryPool fills up.
> >> >> "
> >> >>
> >> >> So it sounds like there's an incorrect assumption that responses from
> >> the
> >> >> coordinator will always be small (< 1Kb as mentioned in the caveat).
> >> There
> >> >> are now a handful of request types between clients and the
> coordinator:
> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
> OffsetFetch,
> >> >> ListGroups, DescribeGroups}. While true (at least today) for
> >> >> HeartbeatResponse and a few others, I don't think we can assume
> >> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
> >> >> OffsetFetchResponse will be small, as they are effectively bounded by
> >> the
> >> >> max message size allowed by the broker for the __consumer_offsets
> topic
> >> >> which by default is 1MB.
> >> >>
> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> >> mickael.maison@gmail.com>
> >> >> wrote:
> >> >>
> >> >> > I've updated the KIP to address all the comments raised here and
> from
> >> >> > the "DISCUSS" thread.
> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >
> >> >> > Now, I'd like to restart the vote.
> >> >> >
> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> > <ra...@googlemail.com> wrote:
> >> >> > > Hi Mickael,
> >> >> > >
> >> >> > > I am +1 on the overall approach of this KIP, but have a couple of
> >> >> > comments
> >> >> > > (sorry, should have brought them up on the discuss thread
> earlier):
> >> >> > >
> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
> implemented?
> >> At
> >> >> > the
> >> >> > > moment, coordinator shares the same NetworkClient (and hence the
> >> same
> >> >> > > Selector) with consumer connections used for fetching records.
> Since
> >> >> > > freeing of memory relies on consuming applications invoking
> poll()
> >> >> after
> >> >> > > processing previous records and potentially after committing
> >> offsets,
> >> >> it
> >> >> > > will be good to ensure that coordinator is not blocked for read
> by
> >> >> fetch
> >> >> > > responses. This may be simpler once coordinator has its own
> >> Selector.
> >> >> > >
> >> >> > > 2. The KIP says: *Once messages are returned to the user,
> messages
> >> are
> >> >> > > deleted from the MemoryPool so new messages can be stored.*
> >> >> > > Can you expand that a bit? I am assuming that partial buffers
> never
> >> get
> >> >> > > freed when some messages are returned to the user since the
> >> consumer is
> >> >> > > still holding a reference to the buffer. Would buffers be freed
> when
> >> >> > > fetches for all the partitions in a response are parsed, but
> perhaps
> >> >> not
> >> >> > > yet returned to the user (i.e., is the memory freed when a
> >> reference to
> >> >> > the
> >> >> > > response buffer is no longer required)? It will be good to
> document
> >> the
> >> >> > > (approximate) maximum memory requirement for the non-compressed
> >> case.
> >> >> > There
> >> >> > > is data read from the socket, cached in the Fetcher and (as Radai
> >> has
> >> >> > > pointed out), the records still with the user application.
> >> >> > >
> >> >> > >
> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> radai.rosenblatt@gmail.com>
> >> >> > wrote:
> >> >> > >
> >> >> > >> +1 (non-binding).
> >> >> > >>
> >> >> > >> small nit pick - just because you returned a response to user
> >> doesnt
> >> >> > mean
> >> >> > >> the memory id no longer used. for some cases the actual "point
> of
> >> >> > >> termination" may be the deserializer (really impl-dependant),
> but
> >> >> > >> generally, wouldnt it be "nice" to have an explicit dispose()
> call
> >> on
> >> >> > >> responses (with the addition that getting the next batch of data
> >> from
> >> >> a
> >> >> > >> consumer automatically disposes the previous results)
> >> >> > >>
> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> ECOMAR@uk.ibm.com>
> >> >> > wrote:
> >> >> > >>
> >> >> > >> > +1 (non binding)
> >> >> > >> > --------------------------------------------------
> >> >> > >> > Edoardo Comar
> >> >> > >> > IBM MessageHub
> >> >> > >> > ecomar@uk.ibm.com
> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> > >> >
> >> >> > >> > IBM United Kingdom Limited Registered in England and Wales
> with
> >> >> number
> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
> Portsmouth,
> >> >> Hants.
> >> >> > >> PO6
> >> >> > >> > 3AU
> >> >> > >> >
> >> >> > >> >
> >> >> > >> >
> >> >> > >> > From:   Mickael Maison <mi...@gmail.com>
> >> >> > >> > To:     dev@kafka.apache.org
> >> >> > >> > Date:   05/12/2016 14:38
> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the
> >> >> > consumer
> >> >> > >> >
> >> >> > >> >
> >> >> > >> >
> >> >> > >> > Hi all,
> >> >> > >> >
> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> > >> >
> >> >> > >> >
> >> >> > >> > Thank you
> >> >> > >> >
> >> >> > >> >
> >> >> > >> >
> >> >> > >> >
> >> >> > >> > Unless stated otherwise above:
> >> >> > >> > IBM United Kingdom Limited - Registered in England and Wales
> with
> >> >> > number
> >> >> > >> > 741598.
> >> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
> >> Hampshire
> >> >> PO6
> >> >> > >> 3AU
> >> >> > >> >
> >> >> > >>
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > > --
> >> >> > > Regards,
> >> >> > >
> >> >> > > Rajini
> >> >> >
> >> >>
> >>
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Mickael Maison <mi...@gmail.com>.
Yes I agree, having a generic flag is more future proof.
I'll update the KIP in the coming days.

Thanks

On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io> wrote:
> Hey Mickael,
>
> The suggestion to add something to Node makes sense. I could imagine for
> example adding a flag to indicate that the connection has a higher
> "priority," meaning that we can allocate outside of the memory pool if
> necessary. That would still be generic even if the only use case is the
> consumer coordinator. We might also face a similar problem when the
> producer is sending requests to the transaction coordinator for KIP-98.
> What do you think?
>
> Thanks,
> Jason
>
> On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <mi...@gmail.com>
> wrote:
>
>> Apologies for the late response.
>>
>> Thanks Jason for the suggestion. Yes you are right, the Coordinator
>> connection is "tagged" with a different id, so we could retrieve it in
>> NetworkReceive to make the distinction.
>> However, currently the coordinator connection are made different by using:
>> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> for the Node id.
>>
>> So to identify Coordinator connections, we'd have to check that the
>> NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
>> hacky ...
>>
>> Maybe we could add a constructor to Node that allows to pass in a
>> sourceId String. That way we could make all the coordinator
>> connections explicit (by setting it to "Coordinator-[ID]" for
>> example).
>> What do you think ?
>>
>> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>> > Good point. The consumer does use a separate connection to the
>> coordinator,
>> > so perhaps the connection itself could be tagged for normal heap
>> allocation?
>> >
>> > -Jason
>> >
>> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> onurkaraman.apache@gmail.com
>> >> wrote:
>> >
>> >> I only did a quick scan but I wanted to point out what I think is an
>> >> incorrect assumption in the KIP's caveats:
>> >> "
>> >> There is a risk using the MemoryPool that, after we fill up the memory
>> with
>> >> fetch data, we can starve the coordinator's connection
>> >> ...
>> >> To alleviate this issue, only messages larger than 1Kb will be
>> allocated in
>> >> the MemoryPool. Smaller messages will be allocated directly on the Heap
>> >> like before. This allows group/heartbeat messages to avoid being
>> delayed if
>> >> the MemoryPool fills up.
>> >> "
>> >>
>> >> So it sounds like there's an incorrect assumption that responses from
>> the
>> >> coordinator will always be small (< 1Kb as mentioned in the caveat).
>> There
>> >> are now a handful of request types between clients and the coordinator:
>> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
>> >> ListGroups, DescribeGroups}. While true (at least today) for
>> >> HeartbeatResponse and a few others, I don't think we can assume
>> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> >> OffsetFetchResponse will be small, as they are effectively bounded by
>> the
>> >> max message size allowed by the broker for the __consumer_offsets topic
>> >> which by default is 1MB.
>> >>
>> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> mickael.maison@gmail.com>
>> >> wrote:
>> >>
>> >> > I've updated the KIP to address all the comments raised here and from
>> >> > the "DISCUSS" thread.
>> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >
>> >> > Now, I'd like to restart the vote.
>> >> >
>> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> >> > <ra...@googlemail.com> wrote:
>> >> > > Hi Mickael,
>> >> > >
>> >> > > I am +1 on the overall approach of this KIP, but have a couple of
>> >> > comments
>> >> > > (sorry, should have brought them up on the discuss thread earlier):
>> >> > >
>> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented?
>> At
>> >> > the
>> >> > > moment, coordinator shares the same NetworkClient (and hence the
>> same
>> >> > > Selector) with consumer connections used for fetching records. Since
>> >> > > freeing of memory relies on consuming applications invoking poll()
>> >> after
>> >> > > processing previous records and potentially after committing
>> offsets,
>> >> it
>> >> > > will be good to ensure that coordinator is not blocked for read by
>> >> fetch
>> >> > > responses. This may be simpler once coordinator has its own
>> Selector.
>> >> > >
>> >> > > 2. The KIP says: *Once messages are returned to the user, messages
>> are
>> >> > > deleted from the MemoryPool so new messages can be stored.*
>> >> > > Can you expand that a bit? I am assuming that partial buffers never
>> get
>> >> > > freed when some messages are returned to the user since the
>> consumer is
>> >> > > still holding a reference to the buffer. Would buffers be freed when
>> >> > > fetches for all the partitions in a response are parsed, but perhaps
>> >> not
>> >> > > yet returned to the user (i.e., is the memory freed when a
>> reference to
>> >> > the
>> >> > > response buffer is no longer required)? It will be good to document
>> the
>> >> > > (approximate) maximum memory requirement for the non-compressed
>> case.
>> >> > There
>> >> > > is data read from the socket, cached in the Fetcher and (as Radai
>> has
>> >> > > pointed out), the records still with the user application.
>> >> > >
>> >> > >
>> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <ra...@gmail.com>
>> >> > wrote:
>> >> > >
>> >> > >> +1 (non-binding).
>> >> > >>
>> >> > >> small nit pick - just because you returned a response to user
>> doesnt
>> >> > mean
>> >> > >> the memory id no longer used. for some cases the actual "point of
>> >> > >> termination" may be the deserializer (really impl-dependant), but
>> >> > >> generally, wouldnt it be "nice" to have an explicit dispose() call
>> on
>> >> > >> responses (with the addition that getting the next batch of data
>> from
>> >> a
>> >> > >> consumer automatically disposes the previous results)
>> >> > >>
>> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <EC...@uk.ibm.com>
>> >> > wrote:
>> >> > >>
>> >> > >> > +1 (non binding)
>> >> > >> > --------------------------------------------------
>> >> > >> > Edoardo Comar
>> >> > >> > IBM MessageHub
>> >> > >> > ecomar@uk.ibm.com
>> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >> > >> >
>> >> > >> > IBM United Kingdom Limited Registered in England and Wales with
>> >> number
>> >> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
>> >> Hants.
>> >> > >> PO6
>> >> > >> > 3AU
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> > From:   Mickael Maison <mi...@gmail.com>
>> >> > >> > To:     dev@kafka.apache.org
>> >> > >> > Date:   05/12/2016 14:38
>> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the
>> >> > consumer
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> > Hi all,
>> >> > >> >
>> >> > >> > I'd like to start the vote for KIP-81:
>> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> > >> >
>> >> > >> >
>> >> > >> > Thank you
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> > Unless stated otherwise above:
>> >> > >> > IBM United Kingdom Limited - Registered in England and Wales with
>> >> > number
>> >> > >> > 741598.
>> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
>> Hampshire
>> >> PO6
>> >> > >> 3AU
>> >> > >> >
>> >> > >>
>> >> > >
>> >> > >
>> >> > >
>> >> > > --
>> >> > > Regards,
>> >> > >
>> >> > > Rajini
>> >> >
>> >>
>>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Mickael,

The suggestion to add something to Node makes sense. I could imagine for
example adding a flag to indicate that the connection has a higher
"priority," meaning that we can allocate outside of the memory pool if
necessary. That would still be generic even if the only use case is the
consumer coordinator. We might also face a similar problem when the
producer is sending requests to the transaction coordinator for KIP-98.
What do you think?

Thanks,
Jason

On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <mi...@gmail.com>
wrote:

> Apologies for the late response.
>
> Thanks Jason for the suggestion. Yes you are right, the Coordinator
> connection is "tagged" with a different id, so we could retrieve it in
> NetworkReceive to make the distinction.
> However, currently the coordinator connection are made different by using:
> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> for the Node id.
>
> So to identify Coordinator connections, we'd have to check that the
> NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
> hacky ...
>
> Maybe we could add a constructor to Node that allows to pass in a
> sourceId String. That way we could make all the coordinator
> connections explicit (by setting it to "Coordinator-[ID]" for
> example).
> What do you think ?
>
> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
> > Good point. The consumer does use a separate connection to the
> coordinator,
> > so perhaps the connection itself could be tagged for normal heap
> allocation?
> >
> > -Jason
> >
> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> onurkaraman.apache@gmail.com
> >> wrote:
> >
> >> I only did a quick scan but I wanted to point out what I think is an
> >> incorrect assumption in the KIP's caveats:
> >> "
> >> There is a risk using the MemoryPool that, after we fill up the memory
> with
> >> fetch data, we can starve the coordinator's connection
> >> ...
> >> To alleviate this issue, only messages larger than 1Kb will be
> allocated in
> >> the MemoryPool. Smaller messages will be allocated directly on the Heap
> >> like before. This allows group/heartbeat messages to avoid being
> delayed if
> >> the MemoryPool fills up.
> >> "
> >>
> >> So it sounds like there's an incorrect assumption that responses from
> the
> >> coordinator will always be small (< 1Kb as mentioned in the caveat).
> There
> >> are now a handful of request types between clients and the coordinator:
> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
> >> ListGroups, DescribeGroups}. While true (at least today) for
> >> HeartbeatResponse and a few others, I don't think we can assume
> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
> >> OffsetFetchResponse will be small, as they are effectively bounded by
> the
> >> max message size allowed by the broker for the __consumer_offsets topic
> >> which by default is 1MB.
> >>
> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> mickael.maison@gmail.com>
> >> wrote:
> >>
> >> > I've updated the KIP to address all the comments raised here and from
> >> > the "DISCUSS" thread.
> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >
> >> > Now, I'd like to restart the vote.
> >> >
> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> > <ra...@googlemail.com> wrote:
> >> > > Hi Mickael,
> >> > >
> >> > > I am +1 on the overall approach of this KIP, but have a couple of
> >> > comments
> >> > > (sorry, should have brought them up on the discuss thread earlier):
> >> > >
> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented?
> At
> >> > the
> >> > > moment, coordinator shares the same NetworkClient (and hence the
> same
> >> > > Selector) with consumer connections used for fetching records. Since
> >> > > freeing of memory relies on consuming applications invoking poll()
> >> after
> >> > > processing previous records and potentially after committing
> offsets,
> >> it
> >> > > will be good to ensure that coordinator is not blocked for read by
> >> fetch
> >> > > responses. This may be simpler once coordinator has its own
> Selector.
> >> > >
> >> > > 2. The KIP says: *Once messages are returned to the user, messages
> are
> >> > > deleted from the MemoryPool so new messages can be stored.*
> >> > > Can you expand that a bit? I am assuming that partial buffers never
> get
> >> > > freed when some messages are returned to the user since the
> consumer is
> >> > > still holding a reference to the buffer. Would buffers be freed when
> >> > > fetches for all the partitions in a response are parsed, but perhaps
> >> not
> >> > > yet returned to the user (i.e., is the memory freed when a
> reference to
> >> > the
> >> > > response buffer is no longer required)? It will be good to document
> the
> >> > > (approximate) maximum memory requirement for the non-compressed
> case.
> >> > There
> >> > > is data read from the socket, cached in the Fetcher and (as Radai
> has
> >> > > pointed out), the records still with the user application.
> >> > >
> >> > >
> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <ra...@gmail.com>
> >> > wrote:
> >> > >
> >> > >> +1 (non-binding).
> >> > >>
> >> > >> small nit pick - just because you returned a response to user
> doesnt
> >> > mean
> >> > >> the memory id no longer used. for some cases the actual "point of
> >> > >> termination" may be the deserializer (really impl-dependant), but
> >> > >> generally, wouldnt it be "nice" to have an explicit dispose() call
> on
> >> > >> responses (with the addition that getting the next batch of data
> from
> >> a
> >> > >> consumer automatically disposes the previous results)
> >> > >>
> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <EC...@uk.ibm.com>
> >> > wrote:
> >> > >>
> >> > >> > +1 (non binding)
> >> > >> > --------------------------------------------------
> >> > >> > Edoardo Comar
> >> > >> > IBM MessageHub
> >> > >> > ecomar@uk.ibm.com
> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> > >> >
> >> > >> > IBM United Kingdom Limited Registered in England and Wales with
> >> number
> >> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> >> Hants.
> >> > >> PO6
> >> > >> > 3AU
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > From:   Mickael Maison <mi...@gmail.com>
> >> > >> > To:     dev@kafka.apache.org
> >> > >> > Date:   05/12/2016 14:38
> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the
> >> > consumer
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > Hi all,
> >> > >> >
> >> > >> > I'd like to start the vote for KIP-81:
> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> > >> >
> >> > >> >
> >> > >> > Thank you
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > Unless stated otherwise above:
> >> > >> > IBM United Kingdom Limited - Registered in England and Wales with
> >> > number
> >> > >> > 741598.
> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth,
> Hampshire
> >> PO6
> >> > >> 3AU
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > Regards,
> >> > >
> >> > > Rajini
> >> >
> >>
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Mickael Maison <mi...@gmail.com>.
Apologies for the late response.

Thanks Jason for the suggestion. Yes you are right, the Coordinator
connection is "tagged" with a different id, so we could retrieve it in
NetworkReceive to make the distinction.
However, currently the coordinator connection are made different by using:
Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
for the Node id.

So to identify Coordinator connections, we'd have to check that the
NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
hacky ...

Maybe we could add a constructor to Node that allows to pass in a
sourceId String. That way we could make all the coordinator
connections explicit (by setting it to "Coordinator-[ID]" for
example).
What do you think ?

On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io> wrote:
> Good point. The consumer does use a separate connection to the coordinator,
> so perhaps the connection itself could be tagged for normal heap allocation?
>
> -Jason
>
> On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <onurkaraman.apache@gmail.com
>> wrote:
>
>> I only did a quick scan but I wanted to point out what I think is an
>> incorrect assumption in the KIP's caveats:
>> "
>> There is a risk using the MemoryPool that, after we fill up the memory with
>> fetch data, we can starve the coordinator's connection
>> ...
>> To alleviate this issue, only messages larger than 1Kb will be allocated in
>> the MemoryPool. Smaller messages will be allocated directly on the Heap
>> like before. This allows group/heartbeat messages to avoid being delayed if
>> the MemoryPool fills up.
>> "
>>
>> So it sounds like there's an incorrect assumption that responses from the
>> coordinator will always be small (< 1Kb as mentioned in the caveat). There
>> are now a handful of request types between clients and the coordinator:
>> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
>> ListGroups, DescribeGroups}. While true (at least today) for
>> HeartbeatResponse and a few others, I don't think we can assume
>> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> OffsetFetchResponse will be small, as they are effectively bounded by the
>> max message size allowed by the broker for the __consumer_offsets topic
>> which by default is 1MB.
>>
>> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mi...@gmail.com>
>> wrote:
>>
>> > I've updated the KIP to address all the comments raised here and from
>> > the "DISCUSS" thread.
>> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >
>> > Now, I'd like to restart the vote.
>> >
>> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> > <ra...@googlemail.com> wrote:
>> > > Hi Mickael,
>> > >
>> > > I am +1 on the overall approach of this KIP, but have a couple of
>> > comments
>> > > (sorry, should have brought them up on the discuss thread earlier):
>> > >
>> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At
>> > the
>> > > moment, coordinator shares the same NetworkClient (and hence the same
>> > > Selector) with consumer connections used for fetching records. Since
>> > > freeing of memory relies on consuming applications invoking poll()
>> after
>> > > processing previous records and potentially after committing offsets,
>> it
>> > > will be good to ensure that coordinator is not blocked for read by
>> fetch
>> > > responses. This may be simpler once coordinator has its own Selector.
>> > >
>> > > 2. The KIP says: *Once messages are returned to the user, messages are
>> > > deleted from the MemoryPool so new messages can be stored.*
>> > > Can you expand that a bit? I am assuming that partial buffers never get
>> > > freed when some messages are returned to the user since the consumer is
>> > > still holding a reference to the buffer. Would buffers be freed when
>> > > fetches for all the partitions in a response are parsed, but perhaps
>> not
>> > > yet returned to the user (i.e., is the memory freed when a reference to
>> > the
>> > > response buffer is no longer required)? It will be good to document the
>> > > (approximate) maximum memory requirement for the non-compressed case.
>> > There
>> > > is data read from the socket, cached in the Fetcher and (as Radai has
>> > > pointed out), the records still with the user application.
>> > >
>> > >
>> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <ra...@gmail.com>
>> > wrote:
>> > >
>> > >> +1 (non-binding).
>> > >>
>> > >> small nit pick - just because you returned a response to user doesnt
>> > mean
>> > >> the memory id no longer used. for some cases the actual "point of
>> > >> termination" may be the deserializer (really impl-dependant), but
>> > >> generally, wouldnt it be "nice" to have an explicit dispose() call on
>> > >> responses (with the addition that getting the next batch of data from
>> a
>> > >> consumer automatically disposes the previous results)
>> > >>
>> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <EC...@uk.ibm.com>
>> > wrote:
>> > >>
>> > >> > +1 (non binding)
>> > >> > --------------------------------------------------
>> > >> > Edoardo Comar
>> > >> > IBM MessageHub
>> > >> > ecomar@uk.ibm.com
>> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> > >> >
>> > >> > IBM United Kingdom Limited Registered in England and Wales with
>> number
>> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
>> Hants.
>> > >> PO6
>> > >> > 3AU
>> > >> >
>> > >> >
>> > >> >
>> > >> > From:   Mickael Maison <mi...@gmail.com>
>> > >> > To:     dev@kafka.apache.org
>> > >> > Date:   05/12/2016 14:38
>> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the
>> > consumer
>> > >> >
>> > >> >
>> > >> >
>> > >> > Hi all,
>> > >> >
>> > >> > I'd like to start the vote for KIP-81:
>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> > >> >
>> > >> >
>> > >> > Thank you
>> > >> >
>> > >> >
>> > >> >
>> > >> >
>> > >> > Unless stated otherwise above:
>> > >> > IBM United Kingdom Limited - Registered in England and Wales with
>> > number
>> > >> > 741598.
>> > >> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
>> PO6
>> > >> 3AU
>> > >> >
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > Regards,
>> > >
>> > > Rajini
>> >
>>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Jason Gustafson <ja...@confluent.io>.
Good point. The consumer does use a separate connection to the coordinator,
so perhaps the connection itself could be tagged for normal heap allocation?

-Jason

On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <onurkaraman.apache@gmail.com
> wrote:

> I only did a quick scan but I wanted to point out what I think is an
> incorrect assumption in the KIP's caveats:
> "
> There is a risk using the MemoryPool that, after we fill up the memory with
> fetch data, we can starve the coordinator's connection
> ...
> To alleviate this issue, only messages larger than 1Kb will be allocated in
> the MemoryPool. Smaller messages will be allocated directly on the Heap
> like before. This allows group/heartbeat messages to avoid being delayed if
> the MemoryPool fills up.
> "
>
> So it sounds like there's an incorrect assumption that responses from the
> coordinator will always be small (< 1Kb as mentioned in the caveat). There
> are now a handful of request types between clients and the coordinator:
> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
> ListGroups, DescribeGroups}. While true (at least today) for
> HeartbeatResponse and a few others, I don't think we can assume
> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
> OffsetFetchResponse will be small, as they are effectively bounded by the
> max message size allowed by the broker for the __consumer_offsets topic
> which by default is 1MB.
>
> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mi...@gmail.com>
> wrote:
>
> > I've updated the KIP to address all the comments raised here and from
> > the "DISCUSS" thread.
> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >
> > Now, I'd like to restart the vote.
> >
> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> > <ra...@googlemail.com> wrote:
> > > Hi Mickael,
> > >
> > > I am +1 on the overall approach of this KIP, but have a couple of
> > comments
> > > (sorry, should have brought them up on the discuss thread earlier):
> > >
> > > 1. Perhaps it would be better to do this after KAFKA-4137
> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At
> > the
> > > moment, coordinator shares the same NetworkClient (and hence the same
> > > Selector) with consumer connections used for fetching records. Since
> > > freeing of memory relies on consuming applications invoking poll()
> after
> > > processing previous records and potentially after committing offsets,
> it
> > > will be good to ensure that coordinator is not blocked for read by
> fetch
> > > responses. This may be simpler once coordinator has its own Selector.
> > >
> > > 2. The KIP says: *Once messages are returned to the user, messages are
> > > deleted from the MemoryPool so new messages can be stored.*
> > > Can you expand that a bit? I am assuming that partial buffers never get
> > > freed when some messages are returned to the user since the consumer is
> > > still holding a reference to the buffer. Would buffers be freed when
> > > fetches for all the partitions in a response are parsed, but perhaps
> not
> > > yet returned to the user (i.e., is the memory freed when a reference to
> > the
> > > response buffer is no longer required)? It will be good to document the
> > > (approximate) maximum memory requirement for the non-compressed case.
> > There
> > > is data read from the socket, cached in the Fetcher and (as Radai has
> > > pointed out), the records still with the user application.
> > >
> > >
> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <ra...@gmail.com>
> > wrote:
> > >
> > >> +1 (non-binding).
> > >>
> > >> small nit pick - just because you returned a response to user doesnt
> > mean
> > >> the memory id no longer used. for some cases the actual "point of
> > >> termination" may be the deserializer (really impl-dependant), but
> > >> generally, wouldnt it be "nice" to have an explicit dispose() call on
> > >> responses (with the addition that getting the next batch of data from
> a
> > >> consumer automatically disposes the previous results)
> > >>
> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <EC...@uk.ibm.com>
> > wrote:
> > >>
> > >> > +1 (non binding)
> > >> > --------------------------------------------------
> > >> > Edoardo Comar
> > >> > IBM MessageHub
> > >> > ecomar@uk.ibm.com
> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> > >> >
> > >> > IBM United Kingdom Limited Registered in England and Wales with
> number
> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> Hants.
> > >> PO6
> > >> > 3AU
> > >> >
> > >> >
> > >> >
> > >> > From:   Mickael Maison <mi...@gmail.com>
> > >> > To:     dev@kafka.apache.org
> > >> > Date:   05/12/2016 14:38
> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the
> > consumer
> > >> >
> > >> >
> > >> >
> > >> > Hi all,
> > >> >
> > >> > I'd like to start the vote for KIP-81:
> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > >> >
> > >> >
> > >> > Thank you
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > Unless stated otherwise above:
> > >> > IBM United Kingdom Limited - Registered in England and Wales with
> > number
> > >> > 741598.
> > >> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6
> > >> 3AU
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > Regards,
> > >
> > > Rajini
> >
>

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

Posted by Onur Karaman <on...@gmail.com>.
I only did a quick scan but I wanted to point out what I think is an
incorrect assumption in the KIP's caveats:
"
There is a risk using the MemoryPool that, after we fill up the memory with
fetch data, we can starve the coordinator's connection
...
To alleviate this issue, only messages larger than 1Kb will be allocated in
the MemoryPool. Smaller messages will be allocated directly on the Heap
like before. This allows group/heartbeat messages to avoid being delayed if
the MemoryPool fills up.
"

So it sounds like there's an incorrect assumption that responses from the
coordinator will always be small (< 1Kb as mentioned in the caveat). There
are now a handful of request types between clients and the coordinator:
{JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
ListGroups, DescribeGroups}. While true (at least today) for
HeartbeatResponse and a few others, I don't think we can assume
JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
OffsetFetchResponse will be small, as they are effectively bounded by the
max message size allowed by the broker for the __consumer_offsets topic
which by default is 1MB.

On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mi...@gmail.com>
wrote:

> I've updated the KIP to address all the comments raised here and from
> the "DISCUSS" thread.
> See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>
> Now, I'd like to restart the vote.
>
> On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> <ra...@googlemail.com> wrote:
> > Hi Mickael,
> >
> > I am +1 on the overall approach of this KIP, but have a couple of
> comments
> > (sorry, should have brought them up on the discuss thread earlier):
> >
> > 1. Perhaps it would be better to do this after KAFKA-4137
> > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At
> the
> > moment, coordinator shares the same NetworkClient (and hence the same
> > Selector) with consumer connections used for fetching records. Since
> > freeing of memory relies on consuming applications invoking poll() after
> > processing previous records and potentially after committing offsets, it
> > will be good to ensure that coordinator is not blocked for read by fetch
> > responses. This may be simpler once coordinator has its own Selector.
> >
> > 2. The KIP says: *Once messages are returned to the user, messages are
> > deleted from the MemoryPool so new messages can be stored.*
> > Can you expand that a bit? I am assuming that partial buffers never get
> > freed when some messages are returned to the user since the consumer is
> > still holding a reference to the buffer. Would buffers be freed when
> > fetches for all the partitions in a response are parsed, but perhaps not
> > yet returned to the user (i.e., is the memory freed when a reference to
> the
> > response buffer is no longer required)? It will be good to document the
> > (approximate) maximum memory requirement for the non-compressed case.
> There
> > is data read from the socket, cached in the Fetcher and (as Radai has
> > pointed out), the records still with the user application.
> >
> >
> > On Tue, Dec 6, 2016 at 2:04 AM, radai <ra...@gmail.com>
> wrote:
> >
> >> +1 (non-binding).
> >>
> >> small nit pick - just because you returned a response to user doesnt
> mean
> >> the memory id no longer used. for some cases the actual "point of
> >> termination" may be the deserializer (really impl-dependant), but
> >> generally, wouldnt it be "nice" to have an explicit dispose() call on
> >> responses (with the addition that getting the next batch of data from a
> >> consumer automatically disposes the previous results)
> >>
> >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <EC...@uk.ibm.com>
> wrote:
> >>
> >> > +1 (non binding)
> >> > --------------------------------------------------
> >> > Edoardo Comar
> >> > IBM MessageHub
> >> > ecomar@uk.ibm.com
> >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >
> >> > IBM United Kingdom Limited Registered in England and Wales with number
> >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> >> PO6
> >> > 3AU
> >> >
> >> >
> >> >
> >> > From:   Mickael Maison <mi...@gmail.com>
> >> > To:     dev@kafka.apache.org
> >> > Date:   05/12/2016 14:38
> >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the
> consumer
> >> >
> >> >
> >> >
> >> > Hi all,
> >> >
> >> > I'd like to start the vote for KIP-81:
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >
> >> >
> >> > Thank you
> >> >
> >> >
> >> >
> >> >
> >> > Unless stated otherwise above:
> >> > IBM United Kingdom Limited - Registered in England and Wales with
> number
> >> > 741598.
> >> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> >> 3AU
> >> >
> >>
> >
> >
> >
> > --
> > Regards,
> >
> > Rajini
>