Posted to dev@kafka.apache.org by Dhruvil Shah <dh...@confluent.io> on 2018/04/06 21:56:01 UTC

[DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Hi,

I created a KIP to help mitigate out of memory issues during
down-conversion. The KIP proposes introducing a configuration that can
prevent down-conversions altogether, and also describes a design for
efficient memory usage for down-conversion.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion

Suggestions and feedback are welcome!

Thanks,
Dhruvil

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Dhruvil Shah <dh...@confluent.io>.
Hi Jason,

1. The motivation for adding the configuration was that even though we are
improving memory usage, there is still overhead involved with
down-conversion - the broker still has to do extra work which some users
might want to eliminate completely. The proposal also moves the
down-conversion and related file I/O to the network thread which could
block the thread till the read is complete, though we try to minimize the
impact by reading and down-converting only a small batch of messages at a
time.

I agree that the configuration should be added only if we see a high value
for it. One way to determine whether we really need the configuration would
be to run some performance tests to see what kind of an impact the proposal
has, in terms of its effects on memory consumption, ability to handle
concurrent requests, etc.

2. One advantage of being able to specify the version number could be that
users can turn off certain down-conversions that are deemed more expensive
than others. For example, converting from X --> Y could be cheap but X -->
Z could be much more expensive. I am not sure whether this has any practical
significance, though, given how Kafka message formats work today.

3. I will look into this a bit more and get back to you.

4. Makes sense, will update the KIP with that.

Thanks,
Dhruvil

On Wed, Apr 11, 2018 at 3:54 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Dhruvil,
>
> Thanks for the KIP. Looks good overall. I have a few questions about the
> new configs:
>
> 1. I'm mainly wondering how necessary the configs are given the
> improvements in this KIP to reduce memory pressure from down-conversion.
> The reason I ask is that we'll be stuck with this config for a long time,
> so we should make sure we really need it. Ideally the improvements here are
> enough to make the memory problem a non-issue, but if we're not convinced
> they achieve that, the configs may have some value.
> 2. It seems like the intent is to use these configs to disable
> down-conversion specifically. Would it make sense to let the name be more
> specific (e.g. `log.disable.downconversion`)? Or do you think there are
> other benefits of this config outside of this use case?
> 3. Does this config apply to replica fetchers? I think it would be
> reasonable to disable down-conversion for replica fetchers in all cases
> since it should not be needed anyway and can cause weird log divergence
> edge cases. I opened https://issues.apache.org/jira/browse/KAFKA-6392 about
> this some time ago. Would it be reasonable to include this as part of this
> KIP?
> 4. You mention in the KIP that we would use the invalid request error code
> if the version is disallowed. Wouldn't it make more sense to use
> unsupported version?
>
> Thanks,
> Jason

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Dhruvil,

Thanks for the KIP. Looks good overall. I have a few questions about the
new configs:

1. I'm mainly wondering how necessary the configs are given the
improvements in this KIP to reduce memory pressure from down-conversion.
The reason I ask is that we'll be stuck with this config for a long time,
so we should make sure we really need it. Ideally the improvements here are
enough to make the memory problem a non-issue, but if we're not convinced
they achieve that, the configs may have some value.
2. It seems like the intent is to use these configs to disable
down-conversion specifically. Would it make sense to let the name be more
specific (e.g. `log.disable.downconversion`)? Or do you think there are
other benefits of this config outside of this use case?
3. Does this config apply to replica fetchers? I think it would be
reasonable to disable down-conversion for replica fetchers in all cases
since it should not be needed anyway and can cause weird log divergence
edge cases. I opened https://issues.apache.org/jira/browse/KAFKA-6392 about
this some time ago. Would it be reasonable to include this as part of this
KIP?
4. You mention in the KIP that we would use the invalid request error code
if the version is disallowed. Wouldn't it make more sense to use
unsupported version?

Thanks,
Jason


On Wed, Apr 11, 2018 at 6:38 AM, Rajini Sivaram <ra...@gmail.com>
wrote:

> Hi Dhruvil,
>
> Thanks for the KIP. This is a great improvement to reduce OOMs in brokers
> during down-conversion.
>
> Just a couple of minor questions:
>
> The goals state: "*Provide appropriate configuration parameters to manage
> maximum memory usage during down-conversion on the broker.*"
> Which config parameters are these referring to?
>
> What exactly is a chunk going to be - will it be all the records for a
> partition (which could be quite large?) or one message batch? The KIP talks
> about pre-allocated fixed size buffers, but your last note suggests that
> you would use temporary buffers created for each partition. Do we need to
> consider using a memory pool for these or do we think that the buffers will
> be small enough to cope with lots of connections with downconversions? This
> will be a clear improvement over what we have now in any case, but  just
> checking anyway.
>
> Regards,
>
> Rajini

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Rajini Sivaram <ra...@gmail.com>.
Thanks Dhruvil. That makes sense.

On Thu, Apr 12, 2018 at 2:49 AM, Dhruvil Shah <dh...@confluent.io> wrote:

> Hi Rajini,
>
> Thanks for the comments.
>
> Which config parameters are these referring to?
>
> This refers to a proposal that was later rejected. I have removed this goal
> from the KIP as it is no longer valid.
>
> What exactly is a chunk going to be
>
> I have updated the KIP to remove references to the fixed buffers. I started
> out thinking we would have fixed-size buffers with a memory pool but it
> seems we could achieve what we want without the added complexity.
>
> My current proposal is that we read a set of message batches into memory
> till we reach a pre-configured size threshold, say 16kB (the pre-configured
> size might need to be larger if the first message batch is larger than
> that). We down-convert these batches and hold them in a temporary buffer
> till we are able to send out all the messages. And then repeat the same
> process for subsequent batches.
>
> Because we down-convert a maximum of 16kB worth of messages at any given
> point in time, the memory utilization should be much more deterministic,
> i.e. we have a maximum of about 16kB memory allocated for each in-flight
> `FetchResponse` that requires down-conversion.
>
> Let me know if this makes sense.
>
> Thanks,
> Dhruvil

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Dhruvil Shah <dh...@confluent.io>.
Hi Rajini,

Thanks for the comments.

Which config parameters are these referring to?

This refers to a proposal that was later rejected. I have removed this goal
from the KIP as it is no longer valid.

What exactly is a chunk going to be

I have updated the KIP to remove references to the fixed buffers. I started
out thinking we would have fixed-size buffers with a memory pool but it
seems we could achieve what we want without the added complexity.

My current proposal is that we read a set of message batches into memory
till we reach a pre-configured size threshold, say 16kB (the pre-configured
size might need to be larger if the first message batch is larger than
that). We down-convert these batches and hold them in a temporary buffer
till we are able to send out all the messages. And then repeat the same
process for subsequent batches.

Because we down-convert a maximum of 16kB worth of messages at any given
point in time, the memory utilization should be much more deterministic,
i.e. we have a maximum of about 16kB memory allocated for each in-flight
`FetchResponse` that requires down-conversion.
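
As a rough illustration of the chunking described above, here is a minimal
sketch. It is not code from the KIP: the class and method names are
hypothetical, batches are represented only by their sizes in bytes, and it
assumes a chunk is closed just before it would exceed the threshold, except
that an oversized batch is still taken on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka.
final class DownConversionChunker {

    /**
     * Groups batch sizes into chunks whose total stays within thresholdBytes.
     * A batch larger than the threshold forms a chunk by itself, so progress
     * is always possible.
     */
    static List<List<Integer>> chunk(List<Integer> batchSizes, int thresholdBytes) {
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;
        for (int size : batchSizes) {
            // Close the current chunk if this batch would push it over the
            // threshold. An empty chunk always accepts the batch.
            if (!current.isEmpty() && currentBytes + size > thresholdBytes) {
                chunks.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        // 16kB threshold, as in the example figure used in this thread.
        System.out.println(chunk(Arrays.asList(6000, 6000, 6000, 40000, 1000), 16 * 1024));
    }
}
```

Each inner list stands for one round of "read, down-convert, send", so the
memory held at any time is bounded by the threshold or by the size of a
single oversized batch.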

Let me know if this makes sense.

Thanks,
Dhruvil

On Wed, Apr 11, 2018 at 6:38 AM, Rajini Sivaram <ra...@gmail.com>
wrote:

> Hi Dhruvil,
>
> Thanks for the KIP. This is a great improvement to reduce OOMs in brokers
> during down-conversion.
>
> Just a couple of minor questions:
>
> The goals state: "*Provide appropriate configuration parameters to manage
> maximum memory usage during down-conversion on the broker.*"
> Which config parameters are these referring to?
>
> What exactly is a chunk going to be - will it be all the records for a
> partition (which could be quite large?) or one message batch? The KIP talks
> about pre-allocated fixed size buffers, but your last note suggests that
> you would use temporary buffers created for each partition. Do we need to
> consider using a memory pool for these or do we think that the buffers will
> be small enough to cope with lots of connections with downconversions? This
> will be a clear improvement over what we have now in any case, but  just
> checking anyway.
>
> Regards,
>
> Rajini

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Rajini Sivaram <ra...@gmail.com>.
Hi Dhruvil,

Thanks for the KIP. This is a great improvement to reduce OOMs in brokers
during down-conversion.

Just a couple of minor questions:

The goals state: "*Provide appropriate configuration parameters to manage
maximum memory usage during down-conversion on the broker.*"
Which config parameters are these referring to?

What exactly is a chunk going to be - will it be all the records for a
partition (which could be quite large?) or one message batch? The KIP talks
about pre-allocated fixed size buffers, but your last note suggests that
you would use temporary buffers created for each partition. Do we need to
consider using a memory pool for these or do we think that the buffers will
be small enough to cope with lots of connections with downconversions? This
will be a clear improvement over what we have now in any case, but  just
checking anyway.

Regards,

Rajini

On Sat, Apr 7, 2018 at 12:29 AM, Dhruvil Shah <dh...@confluent.io> wrote:

> Hi Ted,
>
> Thanks for the comments.
>
>
>
> *>> bq. we can perform down-conversion when Records.writeTo is called.>>
> Wouldn't this delay the network thread (though maybe the duration is
> short)>> ?*
> Yes, this is noted in the Cons section. I think we have a precedent for
> this in the `SSLTransportLayer` implementation, so trying to follow a
> similar model here.
>
>
> *>> Can you expand on the structure of LazyDownConvertedRecords in more
> detail ?*
> I added the basic structure to the KIP.
>
>
>
>
> *>> bq. even if it exceeds fetch.max.bytes>> I did a brief search but
> didn't see the above config. Did you mean>> message.max.bytes>> ?*
> Yes, thanks for the correction.
>
>
> *>> After the buffers grow, is there a way to trim them down if
> subsequent>> down-conversion doesn't need that much memory ?*
> The easiest way probably is to allocate and use a new buffer for each
> topic-partition. I think we would not require any trimming down if we do
> this. The buffer will be available for garbage collection as soon as we are
> done serializing and writing all messages to the socket for the particular
> topic-partition.
>
> Thanks,
> Dhruvil

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Dhruvil Shah <dh...@confluent.io>.
Hi Ted,

Thanks for the comments.



> bq. we can perform down-conversion when Records.writeTo is called.
> Wouldn't this delay the network thread (though maybe the duration is
> short)?

Yes, this is noted in the Cons section. I think we have a precedent for
this in the `SSLTransportLayer` implementation, so I am trying to follow a
similar model here.
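
To make the trade-off concrete, here is a minimal sketch of the lazy idea.
It is only an illustration under assumptions: the class below is a
hypothetical stand-in and not the structure defined in the KIP, batches are
plain strings, and the "channel" is any consumer of converted batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for lazily converted records; not Kafka code.
final class LazyConvertedRecords {
    private final List<String> batches;            // stand-in for stored message batches
    private final UnaryOperator<String> converter; // stand-in for down-conversion
    private int conversions = 0;

    LazyConvertedRecords(List<String> batches, UnaryOperator<String> converter) {
        this.batches = batches;
        this.converter = converter;
    }

    int conversions() {
        return conversions;
    }

    // Conversion happens here, at write time, one batch at a time. The caller
    // of writeTo (the network thread in the proposal) pays the conversion cost.
    void writeTo(Consumer<String> channel) {
        for (String batch : batches) {
            String converted = converter.apply(batch);
            conversions++;
            channel.accept(converted);
            // No reference to the converted batch is kept after this point.
        }
    }

    public static void main(String[] args) {
        LazyConvertedRecords records = new LazyConvertedRecords(
                Arrays.asList("v2:a", "v2:b"), b -> b.replace("v2:", "v1:"));
        System.out.println(records.conversions()); // nothing converted before writeTo
        List<String> sent = new ArrayList<>();
        records.writeTo(sent::add);
        System.out.println(sent);
    }
}
```

Nothing is converted when the object is built, which is what keeps memory
low, and all the work moves into writeTo, which is the delay to the network
thread raised in the question.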


> Can you expand on the structure of LazyDownConvertedRecords in more
> detail?
I added the basic structure to the KIP.




> bq. even if it exceeds fetch.max.bytes
> I did a brief search but didn't see the above config. Did you mean
> message.max.bytes?
Yes, thanks for the correction.


> After the buffers grow, is there a way to trim them down if subsequent
> down-conversion doesn't need that much memory?
The easiest way probably is to allocate and use a new buffer for each
topic-partition. I think we would not require any trimming down if we do
this. The buffer will be available for garbage collection as soon as we are
done serializing and writing all messages to the socket for the particular
topic-partition.

Thanks,
Dhruvil


On Fri, Apr 6, 2018 at 3:23 PM, Ted Yu <yu...@gmail.com> wrote:

> bq. we can perform down-conversion when Records.writeTo is called.
>
> Wouldn't this delay the network thread (though maybe the duration is short)
> ?
>
> Can you expand on the structure of LazyDownConvertedRecords in more detail
> ?
>
> bq. even if it exceeds fetch.max.bytes
>
> I did a brief search but didn't see the above config. Did you mean
> message.max.bytes
> ?
>
> bq. with possibility to grow if the allocation
>
> After the buffers grow, is there a way to trim them down if subsequent
> down-conversion doesn't need that much memory ?
>
> Thanks

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Ted Yu <yu...@gmail.com>.
bq. we can perform down-conversion when Records.writeTo is called.

Wouldn't this delay the network thread (though maybe the duration is short)
?

Can you expand on the structure of LazyDownConvertedRecords in more detail ?

bq. even if it exceeds fetch.max.bytes

I did a brief search but didn't see the above config. Did you mean
message.max.bytes
?

bq. with possibility to grow if the allocation

After the buffers grow, is there a way to trim them down if subsequent
down-conversion doesn't need that much memory ?

Thanks


On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dh...@confluent.io> wrote:

> Hi,
>
> I created a KIP to help mitigate out of memory issues during
> down-conversion. The KIP proposes introducing a configuration that can
> prevent down-conversions altogether, and also describes a design for
> efficient memory usage for down-conversion.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion
>
> Suggestions and feedback are welcome!
>
> Thanks,
> Dhruvil
>

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Dhruvil Shah <dh...@confluent.io>.
I fixed the diagrams - let me know if you are still having trouble seeing
them.

Thanks,
Dhruvil

On Fri, Apr 6, 2018 at 3:05 PM, Ted Yu <yu...@gmail.com> wrote:

> The two embedded diagrams seem broken.
>
> Can you double check ?
>
> Thanks

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Ted Yu <yu...@gmail.com>.
The two embedded diagrams seem broken.

Can you double check ?

Thanks

On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dh...@confluent.io> wrote:

> Hi,
>
> I created a KIP to help mitigate out of memory issues during
> down-conversion. The KIP proposes introducing a configuration that can
> prevent down-conversions altogether, and also describes a design for
> efficient memory usage for down-conversion.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion
>
> Suggestions and feedback are welcome!
>
> Thanks,
> Dhruvil
>

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Dhruvil Shah <dh...@confluent.io>.
Hi all,

I have updated the KIP to reflect changes from the discussion in this
thread. In particular, the KIP now recommends adding a configuration that
allows disabling any kind of message down-conversion for client
`FetchRequest`, instead of having to specify the minimum compatibility.
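
For illustration, the decision such an on/off configuration implies could
look roughly like the sketch below. Every name in it is hypothetical and it
is not the broker's actual API. It assumes message format versions can be
compared as integers, and it returns an unsupported-version outcome when
conversion is disabled, following the suggestion made earlier in this thread.

```java
// Hypothetical names throughout; this is not the Kafka broker API.
final class DownConversionPolicy {

    enum Outcome { SEND_AS_IS, DOWN_CONVERT, UNSUPPORTED_VERSION }

    /**
     * @param storedFormat          message format version of the data on disk
     * @param maxClientFormat       newest message format the fetch request can read
     * @param downConversionEnabled value of the proposed on/off configuration
     */
    static Outcome decide(int storedFormat, int maxClientFormat, boolean downConversionEnabled) {
        if (storedFormat <= maxClientFormat) {
            // The client understands the stored format, so no conversion is needed.
            return Outcome.SEND_AS_IS;
        }
        // The client is too old for the stored format.
        return downConversionEnabled ? Outcome.DOWN_CONVERT : Outcome.UNSUPPORTED_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(decide(2, 2, false));
        System.out.println(decide(2, 1, true));
        System.out.println(decide(2, 1, false));
    }
}
```

A single boolean keeps the check simple compared with a minimum-version
setting, at the cost of not being able to allow some conversions and block
others.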

I also added a note about Jun's idea of down-converting only the first
message batch of the first topic-partition. I think this is a bit tricky to
implement, and given that the maximum fetch size for a topic-partition is
capped at 1MB by default, it should probably be acceptable to take the cost
of down-converting the entire partition upfront. Jun, let me know what you
think.
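
As a rough back-of-the-envelope illustration of that trade-off, the sketch
below compares worst-case upfront memory when every partition is
down-converted eagerly against converting only the first partition of each
fetch. The class and method names, the number of in-flight fetches, and the
partition count are all hypothetical, and it assumes the down-converted data
is about the same size as the fetched data (1MB per partition, the default cap
mentioned above).

```java
/** Hypothetical arithmetic sketch; none of these names come from Kafka. */
class DownConversionMemorySketch {
    // Default per-partition fetch cap cited in this thread (1MB).
    static final long MAX_PARTITION_FETCH_BYTES = 1024L * 1024L;

    /** Worst case if every partition of every in-flight fetch is converted upfront. */
    static long eagerAllBytes(int inFlightFetches, int partitionsPerFetch) {
        return (long) inFlightFetches * partitionsPerFetch * MAX_PARTITION_FETCH_BYTES;
    }

    /** Worst case if only the first partition of each fetch is converted upfront. */
    static long eagerFirstPartitionBytes(int inFlightFetches) {
        return (long) inFlightFetches * MAX_PARTITION_FETCH_BYTES;
    }

    public static void main(String[] args) {
        int fetches = 100;    // illustrative value, not a measurement
        int partitions = 50;  // illustrative value, not a measurement
        long mb = 1024L * 1024L;
        System.out.println(eagerAllBytes(fetches, partitions) / mb + " MB");   // 5000 MB
        System.out.println(eagerFirstPartitionBytes(fetches) / mb + " MB");    // 100 MB
    }
}
```

Under these assumptions the upfront cost of the first partition stays bounded
by one partition's fetch cap per request, which is why taking that cost may be
acceptable.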

Any other suggestions / feedback are welcome.

Thanks,
Dhruvil

On Tue, Apr 17, 2018 at 4:21 PM, Dhruvil Shah <dh...@confluent.io> wrote:

> Hi Jun,
>
> Yes, that is true. Ideally, we should be able to down-convert only the
> first message batch in the request handling thread and delay everything
> else till the network thread. I have not thought through all the details of
> how we could do this but at first glance this seems tricky to implement,
> given that `FetchResponse.PartitionData` for the first partition will be a
> combination of `MemoryRecords` and `LazyDownConvertedRecords`. I will think
> about this a bit more to see if I can come up with a clean abstraction, and
> will also add it to the KIP.
>
> Thanks,
> Dhruvil
>
> On Mon, Apr 16, 2018 at 6:07 PM, Jun Rao <ju...@confluent.io> wrote:
>
>> Hi, Dhruvil,
>>
>> Thanks for the KIP. Looks good to me overall. Just one comment below.
>>
>> "To prevent this from happening, we will not delay down-conversion of the
>> first partition in the response. We will down-convert all messages of the
>> first partition in the I/O thread (like we do today), and only delay
>> down-conversion for subsequent partitions." It seems that we can further
>> optimize this by only down-converting the first message set in the first
>> partition in the request handling threads?
>>
>> Jun
>>
>>
>> On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dh...@confluent.io>
>> wrote:
>>
>> > Hi,
>> >
>> > I created a KIP to help mitigate out of memory issues during
>> > down-conversion. The KIP proposes introducing a configuration that can
>> > prevent down-conversions altogether, and also describes a design for
>> > efficient memory usage for down-conversion.
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion
>> >
>> > Suggestions and feedback are welcome!
>> >
>> > Thanks,
>> > Dhruvil
>> >
>>
>
>

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Dhruvil Shah <dh...@confluent.io>.
Hi Jun,

Yes, that is true. Ideally, we should be able to down-convert only the
first message batch in the request handling thread and delay everything
else till the network thread. I have not thought through all the details of
how we could do this but at first glance this seems tricky to implement,
given that `FetchResponse.PartitionData` for the first partition will be a
combination of `MemoryRecords` and `LazyDownConvertedRecords`. I will think
about this a bit more to see if I can come up with a clean abstraction, and
will also add it to the KIP.
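
A minimal sketch of the idea, assuming batches can be modelled as strings:
the first batch is converted as soon as the response is built (request
handling thread), and the remaining batches are converted only when the
response is iterated (network thread). All names here are hypothetical
stand-ins and not the actual `MemoryRecords` / `LazyDownConvertedRecords`
classes.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch; these are not Kafka classes. */
class FirstBatchEagerSketch {
    /** Counts conversions performed so far, to make the laziness observable. */
    static int conversions = 0;

    /** Stand-in for down-converting one batch to an older message format. */
    static String downConvert(String batch) {
        conversions++;
        return "v1(" + batch + ")";
    }

    /** Converts the first batch immediately; later batches only as the iterator advances. */
    static Iterator<String> firstEagerRestLazy(final List<String> batches) {
        if (batches.isEmpty()) {
            return Collections.emptyIterator();
        }
        final String first = downConvert(batches.get(0)); // eager part
        return new Iterator<String>() {
            private int index = 0;

            @Override
            public boolean hasNext() {
                return index < batches.size();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int i = index++;
                // Batch 0 was already converted; the rest are converted on demand.
                return i == 0 ? first : downConvert(batches.get(i));
            }
        };
    }
}
```

The awkward part this highlights is that a single partition's data ends up
being part already-converted and part deferred, which is the mixed
representation described above.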

Thanks,
Dhruvil

On Mon, Apr 16, 2018 at 6:07 PM, Jun Rao <ju...@confluent.io> wrote:

> Hi, Dhruvil,
>
> Thanks for the KIP. Looks good to me overall. Just one comment below.
>
> "To prevent this from happening, we will not delay down-conversion of the
> first partition in the response. We will down-convert all messages of the
> first partition in the I/O thread (like we do today), and only delay
> down-conversion for subsequent partitions." It seems that we can further
> optimize this by only down-converting the first message set in the first
> partition in the request handling threads?
>
> Jun
>
>
> On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dh...@confluent.io> wrote:
>
> > Hi,
> >
> > I created a KIP to help mitigate out of memory issues during
> > down-conversion. The KIP proposes introducing a configuration that can
> > prevent down-conversions altogether, and also describes a design for
> > efficient memory usage for down-conversion.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion
> >
> > Suggestions and feedback are welcome!
> >
> > Thanks,
> > Dhruvil
> >
>

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

Posted by Jun Rao <ju...@confluent.io>.
Hi, Dhruvil,

Thanks for the KIP. Looks good to me overall. Just one comment below.

"To prevent this from happening, we will not delay down-conversion of the
first partition in the response. We will down-convert all messages of the
first partition in the I/O thread (like we do today), and only delay
down-conversion for subsequent partitions." It seems that we can further
optimize this by only down-converting the first message set in the first
partition in the request handling threads?

Jun


On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dh...@confluent.io> wrote:

> Hi,
>
> I created a KIP to help mitigate out of memory issues during
> down-conversion. The KIP proposes introducing a configuration that can
> prevent down-conversions altogether, and also describes a design for
> efficient memory usage for down-conversion.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion
>
> Suggestions and feedback are welcome!
>
> Thanks,
> Dhruvil
>