You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/20 14:17:53 UTC

[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082588469


##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
                 }
             };
 
-            // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-            // in cases where the caller reads a small number of bytes (potentially a single byte).
-            return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                bufferPool), 16 * 1024);
+            // We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using
+            // `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the
+            // caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI
+            // calls.
+            return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   2 questions:
   1. Why do we wrap into DataInputStream?
   2. Have as checked that there are no workloads where we end up doing too many JNI calls?



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return new ByteBufferInputStream(buffer);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   What's the meaning of this for an uncompressed stream?



##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   Since this is unrelated, do we have to include it as part of this PR?



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   We decided not to get this info from the zstd library?



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I thought we wanted to call the underlying skipBytes API versus doing the skipping by reading into a skip buffer. I don't see that change. What am I missing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org