You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "radai rosenblatt (Jira)" <ji...@apache.org> on 2021/05/02 00:25:00 UTC

[jira] [Commented] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

    [ https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337897#comment-17337897 ] 

radai rosenblatt commented on KAFKA-12605:
------------------------------------------

PR files against trunk - https://github.com/apache/kafka/pull/10624

> kafka consumer churns through buffer memory iterating over records
> ------------------------------------------------------------------
>
>                 Key: KAFKA-12605
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12605
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 2.7.0
>            Reporter: radai rosenblatt
>            Priority: Major
>         Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted analysis on memory allocations by the kafka consumer and found a significant amount of buffers that graduate out of the young gen causing GC load.
>  
> these are tthe buffers used to gunzip record batches in the consumer when polling. since the same iterator (and underlying streams and buffers) are likely to live through several poll() cycles these buffers graduate out of young gen and cause issues.
>  
> see attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!  
> the code causing this is in CompressionTypye.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
>                 16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }{code}
> it allocated 2 buffers - 8K and 16K even though a BufferSupplier is available to attempt re-use.
>  
> i believe it is possible to actually get both tthose buffers from the supplier, and return them when iteration over the record batch is done. 
> doing so will require subclassing  BufferedInputStream and GZIPInputStream (or its parent class) to allow supplying external buffers onto them. also some lifecycle hook would be needed to return said buffers to the pool when iteration is done.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)