Posted to dev@kafka.apache.org by "Andrey L. Neporada" <an...@yandex-team.ru> on 2016/07/21 09:49:21 UTC

[DISCUSS] Optimise memory used by replication process by using adaptive fetch message size

Hi all!

We noticed that our Kafka cluster uses a lot of memory for replication. Our Kafka usage pattern is as follows:

1. Most messages are small (tens or hundreds of kilobytes at most), but some (rare) messages can be several megabytes. So we have to set replica.fetch.max.bytes = max.message.bytes = 8MB.
2. Each Kafka broker handles several thousand partitions from multiple topics.

In this scenario the total memory required for replication (i.e. replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
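For illustration: with, say, 4,000 partitions per broker (a number chosen purely for the sake of the example), that is 8MB * 4,000, i.e. roughly 32GB of worst-case fetch buffer space on a single broker.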

So we would like to propose the following approach to fix this problem:

1. Introduce a new config parameter, replica.fetch.base.bytes, which is the initial size of a replication data chunk. By default this parameter is equal to replica.fetch.max.bytes, so the replication process works as before.

2. If the ReplicaFetcherThread fails when trying to replicate a message bigger than the current replication chunk, we double the chunk size (capped at replica.fetch.max.bytes) and retry.

3. If the chunk is replicated successfully, we decrease the replication chunk size back to replica.fetch.base.bytes (a rough sketch of this loop follows below).
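
To make the loop concrete, here is a minimal sketch in Scala. The object and method names are ours and are for illustration only; this is not the actual ReplicaFetcherThread code or its configuration API.

  // Illustrative sketch only - names and structure are ours, not Kafka's.
  object AdaptiveFetchSize {
    val fetchBaseBytes = 200 * 1024       // proposed replica.fetch.base.bytes (~200K in our setup)
    val fetchMaxBytes  = 8 * 1024 * 1024  // replica.fetch.max.bytes

    // A message did not fit into the current chunk: double the chunk, capped at the max.
    def onMessageTooLarge(currentChunkBytes: Int): Int =
      math.min(currentChunkBytes * 2, fetchMaxBytes)

    // The fetch succeeded: fall back to the small base chunk for the next request.
    def onSuccess: Int = fetchBaseBytes
  }

With these rules the per-partition buffer stays near replica.fetch.base.bytes almost all of the time and only grows, temporarily, when a large message actually arrives.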
   

By choosing replica.fetch.base.bytes optimally (~200K in our case), we were able to significantly decrease memory usage without any noticeable impact on replication efficiency.

Here is the JIRA ticket (with PR): https://issues.apache.org/jira/browse/KAFKA-3979

Your comments and feedback are highly appreciated!


Thanks,
Andrey.

Re: [DISCUSS] KAFKA-2063 Add possibility to bound fetch response size (was Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size)

Posted by Ben Stopford <be...@confluent.io>.
Andrey - I’m not sure we quite have consensus on the Randomisation vs Round Robin issue, but it’s probably worth you just raising a KIP and putting one of the options down as a rejected alternative.
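
For readers following along, my reading of the two alternatives (a paraphrase for illustration, not code from the PR): either shuffle the partition order on every fetch request, or rotate it round-robin, so that the partitions which fall past the response size limit differ from request to request. Roughly:

  // Illustrative only - the two candidate orderings as I understand them.
  object PartitionOrdering {
    import scala.util.Random

    // Option A: randomise the partition order on every fetch request.
    def randomised(partitions: Seq[String]): Seq[String] =
      Random.shuffle(partitions)

    // Option B: round robin - rotate the starting position by one per request.
    def roundRobin(partitions: Seq[String], requestNumber: Int): Seq[String] = {
      val shift = requestNumber % math.max(partitions.size, 1)
      partitions.drop(shift) ++ partitions.take(shift)
    }
  }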

B
> On 29 Jul 2016, at 11:59, Ben Stopford <be...@confluent.io> wrote:
> 
> Thanks for kicking this one off, Andrey. Generally it looks great!
> 
> I left a comment on the Jira regarding whether we should remove the existing limitBytes, along with a potential alternative to doing randomisation. 
> 
> B
>> On 29 Jul 2016, at 09:17, Andrey L. Neporada <an...@yandex-team.ru> wrote:
>> 
>> Hi all!
>> 
>> I would like to get your feedback on the PR for KAFKA-2063.
>> It looks like a KIP is needed there, but it would be nice to get feedback first.
>> 
>> Thanks,
>> Andrey.
>> 
>> 
>>> On 22 Jul 2016, at 12:26, Andrey L. Neporada <an...@yandex-team.ru> wrote:
>>> 
>>> Hi!
>>> 
>>> Thanks for the feedback - I agree that the proper way to fix this issue is to provide a per-request data limit.
>>> Will try to do it.
>>> 
>>> Thanks,
>>> Andrey.
>>> 
>>> 
>>> 
>>>> On 21 Jul 2016, at 18:57, Jay Kreps <ja...@confluent.io> wrote:
>>>> 
>>>> I think the memory usage for consumers can be improved a lot, but I think
>>>> there may be a better way than what you are proposing.
>>>> 
>>>> The problem is exactly what you describe: the bound the user sets is
>>>> per-partition, but the number of partitions may be quite high. The consumer
>>>> could provide a bound on the response size by only requesting a subset of
>>>> the partitions, but this would mean that if there was no data available on
>>>> those partitions the consumer wouldn't be checking other partitions, which
>>>> would add latency.
>>>> 
>>>> I think the solution is to add a new "max response size" parameter to the
>>>> fetch request so the server checks all partitions but doesn't send back
>>>> more than this amount in total. This has to be done carefully to ensure
>>>> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
>>>> indefinitely starve other partitions).
>>>> 
>>>> This will fix memory management both in the replicas and for consumers.
>>>> 
>>>> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
>>>> 
>>>> I think it isn't too hard to do and would be a huge aid to the memory
>>>> profile of both the clients and server.
>>>> 
>>>> I also don't think there is much use in setting a max size that expands
>>>> dynamically since in any case you have to be able to support the maximum,
>>>> so you might as well always use that rather than expanding and contracting
>>>> dynamically. That is, if your max fetch response size is 64MB you need to
>>>> budget 64MB of free memory, so making it smaller some of the time doesn't
>>>> really help you.
>>>> 
>>>> -Jay
>>>> 
>>>> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
>>>> aneporada@yandex-team.ru> wrote:
>>>> 
>>>>> Hi all!
>>>>> 
>>>>> We noticed that our Kafka cluster uses a lot of memory for replication.
>>>>> Our Kafka usage pattern is as follows:
>>>>>
>>>>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>>>>> (rare) messages can be several megabytes. So we have to set
>>>>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>>>>> 2. Each Kafka broker handles several thousand partitions from multiple
>>>>> topics.
>>>>>
>>>>> In this scenario the total memory required for replication (i.e.
>>>>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>>>>>
>>>>> So we would like to propose the following approach to fix this problem:
>>>>>
>>>>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>>>>> initial size of a replication data chunk. By default this parameter is
>>>>> equal to replica.fetch.max.bytes, so the replication process works as
>>>>> before.
>>>>>
>>>>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>>>>> bigger than the current replication chunk, we double the chunk size
>>>>> (capped at replica.fetch.max.bytes) and retry.
>>>>>
>>>>> 3. If the chunk is replicated successfully, we decrease the replication
>>>>> chunk size back to replica.fetch.base.bytes.
>>>>>
>>>>>
>>>>> By choosing replica.fetch.base.bytes optimally (~200K in our case), we were
>>>>> able to significantly decrease memory usage without any noticeable impact
>>>>> on replication efficiency.
>>>>>
>>>>> Here is the JIRA ticket (with PR):
>>>>> https://issues.apache.org/jira/browse/KAFKA-3979
>>>>> 
>>>>> Your comments and feedback are highly appreciated!
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Andrey.
>>> 
>> 
> 


Re: [DISCUSS] KAFKA-2063 Add possibility to bound fetch response size (was Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size)

Posted by Ben Stopford <be...@confluent.io>.
Thanks for kicking this one off, Andrey. Generally it looks great!

I left a comment on the Jira regarding whether we should remove the existing limitBytes, along with a potential alternative to doing randomisation. 

B
> On 29 Jul 2016, at 09:17, Andrey L. Neporada <an...@yandex-team.ru> wrote:
> 
> Hi all!
> 
> I would like to get your feedback on the PR for KAFKA-2063.
> It looks like a KIP is needed there, but it would be nice to get feedback first.
> 
> Thanks,
> Andrey.
> 
> 
>> On 22 Jul 2016, at 12:26, Andrey L. Neporada <an...@yandex-team.ru> wrote:
>> 
>> Hi!
>> 
>> Thanks for the feedback - I agree that the proper way to fix this issue is to provide a per-request data limit.
>> Will try to do it.
>> 
>> Thanks,
>> Andrey.
>> 
>> 
>> 
>>> On 21 Jul 2016, at 18:57, Jay Kreps <ja...@confluent.io> wrote:
>>> 
>>> I think the memory usage for consumers can be improved a lot, but I think
>>> there may be a better way than what you are proposing.
>>> 
>>> The problem is exactly what you describe: the bound the user sets is
>>> per-partition, but the number of partitions may be quite high. The consumer
>>> could provide a bound on the response size by only requesting a subset of
>>> the partitions, but this would mean that if there was no data available on
>>> those partitions the consumer wouldn't be checking other partitions, which
>>> would add latency.
>>> 
>>> I think the solution is to add a new "max response size" parameter to the
>>> fetch request so the server checks all partitions but doesn't send back
>>> more than this amount in total. This has to be done carefully to ensure
>>> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
>>> indefinitely starve other partitions).
>>> 
>>> This will fix memory management both in the replicas and for consumers.
>>> 
>>> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
>>> 
>>> I think it isn't too hard to do and would be a huge aid to the memory
>>> profile of both the clients and server.
>>> 
>>> I also don't think there is much use in setting a max size that expands
>>> dynamically since in any case you have to be able to support the maximum,
>>> so you might as well always use that rather than expanding and contracting
>>> dynamically. That is, if your max fetch response size is 64MB you need to
>>> budget 64MB of free memory, so making it smaller some of the time doesn't
>>> really help you.
>>> 
>>> -Jay
>>> 
>>> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
>>> aneporada@yandex-team.ru> wrote:
>>> 
>>>> Hi all!
>>>> 
>>>> We noticed that our Kafka cluster uses a lot of memory for replication.
>>>> Our Kafka usage pattern is as follows:
>>>>
>>>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>>>> (rare) messages can be several megabytes. So we have to set
>>>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>>>> 2. Each Kafka broker handles several thousand partitions from multiple
>>>> topics.
>>>>
>>>> In this scenario the total memory required for replication (i.e.
>>>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>>>>
>>>> So we would like to propose the following approach to fix this problem:
>>>>
>>>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>>>> initial size of a replication data chunk. By default this parameter is
>>>> equal to replica.fetch.max.bytes, so the replication process works as
>>>> before.
>>>>
>>>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>>>> bigger than the current replication chunk, we double the chunk size
>>>> (capped at replica.fetch.max.bytes) and retry.
>>>>
>>>> 3. If the chunk is replicated successfully, we decrease the replication
>>>> chunk size back to replica.fetch.base.bytes.
>>>>
>>>>
>>>> By choosing replica.fetch.base.bytes optimally (~200K in our case), we were
>>>> able to significantly decrease memory usage without any noticeable impact
>>>> on replication efficiency.
>>>>
>>>> Here is the JIRA ticket (with PR):
>>>> https://issues.apache.org/jira/browse/KAFKA-3979
>>>> 
>>>> Your comments and feedback are highly appreciated!
>>>> 
>>>> 
>>>> Thanks,
>>>> Andrey.
>> 
> 


[DISCUSS] KAFKA-2063 Add possibility to bound fetch response size (was Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size)

Posted by "Andrey L. Neporada" <an...@yandex-team.ru>.
Hi all!

I would like to get your feedback on the PR for KAFKA-2063.
It looks like a KIP is needed there, but it would be nice to get feedback first.

Thanks,
Andrey.


> On 22 Jul 2016, at 12:26, Andrey L. Neporada <an...@yandex-team.ru> wrote:
> 
> Hi!
> 
> Thanks for the feedback - I agree that the proper way to fix this issue is to provide a per-request data limit.
> Will try to do it.
> 
> Thanks,
> Andrey.
> 
> 
> 
>> On 21 Jul 2016, at 18:57, Jay Kreps <ja...@confluent.io> wrote:
>> 
>> I think the memory usage for consumers can be improved a lot, but I think
>> there may be a better way than what you are proposing.
>> 
>> The problem is exactly what you describe: the bound the user sets is
>> per-partition, but the number of partitions may be quite high. The consumer
>> could provide a bound on the response size by only requesting a subset of
>> the partitions, but this would mean that if there was no data available on
>> those partitions the consumer wouldn't be checking other partitions, which
>> would add latency.
>> 
>> I think the solution is to add a new "max response size" parameter to the
>> fetch request so the server checks all partitions but doesn't send back
>> more than this amount in total. This has to be done carefully to ensure
>> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
>> indefinitely starve other partitions).
>> 
>> This will fix memory management both in the replicas and for consumers.
>> 
>> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
>> 
>> I think it isn't too hard to do and would be a huge aid to the memory
>> profile of both the clients and server.
>> 
>> I also don't think there is much use in setting a max size that expands
>> dynamically since in any case you have to be able to support the maximum,
>> so you might as well always use that rather than expanding and contracting
>> dynamically. That is, if your max fetch response size is 64MB you need to
>> budget 64MB of free memory, so making it smaller some of the time doesn't
>> really help you.
>> 
>> -Jay
>> 
>> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
>> aneporada@yandex-team.ru> wrote:
>> 
>>> Hi all!
>>> 
>>> We noticed that our Kafka cluster uses a lot of memory for replication.
>>> Our Kafka usage pattern is as follows:
>>>
>>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>>> (rare) messages can be several megabytes. So we have to set
>>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>>> 2. Each Kafka broker handles several thousand partitions from multiple
>>> topics.
>>>
>>> In this scenario the total memory required for replication (i.e.
>>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>>>
>>> So we would like to propose the following approach to fix this problem:
>>>
>>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>>> initial size of a replication data chunk. By default this parameter is
>>> equal to replica.fetch.max.bytes, so the replication process works as
>>> before.
>>>
>>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>>> bigger than the current replication chunk, we double the chunk size
>>> (capped at replica.fetch.max.bytes) and retry.
>>>
>>> 3. If the chunk is replicated successfully, we decrease the replication
>>> chunk size back to replica.fetch.base.bytes.
>>>
>>>
>>> By choosing replica.fetch.base.bytes optimally (~200K in our case), we were
>>> able to significantly decrease memory usage without any noticeable impact
>>> on replication efficiency.
>>>
>>> Here is the JIRA ticket (with PR):
>>> https://issues.apache.org/jira/browse/KAFKA-3979
>>> 
>>> Your comments and feedback are highly appreciated!
>>> 
>>> 
>>> Thanks,
>>> Andrey.
> 


Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size

Posted by "Andrey L. Neporada" <an...@yandex-team.ru>.
Hi!

Thanks for the feedback - I agree that the proper way to fix this issue is to provide a per-request data limit.
Will try to do it.

Thanks,
Andrey.



> On 21 Jul 2016, at 18:57, Jay Kreps <ja...@confluent.io> wrote:
> 
> I think the memory usage for consumers can be improved a lot, but I think
> there may be a better way than what you are proposing.
> 
> The problem is exactly what you describe: the bound the user sets is
> per-partition, but the number of partitions may be quite high. The consumer
> could provide a bound on the response size by only requesting a subset of
> the partitions, but this would mean that if there was no data available on
> those partitions the consumer wouldn't be checking other partitions, which
> would add latency.
> 
> I think the solution is to add a new "max response size" parameter to the
> fetch request so the server checks all partitions but doesn't send back
> more than this amount in total. This has to be done carefully to ensure
> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
> indefinitely starve other partitions).
> 
> This will fix memory management both in the replicas and for consumers.
> 
> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
> 
> I think it isn't too hard to do and would be a huge aid to the memory
> profile of both the clients and server.
> 
> I also don't think there is much use in setting a max size that expands
> dynamically since in any case you have to be able to support the maximum,
> so you might as well always use that rather than expanding and contracting
> dynamically. That is, if your max fetch response size is 64MB you need to
> budget 64MB of free memory, so making it smaller some of the time doesn't
> really help you.
> 
> -Jay
> 
> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
> aneporada@yandex-team.ru> wrote:
> 
>> Hi all!
>> 
>> We noticed that our Kafka cluster uses a lot of memory for replication.
>> Our Kafka usage pattern is as follows:
>>
>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>> (rare) messages can be several megabytes. So we have to set
>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>> 2. Each Kafka broker handles several thousand partitions from multiple
>> topics.
>>
>> In this scenario the total memory required for replication (i.e.
>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>>
>> So we would like to propose the following approach to fix this problem:
>>
>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>> initial size of a replication data chunk. By default this parameter is
>> equal to replica.fetch.max.bytes, so the replication process works as
>> before.
>>
>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>> bigger than the current replication chunk, we double the chunk size
>> (capped at replica.fetch.max.bytes) and retry.
>>
>> 3. If the chunk is replicated successfully, we decrease the replication
>> chunk size back to replica.fetch.base.bytes.
>>
>>
>> By choosing replica.fetch.base.bytes optimally (~200K in our case), we were
>> able to significantly decrease memory usage without any noticeable impact
>> on replication efficiency.
>>
>> Here is the JIRA ticket (with PR):
>> https://issues.apache.org/jira/browse/KAFKA-3979
>> 
>> Your comments and feedback are highly appreciated!
>> 
>> 
>> Thanks,
>> Andrey.


Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size

Posted by Jay Kreps <ja...@confluent.io>.
I think the memory usage for consumers can be improved a lot, but I think
there may be a better way than what you are proposing.

The problem is exactly what you describe: the bound the user sets is
per-partition, but the number of partitions may be quite high. The consumer
could provide a bound on the response size by only requesting a subset of
the partitions, but this would mean that if there was no data available on
those partitions the consumer wouldn't be checking other partitions, which
would add latency.

I think the solution is to add a new "max response size" parameter to the
fetch request so the server checks all partitions but doesn't send back
more than this amount in total. This has to be done carefully to ensure
fairness (i.e. if one partition has unbounded amounts of data it shouldn't
indefinitely starve other partitions).
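
To make that concrete, here is a minimal sketch of a server-side loop that fills a size-bounded response. The names and the rotation strategy are illustrative assumptions, not the actual fetch path:

  // Illustrative sketch: fill a fetch response up to a global byte budget,
  // rotating the starting partition per request so no partition is starved forever.
  object BoundedFetch {
    case class PartitionData(partition: String, bytes: Array[Byte])

    def buildResponse(available: Seq[PartitionData],
                      maxResponseBytes: Int,
                      requestNumber: Int): Seq[PartitionData] = {
      val n = math.max(available.size, 1)
      val start = requestNumber % n
      val rotated = available.drop(start) ++ available.take(start)
      // Greedily include partition data until the global budget is exhausted.
      val (_, included) = rotated.foldLeft((maxResponseBytes, Vector.empty[PartitionData])) {
        case ((budget, acc), pd) =>
          if (pd.bytes.length <= budget) (budget - pd.bytes.length, acc :+ pd)
          else (budget, acc) // partitions that no longer fit are left for a later request
      }
      included
    }
  }

With a limit like this, memory has to be budgeted once per fetch request rather than once per partition.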

This will fix memory management both in the replicas and for consumers.

There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063

I think it isn't too hard to do and would be a huge aid to the memory
profile of both the clients and server.

I also don't think there is much use in setting a max size that expands
dynamically since in any case you have to be able to support the maximum,
so you might as well always use that rather than expanding and contracting
dynamically. That is, if your max fetch response size is 64MB you need to
budget 64MB of free memory, so making it smaller some of the time doesn't
really help you.

-Jay

On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
aneporada@yandex-team.ru> wrote:

> Hi all!
>
> We noticed that our Kafka cluster uses a lot of memory for replication.
> Our Kafka usage pattern is as follows:
>
> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
> (rare) messages can be several megabytes. So we have to set
> replica.fetch.max.bytes = max.message.bytes = 8MB.
> 2. Each Kafka broker handles several thousand partitions from multiple
> topics.
>
> In this scenario the total memory required for replication (i.e.
> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>
> So we would like to propose the following approach to fix this problem:
>
> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
> initial size of a replication data chunk. By default this parameter is
> equal to replica.fetch.max.bytes, so the replication process works as
> before.
>
> 2. If the ReplicaFetcherThread fails when trying to replicate a message
> bigger than the current replication chunk, we double the chunk size
> (capped at replica.fetch.max.bytes) and retry.
>
> 3. If the chunk is replicated successfully, we decrease the replication
> chunk size back to replica.fetch.base.bytes.
>
>
> By choosing replica.fetch.base.bytes optimally (~200K in our case), we were
> able to significantly decrease memory usage without any noticeable impact
> on replication efficiency.
>
> Here is the JIRA ticket (with PR):
> https://issues.apache.org/jira/browse/KAFKA-3979
>
> Your comments and feedback are highly appreciated!
>
>
> Thanks,
> Andrey.

Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size

Posted by "Andrey L. Neporada" <an...@yandex-team.ru>.
Hi!

Sorry for not being clear enough.
The problem is on the follower side, not on the leader side.
It is the follower that allocates replica.fetch.max.bytes-sized buffers for fetch responses somewhere in the Java client code.
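
To put a number on that, a back-of-the-envelope sketch (the partition count is an example value, not a measurement from our cluster):

  // Worst-case fetch-buffer footprint on one follower, purely illustrative.
  object FollowerMemoryEstimate {
    val partitionsPerBroker  = 4000                 // example value
    val replicaFetchMaxBytes = 8L * 1024 * 1024     // replica.fetch.max.bytes = 8MB
    val worstCaseBytes       = partitionsPerBroker * replicaFetchMaxBytes

    def main(args: Array[String]): Unit =
      println(s"worst-case fetch buffers: ${worstCaseBytes / (1024 * 1024)} MB")  // ~32000 MB
  }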

Andrey.



> On 21 Jul 2016, at 16:28, Tom Crayford <tc...@heroku.com> wrote:
> 
> Hi there,
> 
> From my understanding of the protocol (and from digging in the source code
> a bunch) I can't see anywhere that Kafka overallocates memory based on the
> fetch request's max bytes, but maybe I have missed something. If there is
> such a place, then I'd recommend fixing that issue instead - it seems more
> pressing and will alleviate your issue (unless I'm misunderstanding
> something and we *have* to overallocate somewhere).
> 
> I looked in the fetch request path up and down, and in the leader, tracing
> from KafkaApis -> ReplicaManager -> Log -> LogSegment, then to
> FetchResponse and FetchResponseSend (in case you want some pointers to some
> code).
> 
> I may be missing something, but there seems to be a deeper issue here.
> 
> Tom Crayford
> Heroku Kafka
> 
> On Thu, Jul 21, 2016 at 10:49 AM, Andrey L. Neporada <
> aneporada@yandex-team.ru> wrote:
> 
>> Hi all!
>> 
>> We noticed that our Kafka cluster uses a lot of memory for replication.
>> Our Kafka usage pattern is as follows:
>>
>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>> (rare) messages can be several megabytes. So we have to set
>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>> 2. Each Kafka broker handles several thousand partitions from multiple
>> topics.
>>
>> In this scenario the total memory required for replication (i.e.
>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>>
>> So we would like to propose the following approach to fix this problem:
>>
>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>> initial size of a replication data chunk. By default this parameter is
>> equal to replica.fetch.max.bytes, so the replication process works as
>> before.
>>
>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>> bigger than the current replication chunk, we double the chunk size
>> (capped at replica.fetch.max.bytes) and retry.
>>
>> 3. If the chunk is replicated successfully, we decrease the replication
>> chunk size back to replica.fetch.base.bytes.
>>
>>
>> By choosing replica.fetch.base.bytes optimally (~200K in our case), we were
>> able to significantly decrease memory usage without any noticeable impact
>> on replication efficiency.
>>
>> Here is the JIRA ticket (with PR):
>> https://issues.apache.org/jira/browse/KAFKA-3979
>> 
>> Your comments and feedback are highly appreciated!
>> 
>> 
>> Thanks,
>> Andrey.


Re: [DISCUSS] Optimise memory used by replication process by using adaptive fetch message size

Posted by Tom Crayford <tc...@heroku.com>.
Hi there,

From my understanding of the protocol (and from digging in the source code
a bunch) I can't see anywhere that Kafka overallocates memory based on the
fetch request's max bytes, but maybe I have missed something. If there is
such a place, then I'd recommend fixing that issue instead - it seems more
pressing and will alleviate your issue (unless I'm misunderstanding
something and we *have* to overallocate somewhere).

I looked in the fetch request path up and down, and in the leader, tracing
from KafkaApis -> ReplicaManager -> Log -> LogSegment, then to
FetchResponse and FetchResponseSend (in case you want some pointers to some
code).

I may be missing something, but there seems to be a deeper issue here.

Tom Crayford
Heroku Kafka

On Thu, Jul 21, 2016 at 10:49 AM, Andrey L. Neporada <
aneporada@yandex-team.ru> wrote:

> Hi all!
>
> We noticed that our Kafka cluster uses a lot of memory for replication.
> Our Kafka usage pattern is as follows:
>
> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
> (rare) messages can be several megabytes. So we have to set
> replica.fetch.max.bytes = max.message.bytes = 8MB.
> 2. Each Kafka broker handles several thousand partitions from multiple
> topics.
>
> In this scenario the total memory required for replication (i.e.
> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>
> So we would like to propose the following approach to fix this problem:
>
> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
> initial size of a replication data chunk. By default this parameter is
> equal to replica.fetch.max.bytes, so the replication process works as
> before.
>
> 2. If the ReplicaFetcherThread fails when trying to replicate a message
> bigger than the current replication chunk, we double the chunk size
> (capped at replica.fetch.max.bytes) and retry.
>
> 3. If the chunk is replicated successfully, we decrease the replication
> chunk size back to replica.fetch.base.bytes.
>
>
> By choosing replica.fetch.base.bytes optimally (~200K in our case), we were
> able to significantly decrease memory usage without any noticeable impact
> on replication efficiency.
>
> Here is the JIRA ticket (with PR):
> https://issues.apache.org/jira/browse/KAFKA-3979
>
> Your comments and feedback are highly appreciated!
>
>
> Thanks,
> Andrey.