Posted to jira@kafka.apache.org by "Divij Vaidya (Jira)" <ji...@apache.org> on 2023/06/05 10:43:00 UTC

[jira] [Updated] (KAFKA-15057) Use new interface ZstdBufferDecompressingStreamNoFinalizer from zstd-jni

     [ https://issues.apache.org/jira/browse/KAFKA-15057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Divij Vaidya updated KAFKA-15057:
---------------------------------
    Description: 
h1. Background

In Kafka's code, every batch of records is stored in an in-memory byte buffer. For compressed workloads, this buffer contains the data in compressed form. Before writing a batch to the log, Kafka performs validations such as ensuring that offsets are monotonically increasing. To perform these validations, Kafka needs to decompress the data stored in the byte buffer.

For zstd-compressed batches, Kafka uses the ZstdInputStreamNoFinalizer interface provided by the zstd-jni library to perform decompression.

ZstdInputStreamNoFinalizer takes an InputStream as input and provides an InputStream as output. Since Kafka stores the entire batch in a ByteBuffer, it wraps the ByteBuffer in an InputStream to satisfy the input contract of ZstdInputStreamNoFinalizer.
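
For illustration, here is a minimal sketch of that wrapping. The class and helper names are hypothetical, and the byte[] copy is only there to keep the sketch self-contained; Kafka's actual code path adapts the buffer with its own ByteBufferInputStream utility instead.
{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import com.github.luben.zstd.ZstdInputStreamNoFinalizer;

public class ZstdBatchDecompression {
    // Hypothetical helper: adapt a ByteBuffer holding a compressed batch to
    // the InputStream contract required by ZstdInputStreamNoFinalizer.
    static InputStream wrapAndDecompress(ByteBuffer compressedBatch) throws IOException {
        // Copy into a byte[] purely to keep this sketch self-contained;
        // Kafka avoids this copy with its ByteBufferInputStream adapter.
        byte[] bytes = new byte[compressedBatch.remaining()];
        compressedBatch.get(bytes);
        InputStream compressedStream = new ByteArrayInputStream(bytes);
        // zstd-jni reads compressed bytes from the stream and exposes the
        // uncompressed bytes as another InputStream.
        return new ZstdInputStreamNoFinalizer(compressedStream);
    }
}
{code}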
h1. Problem

ZstdInputStreamNoFinalizer is not a good fit for our use case because we already have the entire compressed batch in a buffer; we have no need for an interface that takes an InputStream as input. What we need is an interface that takes a ByteBuffer as input and provides a stream of uncompressed data as output. Prior to zstd-jni 1.5.5, no such interface existed, so we were forced to use ZstdInputStreamNoFinalizer.

Usage of ZstdInputStreamNoFinalizer has the following problems:
1. When decompression of a batch is complete, we read one more byte to check whether the actual batch size is equal to the declared batch size. This is done in RecordIterator#next(). With the existing interface, this extra read results in a JNI call.
2. Since this interface requires an InputStream as input, we take the ByteBuffer containing the compressed batch and convert it into an InputStream. The interface internally uses an intermediate buffer to read data from this InputStream in chunks. The chunk size is determined by the underlying zstd library, so we allocate a new intermediate buffer for every batch. This leads to the following chain of copies: ByteBuffer (compressed batch) -> InputStream (compressed batch) -> copy into intermediate ByteBuffer (chunk of compressed batch) -> send chunk to the zstd library for decompression -> refill the intermediate ByteBuffer with the next chunk of the compressed batch. A sketch of this refill loop follows the list.
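
To make the per-batch allocation concrete, here is a hypothetical sketch of that refill loop. It models the copy chain described above and is not zstd-jni's actual internals; decompressChunkNative is a stand-in for the real JNI call.
{code:java}
import java.io.IOException;
import java.io.InputStream;

public class IntermediateBufferSketch {
    // Illustrative only: models how a stream-based decompressor must copy
    // compressed bytes into an intermediate buffer before each JNI call.
    static void feedDecompressor(InputStream compressedStream, int chunkSize) throws IOException {
        // Allocated for every batch; the size is dictated by the native zstd
        // library, so the caller cannot reuse a buffer of its own choosing.
        byte[] intermediate = new byte[chunkSize];
        int read;
        while ((read = compressedStream.read(intermediate)) != -1) {
            // The filled chunk is handed across the JNI boundary for
            // decompression, then the loop refills it with the next chunk.
            decompressChunkNative(intermediate, read);
        }
    }

    // Stand-in for the native zstd decompression call.
    static void decompressChunkNative(byte[] chunk, int length) { }
}
{code}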
h1. Solution

I have added a new interface to the zstd-jni library to suit Kafka's use case. The new interface, ZstdBufferDecompressingStreamNoFinalizer, takes a ByteBuffer containing compressed data as input and provides a stream of uncompressed data as output. It solves the above problems as follows:
1. When the final frame has been decompressed, the interface sets a flag to mark that all uncompressed data has been consumed. When RecordIterator#next() needs to determine whether the stream has ended, we simply read this flag and therefore do not have to make a JNI call.
2. It requires no buffer allocation for the input. It takes the input buffer and passes it across the JNI boundary without any intermediate copying. A usage sketch follows this list.
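
Here is a minimal usage sketch of the new interface, assuming the read(ByteBuffer target)/hasRemaining() style of zstd-jni's buffer-decompressing stream classes; the target-buffer management in Kafka's real code is more involved.
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;

import com.github.luben.zstd.ZstdBufferDecompressingStreamNoFinalizer;

public class BufferDecompressionSketch {
    // The compressed batch buffer is passed to the native layer directly;
    // no intermediate input buffer is allocated or copied.
    static void decompressBatch(ByteBuffer compressedBatch, ByteBuffer target) throws IOException {
        try (ZstdBufferDecompressingStreamNoFinalizer stream =
                 new ZstdBufferDecompressingStreamNoFinalizer(compressedBatch)) {
            // hasRemaining() consults a flag set once the final frame has
            // been consumed, so the end-of-stream check costs no JNI call.
            while (stream.hasRemaining()) {
                stream.read(target);
                target.flip();
                // ... consume uncompressed bytes from target ...
                target.clear();
            }
        }
    }
}
{code}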
h1. References
h2. Changes in zstd-jni

Add new interface - 
[https://github.com/luben/zstd-jni/commit/d65490e8b8aadc4b59545755e55f7dd368fe8aa5]

Bug fixes in new interface - 
[https://github.com/luben/zstd-jni/commit/8bf8066438785ce55b62fc7e6816faafe1e3b39e] 
[https://github.com/luben/zstd-jni/commit/100c434dfcec17a865ca2c2b844afe1046ce1b10]
[https://github.com/luben/zstd-jni/commit/355b8511a2967d097a619047a579930cac2ccd9d] 

> Use new interface ZstdBufferDecompressingStreamNoFinalizer from zstd-jni
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-15057
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15057
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 3.6.0
>            Reporter: Divij Vaidya
>            Assignee: Divij Vaidya
>            Priority: Major
>             Fix For: 3.6.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)