Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/20 09:43:49 UTC

[GitHub] [kafka] divijvaidya opened a new pull request, #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

divijvaidya opened a new pull request, #13135:
URL: https://github.com/apache/kafka/pull/13135

   This covers two JIRAs https://issues.apache.org/jira/browse/KAFKA-14632 and https://issues.apache.org/jira/browse/KAFKA-14633 
   
   ## Background 
   ![Screenshot 2023-01-19 at 18 27 45](https://user-images.githubusercontent.com/71267/213521204-bb3228ed-7d21-4e07-a520-697ea6fcc0ed.png)
   Currently, we use 2 intermediate buffers while handling decompressed data (one of size 2KB and another of size 16KB). These buffers are (de)allocated once per batch. 
   
   The impact of this was observed in a flamegraph analysis of a compressed workload, where 75% of the CPU time during `appendAsLeader()` is taken up by `ValidateMessagesAndAssignOffsets`.
   
   ![Screenshot 2023-01-20 at 10 41 08](https://user-images.githubusercontent.com/71267/213664252-389eaf3d-b8aa-465b-b010-db1024663d6f.png)
   
   
   ## Change
   With this PR:
   1. We reduce the number of intermediate buffers from 2 to 1, which removes one point of data copy. Note that this removed data copy occurred in chunks of 2KB at a time, multiple times. This is achieved by getting rid of `BufferedInputStream` and moving to `DataInputStream`. This change has only been made for `zstd` and `gzip`.
   2. We use a thread-local buffer pool for both buffers involved in the decompression process. This change impacts all compression types.
   3. We pushed the skipping of key/value logic to the zstd-jni implementation instead of using the one provided by `BufferedInputStream`.
   
   After the change, the above buffer allocation looks as follows:
   ![Screenshot 2023-01-19 at 18 28 14](https://user-images.githubusercontent.com/71267/213525653-917ac5ee-810a-435e-bf84-c97d6b76005e.png)
   
   ## Results
   After this change, a JMH benchmark for `ValidateMessagesAndAssignOffsets` demonstrates a 10-50% throughput increase across compression types. The improvement is most prominent when thread-cached buffer pools are used, with only a 1-2% regression in some limited scenarios.
   
   When buffer pools are not used (NO_CACHING in the results), we observed GZIP performing up to 10% better in some cases, with a 1-4% regression in some other scenarios. Overall, without buffer pools, the upside of this code change is limited to single-digit improvements in certain scenarios.
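   
   For reference, a minimal sketch of the two buffer-supplier modes compared above, using Kafka's `BufferSupplier` API (the 16KB size is illustrative):
   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.utils.BufferSupplier;
   
   public class BufferSupplierModes {
       public static void main(String[] args) {
           // Caching supplier: released buffers are cached and handed back on later
           // get() calls of the same size. Suppliers are used per thread, so this
           // behaves as a thread-local buffer pool.
           BufferSupplier cached = BufferSupplier.create();
           ByteBuffer buf = cached.get(16 * 1024);
           cached.release(buf);              // returned to the cache for reuse
   
           // NO_CACHING: every get() allocates a fresh buffer; nothing is reused.
           ByteBuffer fresh = BufferSupplier.NO_CACHING.get(16 * 1024);
           BufferSupplier.NO_CACHING.release(fresh);
       }
   }
   ```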
   
   
   Detailed results from the JMH benchmark are available here: [benchmark-jira.xlsx](https://github.com/apache/kafka/files/10465049/benchmark-jira.xlsx)
   
   
   ## Testing
   - Sanity testing using the existing unit tests to ensure that we don't impact correctness.
   - JMH benchmarks for all compression types to ensure that we did not regress any of them.
   
   




[GitHub] [kafka] showuon commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1187071361


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * the input stream. We may want to avoid pushing this to the input stream because its implementation may be
+ * inefficient, e.g. the case of ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to

Review Comment:
   Should we also mention we don't have `mark` as in `BufferedInputStream`?





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1083852068


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return new ByteBufferInputStream(buffer);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   Changed to throw an UnsupportedOperationException in the abstract class, and the Uncompressed case does not override this method anymore.
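   
   A minimal sketch of the arrangement described above (the method name is taken from this PR's diffs; the exception message is illustrative, not the exact text):
   ```java
   // Base definition in CompressionType (sketch): only compression types that
   // need an intermediate decompression buffer override this; NONE does not.
   public int getRecommendedDOutSize() {
       throw new UnsupportedOperationException(
           "Recommended decompression output size is not defined for this compression type");
   }
   ```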





[GitHub] [kafka] showuon commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1207907884


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
                 throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                         " bytes in record payload, but instead read " + (buffer.position() - recordStart));
 
-            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
+            int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+            return new DefaultRecord(totalSizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
+    public static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
 
-    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
+    private static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = (byte) input.read();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");
-
             return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return buffer.get();
-    }
-
-    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarlong(buffer);
-    }
-
-    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarint(buffer);
-    }
-
-    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        boolean needMore = false;
-        int sizeInBytes = -1;
-        int bytesToSkip = -1;
-
-        while (true) {
-            if (needMore) {
-                readMore(buffer, input, bytesRemaining);
-                needMore = false;
-            }
-
-            if (bytesToSkip < 0) {
-                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-                    needMore = true;
-                } else {
-                    sizeInBytes = ByteUtils.readVarint(buffer);
-                    if (sizeInBytes <= 0)
-                        return sizeInBytes;
-                    else
-                        bytesToSkip = sizeInBytes;
 
+    /**
+     * Skips n bytes from the data input.
+     *
+     * No-op for case where bytesToSkip <= 0. This could occur for cases where field is expected to be null.
+     * @throws  InvalidRecordException if the number of bytes could not be skipped.
+     */
+    private static void skipBytes(InputStream in, int bytesToSkip) throws IOException {
+        if (bytesToSkip <= 0) return;
+
+        // Starting JDK 12, this implementation could be replaced by InputStream#skipNBytes
+        while (bytesToSkip > 0) {
+            long ns = in.skip(bytesToSkip);
+            if (ns > 0 && ns <= bytesToSkip) {
+                // adjust number to skip
+                bytesToSkip -= ns;
+            } else if (ns == 0) { // no bytes skipped
+                // read one byte to check for EOS
+                if (in.read() == -1) {

Review Comment:
   Makes sense. Thanks for the explanation.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1100086490


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   Note to reviewer:
   
   The size of 2KB is based on the size of `skipArray` which has now been removed in this PR.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082366873


##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   FYI reviewer
   
   This change is a no-op since we already use the default value of 3 when no value is provided. It has been made to make it explicit in the Kafka code that we use a compression level of 3 with zstd.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200356934


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * the input stream. We may want to avoid pushing this to the input stream because its implementation may be
+ * inefficient, e.g. the case of ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to

Review Comment:
   It is implicit since the markSupported() method returns false for this implementation, but yes, I added a JavaDoc note here as well.
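   
   For context, a trivial sketch of what that looks like in the class (the comment text is illustrative):
   ```java
   @Override
   public boolean markSupported() {
       // Unlike BufferedInputStream, mark()/reset() are intentionally not supported.
       return false;
   }
   ```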





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1083847838


##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   Removed this change from this PR.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1083831592


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I mistakenly stated in the description that I had changed this; I will fix the description.
   
   I would like to make this (skipping) a separate code change, because pushing skip() down would be a performance regression: it requires an extra buffer allocation.
   
   This is because the zstd-jni implementation of skip() (de)allocates a "skip" buffer (from the buffer pool) per skip call [1]. In contrast, the current implementation reads all data into the same 16KB output buffer, which is allocated only once. In both cases, the amount of data copied from native to Java is the same. The only difference is whether we read & skip in our code or zstd-jni reads & skips in its code. Pushing skip to zstd-jni would only be beneficial if it were further pushed down to the native layer.
   
   [1] https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdInputStreamNoFinalizer.java#L228 
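   
   To illustrate the approach kept here, a minimal, self-contained sketch of skipping by draining an intermediate buffer instead of delegating to the source stream's skip() (names and sizes are illustrative, not the final implementation):
   ```java
   import java.io.IOException;
   import java.io.InputStream;
   
   final class SkipViaBuffer {
       // Skip `toSkip` bytes by reading bulk chunks into `buf` and advancing a
       // position within it. The source stream's skip() is never called, so a
       // ZstdInputStream source does not allocate a per-skip buffer.
       static long skip(InputStream in, byte[] buf, long toSkip) throws IOException {
           long remaining = toSkip;
           int pos = 0, limit = 0;
           while (remaining > 0) {
               if (pos >= limit) {                      // chunk exhausted, refill
                   limit = in.read(buf, 0, buf.length); // one bulk read (one JNI call for zstd)
                   pos = 0;
                   if (limit <= 0) break;               // end of stream
               }
               int chunk = (int) Math.min(limit - pos, remaining);
               pos += chunk;                            // "skip" = advance within the chunk
               remaining -= chunk;
           }
           return toSkip - remaining;
       }
   }
   ```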





[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   Since we cache buffers per thread, I think you mean we will use two buffers instead of one per thread (for the zstd case).





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1165608795


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -64,17 +66,20 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        public BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
-                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
-                // number of bytes (potentially a single byte)
-                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
-                        16 * 1024);
+                // Set output buffer (uncompressed) to 16 KB and Set input buffer (compressed) to 8 KB (0.5 KB by default) to ensure reasonable performance in cases

Review Comment:
   Moved the comment to getRecommendedDOutSize





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172889948


##########
clients/src/main/java/org/apache/kafka/common/utils/SkippableChunkedBytesStream.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SkippableChunkedBytesStream is a variant of ChunkedBytesStream which does not push skip() to the sourceStream.
+ * <p>
+ * Unlike BufferedInputStream.skip() and ChunkedBytesStream.skip(), this does not push skip() to sourceStream.
+ * We want to avoid pushing this to sourceStream because it's implementation maybe inefficient, e.g. the case of Z
+ * stdInputStream which allocates a new buffer from buffer pool, per skip call.
+ *
+ * @see ChunkedBytesStream
+ */
+public class SkippableChunkedBytesStream extends ChunkedBytesStream {

Review Comment:
   1. Done. I removed SkippableChunkedBytesStream and now control this behaviour in ChunkedBytesStream using a flag.
   
   2. No. The change I made was to introduce a new stream which doesn't require us to create a buffer when passing compressed data to Zstd. The new stream was added in [ZstdBufferDecompressingStreamNoFinalizer](https://github.com/luben/zstd-jni/pull/244), but its usage is pending completion of https://github.com/luben/zstd-jni/issues/252, which I plan to pick up next week. The migration to this new stream will be done in a separate PR.





[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082622801


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   The PR says:
   
   > we pushed the skipping of key/value logic to zstd-jni implementation instead of using the one provided by BufferedInputStream





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084097512


##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
                 }
             };
 
-            // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-            // in cases where the caller reads a small number of bytes (potentially a single byte).
-            return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                bufferPool), 16 * 1024);
+            // We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using
+            // `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the
+            // caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI
+            // calls.
+            return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   2. For the broker, the number of JNI calls remains the same: prior to this change we were making JNI calls in chunks of 16KB (using BufferedInputStream), and now we make JNI calls in 16KB chunks based on the decompression buffer size.
   
   For the consumer, the number of JNI calls *will change*. Earlier, the consumer made multiple calls in chunks of 16KB (using BufferedInputStream); now it makes one call to read the entire data. Note that the consumer does not use the "skipKeyValueIterator" variation.





[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1521852820

   This is ready for review. A summary of the changes is provided below.
   
   **On the server:**
   1. This PR starts using buffer pools to allocate the intermediate buffer used by the stream that converts compressed to uncompressed data. This is achieved by using the new `ChunkedBytesStream` instead of `BufferedInputStream` for ZSTD & GZIP (a minimal sketch of this wiring is shown at the end of this comment). For LZ4 and SNAPPY, which weren't using `BufferedInputStream`, this is a no-op (see point 2 for changes to them). The impact of allocation on Zstd can be observed from the [before](https://issues.apache.org/jira/secure/attachment/13057480/flamegraph-trunk-heapalloc-before.html) & [after](https://issues.apache.org/jira/secure/attachment/13057479/flamegraph-pr-heapalloc-after.html) object allocation flamegraphs linked to the [JIRA](https://issues.apache.org/jira/browse/KAFKA-14633). Please observe how, in the *after* flamegraph, the contribution of allocations by `validateMessagesAndAssignOffsets` decreased drastically from 39% to 5%.
   2. This PR reduces the number of buffers allocated during decompression from 2 to 1. Earlier we created a "skip buffer" of size 2KB for ALL compression algorithms and another intermediate buffer created by `BufferedInputStream` for some of the compression algorithms (ZSTD & GZIP). This PR uses the same intermediate buffer for ZSTD & GZIP, hence reducing the number of allocations to 1 (instead of 2). For LZ4 and SNAPPY, the number of allocations remains the same, but the 2KB skip buffer is now allocated from the buffer pool.
   3. The skip implementation for some compression algorithms allocates new buffers. As an example, the skip implementation of ZSTD-JNI allocates a new buffer of a different size (from the buffer pool) on every skip invocation. This PR uses the intermediate buffer to perform the skip instead of pushing it down to ZSTD-JNI.
   
   The impact of the above changes on throughput is observed via `RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize`. You will notice a 20-70% improvement there (see the benchmark sheet attached in the description).
   
   **On the consumer:**
   Change 1 remains the same for the consumer, and changes 2 & 3 do not impact the consumer since it doesn't use a "skip" iterator.
   
   The impact of the above changes on consumer throughput is observed via `RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` (note that this is a different benchmark from the server one; this one doesn't use a skip iterator). You will notice a mixed bag, ranging from single-digit regressions for some compression types to a 10-50% improvement for Zstd. The reason we don't see equivalent gains on the consumer is that it copies all uncompressed data into a single buffer and then reads off it, and we have not reduced any buffer allocations for the consumer scenario (since changes 2 & 3 aren't applicable to consumers). There are other optimizations that we can perform for the consumer, listed below, but they are out of scope for this PR.
   
   **Future optimisations (out of scope of this PR)**
   1. For non-skip iterators (used by consumers), we currently allocate an intermediate buffer for decompression and then allocate another buffer for storing the key & value. The flow looks like: uncompressed data => intermediate buffer => inputStream => recordByteBuffer. This can be improved to uncompressed data => recordByteBuffer, so that we allocate only one byte buffer.
   2. We need to revisit whether we require a skip buffer for LZ4 and SNAPPY. In the current PR we wanted to maintain parity with the legacy implementation, hence a 2KB intermediate buffer in ChunkedBytesStream is used for them, but it could potentially be removed.
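   
   As referenced in point 1 above, here is a minimal sketch of the server-side wiring: a decompressor wrapped in `ChunkedBytesStream` with a pooled intermediate buffer. The constructor shape follows `(sourceStream, bufferSupplier, intermediateBufSize)` from this PR's diffs plus the skip-delegation flag discussed in the review; treat the exact signature and the GZIP sizes as illustrative rather than final:
   ```java
   // Illustrative only; assumes Kafka's ByteBufferInputStream, BufferSupplier and
   // this PR's ChunkedBytesStream are on the classpath.
   static InputStream wrapGzipForInput(ByteBuffer compressed, BufferSupplier pool) throws IOException {
       return new ChunkedBytesStream(
               new GZIPInputStream(new ByteBufferInputStream(compressed), 8 * 1024), // 8KB compressed-side buffer
               pool,        // thread-cached supplier, e.g. BufferSupplier.create()
               16 * 1024,   // 16KB intermediate (decompressed) chunk buffer taken from the pool
               false);      // do not delegate skip() to the underlying stream
   }
   ```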
   




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1162924450


##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {

Review Comment:
   Yep. That sounds like a better name. Changed as per your suggestion.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172883027


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   It was 16KB itself (+2KB for skipBuffer which we removed). https://github.com/apache/kafka/blob/ef09a2e3fc11a738f6681fd57fb84ad109593fd3/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java#L68





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200434460


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
                 throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                         " bytes in record payload, but instead read " + (buffer.position() - recordStart));
 
-            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
+            int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+            return new DefaultRecord(totalSizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
+    public static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
 
-    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
+    private static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = (byte) input.read();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");
-
             return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return buffer.get();
-    }
-
-    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarlong(buffer);
-    }
-
-    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarint(buffer);
-    }
-
-    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        boolean needMore = false;
-        int sizeInBytes = -1;
-        int bytesToSkip = -1;
-
-        while (true) {
-            if (needMore) {
-                readMore(buffer, input, bytesRemaining);
-                needMore = false;
-            }
-
-            if (bytesToSkip < 0) {
-                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-                    needMore = true;
-                } else {
-                    sizeInBytes = ByteUtils.readVarint(buffer);
-                    if (sizeInBytes <= 0)
-                        return sizeInBytes;
-                    else
-                        bytesToSkip = sizeInBytes;
 
+    /**
+     * Skips n bytes from the data input.
+     *
+     * No-op for case where bytesToSkip <= 0. This could occur for cases where field is expected to be null.
+     * @throws  InvalidRecordException if the number of bytes could not be skipped.
+     */
+    private static void skipBytes(InputStream in, int bytesToSkip) throws IOException {
+        if (bytesToSkip <= 0) return;
+
+        // Starting JDK 12, this implementation could be replaced by InputStream#skipNBytes
+        while (bytesToSkip > 0) {
+            long ns = in.skip(bytesToSkip);
+            if (ns > 0 && ns <= bytesToSkip) {
+                // adjust number to skip
+                bytesToSkip -= ns;
+            } else if (ns == 0) { // no bytes skipped
+                // read one byte to check for EOS
+                if (in.read() == -1) {

Review Comment:
   First, as per the interface contract of InputStream#skip, it may return a smaller number of bytes than requested even if more bytes are available to be skipped. That is why we call skip() multiple times in this loop: we keep calling it until we reach the end of the stream, since any other result (zero or a positive count) is allowed by the contract.
   
   Also, note that the implementation of this loop is the same as InputStream#skipNBytes, introduced in JDK 12.
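   
   For reference, on JDK 12+ the same behaviour is packaged as `InputStream#skipNBytes`, so the helper could eventually be reduced to something like the sketch below (illustrative; note that skipNBytes signals a short skip with EOFException, which the caller would translate into InvalidRecordException):
   ```java
   // JDK 12+ sketch: skipNBytes() retries skip(), falls back to read() when
   // skip() returns 0, and throws EOFException if the stream ends early.
   private static void skipBytes(InputStream in, int bytesToSkip) throws IOException {
       if (bytesToSkip <= 0) return;
       in.skipNBytes(bytesToSkip);
   }
   ```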





[GitHub] [kafka] machi1990 commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1175200643


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * the input stream. We may want to avoid pushing this to the input stream because its implementation may be
+ * inefficient, e.g. the case of ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes as usage of ByteBuffer instead of byte[]

Review Comment:
   ```suggestion
    * - the implementation of this class is performance sensitive. Minor changes such as usage of ByteBuffer instead of byte[]
   ```





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1162952526


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - many method are un-supported in this class because they aren't currently used in the caller code.
+ * - the implementation of this class is performance sensitive. Minor changes as usage of ByteBuffer instead of byte[]
+ *   can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream implements BytesStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Source stream containing compressed data.
+     */
+    private InputStream sourceStream;
+    /**
+     * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
+    protected int limit;
+    /**
+     *
+     */
+    protected int pos;
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for releasing the buffer from the '
+     * buffer supplier.
+     */
+    private ByteBuffer intermediateBufRef;
+
+
+    public ChunkedBytesStream(InputStream sourceStream, BufferSupplier bufferSupplier, int intermediateBufSize) {
+        this.bufferSupplier = bufferSupplier;
+        this.sourceStream = sourceStream;
+        intermediateBufRef = bufferSupplier.get(intermediateBufSize);
+        if (!intermediateBufRef.hasArray() || (intermediateBufRef.arrayOffset() != 0)) {
+            throw new IllegalArgumentException("provided ByteBuffer lacks array or has non-zero arrayOffset");
+        }
+        intermediateBuf = intermediateBufRef.array();
+    }
+
+    private byte[] getBufIfOpen() throws IOException {
+        byte[] buffer = intermediateBuf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                return -1;
+        }
+
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+
+    InputStream getInIfOpen() throws IOException {
+        InputStream input = sourceStream;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+
+    /**
+     * Fills the intermediate buffer with more data. The amount of new data read is equal to the remaining empty space
+     * in the buffer. For optimal performance, read as much data as possible in this call.
+     */
+    int fill() throws IOException {
+        byte[] buffer = getBufIfOpen();
+
+        // switch to writing mode
+        pos = 0;
+        limit = pos;
+        int bytesRead = getInIfOpen().read(buffer, pos, buffer.length - pos);
+
+        if (bytesRead > 0)
+            limit = bytesRead + pos;
+
+        return bytesRead;
+    }
+
+    @Override
+    public void close() throws IOException {
+        byte[] mybuf = intermediateBuf;
+        intermediateBuf = null;
+
+        InputStream input = sourceStream;
+        sourceStream = null;
+
+        if (mybuf != null)
+            bufferSupplier.release(intermediateBufRef);
+        if (input != null)
+            input.close();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int totalRead = 0;
+        int bytesRead = 0;
+        while (totalRead < len) {
+            bytesRead = 0;
+            int toRead = len - totalRead;
+            if (pos >= limit) {
+                if (toRead >= getBufIfOpen().length) {
+                    // don't use intermediate buffer if we need to read more than it's capacity
+                    bytesRead = getInIfOpen().read(b, off + totalRead, toRead);
+                    if (bytesRead < 0)
+                        break;
+                } else {
+                    fill();
+                    if (pos >= limit)
+                        break;
+                }
+            } else {
+                int avail = limit - pos;
+                toRead = (avail < toRead) ? avail : toRead;
+                System.arraycopy(getBufIfOpen(), pos, b, off + totalRead, toRead);
+                pos += toRead;
+                bytesRead = toRead;
+            }
+
+            totalRead += bytesRead;
+        }
+
+        if ((bytesRead <= 0) && (totalRead < len))
+            return -1;
+
+        return totalRead;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) throws IOException {
+        if (toSkip <= 0) {
+            return 0;
+        }
+        int totalSkipped = 0;
+
+        // Skip what exists in the intermediate buffer first
+        int avail = limit - pos;
+        int bytesToRead = (avail < (toSkip - totalSkipped)) ? avail : (toSkip - totalSkipped);
+        pos += bytesToRead;
+        totalSkipped += bytesToRead;
+
+        // Use sourceStream's skip() to skip the rest
+        while ((totalSkipped < toSkip) && ((bytesToRead = (int) getInIfOpen().skip(toSkip - totalSkipped)) > 0)) {
+            totalSkipped += bytesToRead;
+        }
+
+        return totalSkipped;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                throw new EOFException();
+        }
+        return getBufIfOpen()[pos++];
+    }
+
+    // visible for testing
+    public InputStream getSourceStream() {

Review Comment:
   OK. Changed in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200351208


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * input stream. We may want to avoid pushing this to input stream because it's implementation maybe inefficient,
+ * e.g. the case of ZstdInputStream which allocates a new buffer from buffer pool, per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes such as usage of ByteBuffer instead of byte[]
+ * can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream extends FilterInputStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
+    /**
+     * The index one greater than the index of the last valid byte in
+     * the buffer.
+     * This value is always in the range <code>0</code> through <code>intermediateBuf.length</code>;
+     * elements <code>intermediateBuf[0]</code>  through <code>intermediateBuf[count-1]
+     * </code>contain buffered input data obtained
+     * from the underlying  input stream.
+     */
+    protected int count = 0;
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code>
+     * through <code>count</code>. If it is less
+     * than <code>count</code>, then  <code>intermediateBuf[pos]</code>
+     * is the next byte to be supplied as input;
+     * if it is equal to <code>count</code>, then
+     * the  next <code>read</code> or <code>skip</code>
+     * operation will require more bytes to be
+     * read from the contained  input stream.
+     */
+    protected int pos = 0;
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for releasing the buffer from the
+     * buffer supplier.
+     */
+    private final ByteBuffer intermediateBufRef;
+    /**
+     * Determines if the skip be pushed down
+     */
+    private final boolean pushSkipToSourceStream;
+
+    public ChunkedBytesStream(InputStream in, BufferSupplier bufferSupplier, int intermediateBufSize, boolean pushSkipToSourceStream) {
+        super(in);
+        this.bufferSupplier = bufferSupplier;
+        intermediateBufRef = bufferSupplier.get(intermediateBufSize);
+        if (!intermediateBufRef.hasArray() || (intermediateBufRef.arrayOffset() != 0)) {
+            throw new IllegalArgumentException("provided ByteBuffer lacks array or has non-zero arrayOffset");
+        }
+        intermediateBuf = intermediateBufRef.array();
+        this.pushSkipToSourceStream = pushSkipToSourceStream;
+    }
+
+    /**
+     * Check to make sure that buffer has not been nulled out due to
+     * close; if not return it;
+     */
+    private byte[] getBufIfOpen() throws IOException {
+        byte[] buffer = intermediateBuf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+
+    /**
+     * See

Review Comment:
   Indentation problem. Fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398459548

   One more thing: when it comes to testing, can we include the case where the batches have a single 10-byte message?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   Since we cache buffers per thread, I think you mean we will use two buffers instead of one per thread.
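
   A minimal sketch of what two cached buffers per thread looks like in practice, assuming the caching behaviour of Kafka's `BufferSupplier.create()` (the sizes and names below are illustrative, not the PR's code):

   ```java
   import java.nio.ByteBuffer;

   import org.apache.kafka.common.utils.BufferSupplier;

   public class TwoCachedBuffersSketch {
       public static void main(String[] args) {
           // a caching supplier is typically owned by a single request-handler thread
           BufferSupplier supplier = BufferSupplier.create();

           ByteBuffer chunkBuffer = supplier.get(2 * 1024);   // intermediate chunk buffer
           ByteBuffer dOutBuffer = supplier.get(16 * 1024);   // decompression output buffer
           // ... decompress one batch using both buffers ...
           supplier.release(chunkBuffer);
           supplier.release(dOutBuffer);

           // the next batch handled by the same thread re-uses the two released buffers
           // instead of allocating fresh ones, i.e. two cached buffers per thread
       }
   }
   ```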



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1165559960


##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {
+    final private ByteBuffer buf;
+
+    public BytesStreamBufferSource(final ByteBuffer buffer) {
+        // we do not modify the markers of source buffer
+        buf = buffer.duplicate();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) {
+        if (toSkip <= 0) {
+            return 0;
+        }
+
+        int avail = Math.min(toSkip, buf.remaining());
+        buf.position(buf.position() + avail);
+        return avail;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(b, off, len);
+        return len;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        try {
+            return buf.get();

Review Comment:
   sure. Modified as per your suggestion.



##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStream.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This interface provides for reading bytes from an underlying source. The source could be a buffer or a stream.
+ * It extends the {@link Closeable} interface to ensure that the source is appropriately closed (if required).
+ */
+public interface BytesStream extends Closeable {
+    /**
+     * The interface is based on {@link InputStream#read()} and follows the same contract.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1497651824

   Thank you @ijuma for your review. I will address the comments within the next week since I am on vacation for the next few days.
   
   > Was this done?
   
   Yes, consumer performance impact was benchmarked using `RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` (the non-skip API used by the clients) and the results were added to the Results section in the Summary at the top.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1100085805


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(

Review Comment:
   Note to reviewer:
   
   We use `ChunkedDataInputStream` here instead of `SkippableChunkedDataInputStream` because we want to push the `skip()` implementation down to `KafkaLZ4BlockInputStream`, where it is optimized (unlike `ZstdInputStream#skip()`, for which we fall back to our custom skip implementation in `SkippableChunkedDataInputStream`).
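
   To illustrate the trade-off, here is a rough sketch (not the PR's code; method and variable names are assumptions) of the two skip strategies: delegating `skip()` to the source stream versus emulating it by draining through the already-allocated intermediate chunk buffer:

   ```java
   import java.io.IOException;
   import java.io.InputStream;

   public class SkipStrategiesSketch {
       // strategy 1: push skip() down to the source stream; cheap when the source
       // (e.g. KafkaLZ4BlockInputStream) can skip without allocating a fresh buffer
       static long skipViaSource(InputStream source, long toSkip) throws IOException {
           return source.skip(toSkip);
       }

       // strategy 2: keep skip() local by reading into the existing intermediate chunk
       // buffer; avoids sources like ZstdInputStream allocating a new buffer per skip call
       static long skipViaChunkBuffer(InputStream source, byte[] chunk, long toSkip) throws IOException {
           long skipped = 0;
           while (skipped < toSkip) {
               int read = source.read(chunk, 0, (int) Math.min(chunk.length, toSkip - skipped));
               if (read < 0)
                   break; // end of stream
               skipped += read; // bytes land in the pooled chunk buffer and are simply discarded
           }
           return skipped;
       }
   }
   ```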



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -270,30 +270,28 @@ public int partitionLeaderEpoch() {
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
 
-    public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
+    public InputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        InputStream is = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return (is instanceof DataInput) ? is : new DataInputStream(is);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
-        final DataInputStream inputStream = recordInputStream(bufferSupplier);
+        final InputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
-            // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];

Review Comment:
   Note to reviewers
   
   Removal of this intermediate buffer affects all compression types. For Zstd and the other types that were already using an intermediate buffer, this is an optimisation since we reduce buffer allocation and data copying. For the types that weren't using an intermediate buffer, we now introduce a 2KB intermediate buffer (matching the size of the skip buffer removed here) via ChunkedDataInputStream.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java:
##########
@@ -103,6 +104,12 @@ public void init() {
         }
     }
 
+    @TearDown
+    public void cleanUp() {
+        if (requestLocal != null)
+            requestLocal.close();

Review Comment:
   Note to reviewers
   
   This benchmark has a flaw. We use `@Setup` and `@TearDown` at the default level of `Level.Trial`, which means these methods are executed before/after each run of the benchmark. A benchmark may be executed using multiple threads, so it is not guaranteed that each invocation of the benchmark will use the same BufferPool.
   
   I will fix this in a separate PR. It does not have a negative impact on the results; at worst, it downplays the effect of re-using buffers in the code.
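
   As a sketch of one possible fix (class and field names are assumed; this is not the follow-up PR), the buffer pool could live in thread-scoped JMH state so that every benchmark thread creates and closes its own supplier:

   ```java
   import org.apache.kafka.common.utils.BufferSupplier;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.TearDown;

   @State(Scope.Thread) // each benchmark thread owns its own state, so buffers are never shared
   public class PerThreadBufferPoolState {
       BufferSupplier bufferSupplier;

       @Setup(Level.Trial)
       public void setUp() {
           bufferSupplier = BufferSupplier.create(); // one cache per benchmark thread
       }

       @TearDown(Level.Trial)
       public void tearDown() {
           bufferSupplier.close();
       }
   }
   ```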



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -363,157 +361,79 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
     }
 
     public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
 
     private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = input.readByte();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");

Review Comment:
   Note for reviewer:
   It is OK to remove this validation because we perform a similar validation at the end of every record; see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L608



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   The size of 2KB is based on the size of `skipArray` which has now been removed in this PR.



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##########
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
         this.buffer = buffer;
     }
 
+    @Override
+    public int available() throws IOException {

Review Comment:
   Note to reviewer
   
   This was a bug. Prior to this change, `available()` fell through to `InputStream#available()`, which always returns 0. This impacts cases where we rely on the value of `available()` to determine the end of input.
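
   A minimal sketch of the kind of override being added above (the body is assumed here since the diff is truncated): report the bytes remaining in the backing buffer instead of inheriting `InputStream#available()`, which returns 0.

   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.ByteBuffer;

   // minimal ByteBuffer-backed stream illustrating the available() fix
   class BufferBackedInputStream extends InputStream {
       private final ByteBuffer buffer;

       BufferBackedInputStream(ByteBuffer buffer) {
           this.buffer = buffer;
       }

       @Override
       public int read() {
           return buffer.hasRemaining() ? buffer.get() & 0xFF : -1;
       }

       @Override
       public int available() throws IOException {
           // report the bytes left in the backing buffer instead of InputStream's default of 0
           return buffer.remaining();
       }
   }
   ```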



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1400407941

   TODO (will update PR in a short while) - 
   
   1. Add a benchmark for the case when a batch contains a single 10-byte message
   2. Test consumer performance
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082615267


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   This will be picked up as a separate follow-up PR: https://issues.apache.org/jira/browse/KAFKA-14634 
   
   I decided to keep it separate to introduce fewer changes at a time and measure their impact.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398428117

   @ijuma please review when you get a chance since you already have context about this code change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1178242551


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * input stream. We may want to avoid pushing this to input stream because it's implementation maybe inefficient,
+ * e.g. the case of ZstdInputStream which allocates a new buffer from buffer pool, per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes as usage of ByteBuffer instead of byte[]

Review Comment:
   Thank you for the suggestion. This is fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1561407539

   Failing tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / [1] quorum=kraft, isIdempotenceEnabled=true – kafka.api.SaslPlainSslEndToEndAuthorizationTest
   14s
   Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   1m 42s
   Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   1m 43s
   Build / JDK 8 and Scala 2.12 / testConnectorBoundary – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
   57s
   Build / JDK 8 and Scala 2.12 / shouldFollowLeaderEpochBasicWorkflow() – kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceWithIbp26Test
   12s
   Build / JDK 8 and Scala 2.12 / shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
   10s
   Build / JDK 8 and Scala 2.12 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1566420453

   @ijuma, I think this PR is good to be merged now, and @divijvaidya has addressed all your review comments. If you don't have any other comments, I'm going to merge it within this week. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1428471245

   @ijuma I have updated this PR by extracting out an interface for the new streams. Please take a look when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084093810


##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
                 }
             };
 
-            // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-            // in cases where the caller reads a small number of bytes (potentially a single byte).
-            return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                bufferPool), 16 * 1024);
+            // We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using
+            // `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the
+            // caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI
+            // calls.
+            return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   1. Thanks for pointing this out. This is an artifact of some other changes I was trying to do. Fixed it now.
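
   For context, a rough sketch (assuming the `ChunkedBytesStream` constructor quoted earlier in this thread and the zstd-jni `BufferPool` type used by `ZstdFactory`; the 16 KB size and the skip flag are illustrative) of how the zstd stream can still be read through a pooled intermediate chunk buffer to amortize JNI read() calls:

   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.ByteBuffer;

   import com.github.luben.zstd.BufferPool;
   import com.github.luben.zstd.ZstdInputStreamNoFinalizer;

   import org.apache.kafka.common.utils.BufferSupplier;
   import org.apache.kafka.common.utils.ByteBufferInputStream;
   import org.apache.kafka.common.utils.ChunkedBytesStream;

   final class ZstdChunkedWrapSketch {
       // `buffer` is the compressed payload; `bufferPool` and `supplier` mirror the parameters
       // used by ZstdFactory#wrapForInput in the diff above
       static InputStream wrapForInput(ByteBuffer buffer, BufferPool bufferPool, BufferSupplier supplier) throws IOException {
           InputStream zstdStream = new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool);
           // read decompressed data through a pooled 16 KB chunk buffer so each JNI read() pulls a
           // whole chunk; skip() stays local because ZstdInputStream allocates a buffer per skip call
           return new ChunkedBytesStream(zstdStream, supplier, 16 * 1024, false);
       }
   }
   ```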



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1185891570


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * input stream. We may want to avoid pushing this to input stream because it's implementation maybe inefficient,
+ * e.g. the case of ZstdInputStream which allocates a new buffer from buffer pool, per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes such as usage of ByteBuffer instead of byte[]
+ * can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream extends FilterInputStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
+    /**
+     * The index one greater than the index of the last valid byte in
+     * the buffer.
+     * This value is always in the range <code>0</code> through <code>intermediateBuf.length</code>;
+     * elements <code>intermediateBuf[0]</code>  through <code>intermediateBuf[count-1]
+     * </code>contain buffered input data obtained
+     * from the underlying  input stream.
+     */
+    protected int count = 0;
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code>
+     * through <code>count</code>. If it is less
+     * than <code>count</code>, then  <code>intermediateBuf[pos]</code>
+     * is the next byte to be supplied as input;
+     * if it is equal to <code>count</code>, then
+     * the  next <code>read</code> or <code>skip</code>
+     * operation will require more bytes to be
+     * read from the contained  input stream.
+     */
+    protected int pos = 0;
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for releasing the buffer from the
+     * buffer supplier.
+     */
+    private final ByteBuffer intermediateBufRef;
+    /**
+     * Determines if the skip be pushed down
+     */
+    private final boolean pushSkipToSourceStream;
+
+    public ChunkedBytesStream(InputStream in, BufferSupplier bufferSupplier, int intermediateBufSize, boolean pushSkipToSourceStream) {
+        super(in);
+        this.bufferSupplier = bufferSupplier;
+        intermediateBufRef = bufferSupplier.get(intermediateBufSize);
+        if (!intermediateBufRef.hasArray() || (intermediateBufRef.arrayOffset() != 0)) {
+            throw new IllegalArgumentException("provided ByteBuffer lacks array or has non-zero arrayOffset");
+        }
+        intermediateBuf = intermediateBufRef.array();
+        this.pushSkipToSourceStream = pushSkipToSourceStream;
+    }
+
+    /**
+     * Check to make sure that buffer has not been nulled out due to
+     * close; if not return it;
+     */
+    private byte[] getBufIfOpen() throws IOException {
+        byte[] buffer = intermediateBuf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+
+    /**
+     * See

Review Comment:
   See ?



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
                 throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                         " bytes in record payload, but instead read " + (buffer.position() - recordStart));
 
-            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
+            int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+            return new DefaultRecord(totalSizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
+    public static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
 
-    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
+    private static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = (byte) input.read();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");
-
             return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return buffer.get();
-    }
-
-    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarlong(buffer);
-    }
-
-    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarint(buffer);
-    }
-
-    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        boolean needMore = false;
-        int sizeInBytes = -1;
-        int bytesToSkip = -1;
-
-        while (true) {
-            if (needMore) {
-                readMore(buffer, input, bytesRemaining);
-                needMore = false;
-            }
-
-            if (bytesToSkip < 0) {
-                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-                    needMore = true;
-                } else {
-                    sizeInBytes = ByteUtils.readVarint(buffer);
-                    if (sizeInBytes <= 0)
-                        return sizeInBytes;
-                    else
-                        bytesToSkip = sizeInBytes;
 
+    /**
+     * Skips n bytes from the data input.

Review Comment:
   nit: Skips {@code bytesToSkip} bytes...
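
   For reference, a sketch of what such a `skipBytes` helper could look like (names are assumed; this is not necessarily the PR's exact implementation):

   ```java
   import java.io.EOFException;
   import java.io.IOException;
   import java.io.InputStream;

   final class SkipBytesSketch {
       /**
        * Skips {@code bytesToSkip} bytes from the input, falling back to single-byte reads
        * when skip() makes no progress, and failing fast if the stream ends early.
        */
       static void skipBytes(InputStream in, int bytesToSkip) throws IOException {
           while (bytesToSkip > 0) {
               long skipped = in.skip(bytesToSkip);
               if (skipped <= 0) {
                   if (in.read() == -1)
                       throw new EOFException("Stream ended while skipping bytes");
                   skipped = 1; // consumed one byte via read()
               }
               bytesToSkip -= (int) skipped;
           }
       }
   }
   ```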



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
                 throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                         " bytes in record payload, but instead read " + (buffer.position() - recordStart));
 
-            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
+            int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+            return new DefaultRecord(totalSizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
+    public static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
 
-    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
+    private static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = (byte) input.read();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");
-
             return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return buffer.get();
-    }
-
-    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarlong(buffer);
-    }
-
-    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarint(buffer);
-    }
-
-    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        boolean needMore = false;
-        int sizeInBytes = -1;
-        int bytesToSkip = -1;
-
-        while (true) {
-            if (needMore) {
-                readMore(buffer, input, bytesRemaining);
-                needMore = false;
-            }
-
-            if (bytesToSkip < 0) {
-                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-                    needMore = true;
-                } else {
-                    sizeInBytes = ByteUtils.readVarint(buffer);
-                    if (sizeInBytes <= 0)
-                        return sizeInBytes;
-                    else
-                        bytesToSkip = sizeInBytes;
 
+    /**
+     * Skips n bytes from the data input.
+     *
+     * No-op for case where bytesToSkip <= 0. This could occur for cases where field is expected to be null.
+     * @throws  InvalidRecordException if the number of bytes could not be skipped.
+     */
+    private static void skipBytes(InputStream in, int bytesToSkip) throws IOException {
+        if (bytesToSkip <= 0) return;
+
+        // Starting JDK 12, this implementation could be replaced by InputStream#skipNBytes
+        while (bytesToSkip > 0) {
+            long ns = in.skip(bytesToSkip);
+            if (ns > 0 && ns <= bytesToSkip) {
+                // adjust number to skip
+                bytesToSkip -= ns;
+            } else if (ns == 0) { // no bytes skipped
+                // read one byte to check for EOS
+                if (in.read() == -1) {

Review Comment:
   What should we do if `ns == 0` but we have not reached EOS? Should we also throw an exception in that case?
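
For reference, a minimal, self-contained sketch (not the PR's code) of such a skip helper in the spirit of JDK 12's InputStream#skipNBytes, including an EOF probe for the case where skip() makes no progress:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, for illustration only.
final class SkipFully {
    static void skipFully(InputStream in, int bytesToSkip) throws IOException {
        while (bytesToSkip > 0) {
            long skipped = in.skip(bytesToSkip);
            if (skipped > 0) {
                bytesToSkip -= skipped;          // normal case: skip() made progress
            } else if (in.read() == -1) {
                // skip() made no progress and the stream is exhausted
                throw new EOFException("stream ended before the requested bytes could be skipped");
            } else {
                bytesToSkip--;                   // one byte consumed by the probing read()
            }
        }
    }
}
```

In this sketch a zero return from skip() that is not end-of-stream simply falls back to consuming one byte via read(), so the loop always makes progress.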



##########
clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java:
##########
@@ -367,32 +403,164 @@ public void testStreamingIteratorConsistency() {
         }
     }
 
-    @Test
-    public void testSkipKeyValueIteratorCorrectness() {
-        Header[] headers = {new RecordHeader("k1", "v1".getBytes()), new RecordHeader("k2", "v2".getBytes())};
+    @ParameterizedTest
+    @EnumSource(value = CompressionType.class)
+    public void testSkipKeyValueIteratorCorrectness(CompressionType compressionType) throws NoSuchAlgorithmException {
+        Header[] headers = {new RecordHeader("k1", "v1".getBytes()), new RecordHeader("k2", null)};
+        byte[] largeRecordValue = new byte[200 * 1024]; // 200KB
+        RANDOM.nextBytes(largeRecordValue);
 
         MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
-            CompressionType.LZ4, TimestampType.CREATE_TIME,
+            compressionType, TimestampType.CREATE_TIME,
+            // one sample with small value size
             new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
-            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
-            new SimpleRecord(3L, "c".getBytes(), "3".getBytes()),
-            new SimpleRecord(1000L, "abc".getBytes(), "0".getBytes()),
+            // one sample with null value
+            new SimpleRecord(2L, "b".getBytes(), null),
+            // one sample with null key
+            new SimpleRecord(3L, null, "3".getBytes()),
+            // one sample with null key and null value
+            new SimpleRecord(4L, null, (byte[]) null),
+            // one sample with large value size
+            new SimpleRecord(1000L, "abc".getBytes(), largeRecordValue),
+            // one sample with headers, one of the header has null value
             new SimpleRecord(9999L, "abc".getBytes(), "0".getBytes(), headers)
             );
+
         DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
-        try (CloseableIterator<Record> streamingIterator = batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)) {
-            assertEquals(Arrays.asList(
-                new PartialDefaultRecord(9, (byte) 0, 0L, 1L, -1, 1, 1),
-                new PartialDefaultRecord(9, (byte) 0, 1L, 2L, -1, 1, 1),
-                new PartialDefaultRecord(9, (byte) 0, 2L, 3L, -1, 1, 1),
-                new PartialDefaultRecord(12, (byte) 0, 3L, 1000L, -1, 3, 1),
-                new PartialDefaultRecord(25, (byte) 0, 4L, 9999L, -1, 3, 1)
-                ),
-                Utils.toList(streamingIterator)
-            );
+
+        try (BufferSupplier bufferSupplier = BufferSupplier.create();
+             CloseableIterator<Record> skipKeyValueIterator = batch.skipKeyValueIterator(bufferSupplier)) {
+
+            if (CompressionType.NONE == compressionType) {
+                // assert that for uncompressed data stream record iterator is not used
+                assertTrue(skipKeyValueIterator instanceof DefaultRecordBatch.RecordIterator);
+                // superficial validation for correctness. Deep validation is already performed in other tests
+                assertEquals(Utils.toList(records.records()).size(), Utils.toList(skipKeyValueIterator).size());
+            } else {
+                // assert that a streaming iterator is used for compressed records
+                assertTrue(skipKeyValueIterator instanceof DefaultRecordBatch.StreamRecordIterator);
+                // assert correctness for compressed records
+                assertIterableEquals(Arrays.asList(
+                        new PartialDefaultRecord(9, (byte) 0, 0L, 1L, -1, 1, 1),
+                        new PartialDefaultRecord(8, (byte) 0, 1L, 2L, -1, 1, -1),
+                        new PartialDefaultRecord(8, (byte) 0, 2L, 3L, -1, -1, 1),
+                        new PartialDefaultRecord(7, (byte) 0, 3L, 4L, -1, -1, -1),
+                        new PartialDefaultRecord(15 + largeRecordValue.length, (byte) 0, 4L, 1000L, -1, 3, largeRecordValue.length),
+                        new PartialDefaultRecord(23, (byte) 0, 5L, 9999L, -1, 3, 1)
+                    ), Utils.toList(skipKeyValueIterator));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public void testBufferReuseInSkipKeyValueIterator(CompressionType compressionType, int expectedNumBufferAllocations, byte[] recordValue) {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+            compressionType, TimestampType.CREATE_TIME,
+            new SimpleRecord(1000L, "a".getBytes(), "0".getBytes()),
+            new SimpleRecord(9999L, "b".getBytes(), recordValue)
+        );
+
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+
+        try (BufferSupplier bufferSupplier = spy(BufferSupplier.create());
+             CloseableIterator<Record> streamingIterator = batch.skipKeyValueIterator(bufferSupplier)) {
+
+            // Consume through the iterator
+            Utils.toList(streamingIterator);
+
+            // Close the iterator to release any buffers
+            streamingIterator.close();
+
+            // assert number of buffer allocations
+            verify(bufferSupplier, times(expectedNumBufferAllocations)).get(anyInt());
+            verify(bufferSupplier, times(expectedNumBufferAllocations)).release(any(ByteBuffer.class));
+        }
+    }
+    private static Stream<Arguments> testBufferReuseInSkipKeyValueIterator() throws NoSuchAlgorithmException {
+        byte[] smallRecordValue = "1".getBytes();
+        byte[] largeRecordValue = new byte[512 * 1024]; // 512KB
+        RANDOM.nextBytes(largeRecordValue);
+
+        return Stream.of(
+            /*
+             * 1 allocation per batch (i.e. per iterator instance) for buffer holding uncompressed data
+             * = 1 buffer allocations
+             */
+            Arguments.of(CompressionType.GZIP, 1, smallRecordValue),
+            Arguments.of(CompressionType.GZIP, 1, largeRecordValue),
+            Arguments.of(CompressionType.SNAPPY, 1, smallRecordValue),
+            Arguments.of(CompressionType.SNAPPY, 1, largeRecordValue),
+            /*
+             * 1 allocation per batch (i.e. per iterator instance) for buffer holding compressed data
+             * 1 allocation per batch (i.e. per iterator instance) for buffer holding uncompressed data
+             * = 2 buffer allocations
+             */
+            Arguments.of(CompressionType.LZ4, 2, smallRecordValue),
+            Arguments.of(CompressionType.LZ4, 2, largeRecordValue),
+            Arguments.of(CompressionType.ZSTD, 2, smallRecordValue),
+            Arguments.of(CompressionType.ZSTD, 2, largeRecordValue)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public void testZstdJniForSkipKeyValueIterator(int expectedJniCalls, byte[] recordValue) throws IOException {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+            CompressionType.ZSTD, TimestampType.CREATE_TIME,
+            new SimpleRecord(9L, "hakuna-matata".getBytes(), recordValue)
+        );
+
+        // Buffer containing compressed data
+        final ByteBuffer compressedBuf = records.buffer();
+        // Create a RecordBatch object
+        final DefaultRecordBatch batch = spy(new DefaultRecordBatch(compressedBuf.duplicate()));
+        final CompressionType mockCompression = mock(CompressionType.ZSTD.getClass());
+        doReturn(mockCompression).when(batch).compressionType();
+
+        // Buffer containing compressed records to be used for creating zstd-jni stream
+        ByteBuffer recordsBuffer = compressedBuf.duplicate();
+        recordsBuffer.position(RECORDS_OFFSET);
+
+        try (final BufferSupplier bufferSupplier = BufferSupplier.create();
+             final InputStream zstdStream = spy(ZstdFactory.wrapForInput(recordsBuffer, batch.magic(), bufferSupplier));
+             final InputStream chunkedStream = new ChunkedBytesStream(zstdStream, bufferSupplier, 16 * 1024, false)) {
+
+            when(mockCompression.wrapForInput(any(ByteBuffer.class), anyByte(), any(BufferSupplier.class))).thenReturn(chunkedStream);
+
+            try (CloseableIterator<Record> streamingIterator = batch.skipKeyValueIterator(bufferSupplier)) {
+                assertNotNull(streamingIterator);
+                Utils.toList(streamingIterator);
+                // verify the number of read() calls to zstd JNI stream. Each read() call is a JNI call.
+                verify(zstdStream, times(expectedJniCalls)).read(any(byte[].class), anyInt(), anyInt());
+                // verify that we don't use the underlying skip() functionality. The underlying skip() allocates
+                // 1 buffer per skip call from he buffer pool whereas our implementation

Review Comment:
   whereas our implementation?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:

Review Comment:
   a copy of `ByteBufferInputStream`? `BufferedInputStream`?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * input stream. We may want to avoid pushing this to the input stream because its implementation may be inefficient,
+ * e.g. the case of ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes such as usage of ByteBuffer instead of byte[]
+ * can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream extends FilterInputStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
+    /**
+     * The index one greater than the index of the last valid byte in
+     * the buffer.
+     * This value is always in the range <code>0</code> through <code>intermediateBuf.length</code>;
+     * elements <code>intermediateBuf[0]</code>  through <code>intermediateBuf[count-1]
+     * </code>contain buffered input data obtained
+     * from the underlying  input stream.
+     */
+    protected int count = 0;
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code>
+     * through <code>count</code>. If it is less
+     * than <code>count</code>, then  <code>intermediateBuf[pos]</code>
+     * is the next byte to be supplied as input;
+     * if it is equal to <code>count</code>, then
+     * the  next <code>read</code> or <code>skip</code>
+     * operation will require more bytes to be
+     * read from the contained  input stream.
+     */
+    protected int pos = 0;
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for releasing the buffer from the
+     * buffer supplier.
+     */
+    private final ByteBuffer intermediateBufRef;
+    /**
+     * Determines if the skip be pushed down
+     */
+    private final boolean pushSkipToSourceStream;

Review Comment:
   Could we add more explanation for this variable? It's quite difficult to understand from this comment, and the variable name is also hard to follow. My understanding is that if it's true, we don't even read from the source stream for the N skipped bytes, and if it's false, we read N bytes into the intermediate buffer and then skip over them. Is that right?
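
For illustration, a minimal sketch (hypothetical helper, not the PR's class) of the two skip strategies being discussed: delegating skip() to the source stream versus reading and discarding through an intermediate buffer.

```java
import java.io.IOException;
import java.io.InputStream;

// Illustrative only; method and class names are made up.
final class SkipStrategies {
    // Strategy 1: push skip() down to the source stream (cheap if the source can just move a position).
    static long skipViaSource(InputStream in, long toSkip) throws IOException {
        long remaining = toSkip;
        long skipped;
        while (remaining > 0 && (skipped = in.skip(remaining)) > 0)
            remaining -= skipped;
        return toSkip - remaining;
    }

    // Strategy 2: never call source.skip(); read into a scratch buffer and throw the bytes away.
    // This avoids source skip() implementations that allocate per call (e.g. some JNI-backed streams).
    static long skipViaRead(InputStream in, byte[] scratch, long toSkip) throws IOException {
        long remaining = toSkip;
        while (remaining > 0) {
            int read = in.read(scratch, 0, (int) Math.min(scratch.length, remaining));
            if (read < 0)
                break;
            remaining -= read;
        }
        return toSkip - remaining;
    }
}
```

The read-and-discard variant never calls skip() on the source, which appears to be what the false setting of this flag selects.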



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082588469


##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
                 }
             };
 
-            // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-            // in cases where the caller reads a small number of bytes (potentially a single byte).
-            return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                bufferPool), 16 * 1024);
+            // We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using
+            // `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the
+            // caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI
+            // calls.
+            return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   2 questions:
   1. Why do we wrap this in a DataInputStream?
   2. Have we checked that there are no workloads where we end up making too many JNI calls?
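
On question 2, a toy, self-contained illustration of the concern (made-up names, not Kafka code): it counts how many read() calls reach the underlying stream when a caller reads byte-by-byte, with and without an intermediate buffer in between.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

public class ReadCallCounter {
    public static void main(String[] args) throws IOException {
        byte[] data = new byte[64 * 1024];

        AtomicInteger direct = new AtomicInteger();
        try (InputStream in = counting(new ByteArrayInputStream(data), direct)) {
            while (in.read() != -1) { }          // every caller read() hits the source
        }

        AtomicInteger buffered = new AtomicInteger();
        try (InputStream in = new BufferedInputStream(counting(new ByteArrayInputStream(data), buffered), 16 * 1024)) {
            while (in.read() != -1) { }          // the source is hit roughly once per 16 KB chunk
        }

        System.out.println("direct reads: " + direct + ", buffered reads: " + buffered);
    }

    // Wraps a stream and counts how many read() calls it receives.
    private static InputStream counting(InputStream in, AtomicInteger counter) {
        return new InputStream() {
            @Override
            public int read() throws IOException {
                counter.incrementAndGet();
                return in.read();
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                counter.incrementAndGet();
                return in.read(b, off, len);
            }
        };
    }
}
```

If each call to the source were a JNI call, the byte-by-byte pattern would cross the JNI boundary tens of thousands of times for this 64 KB input, while the buffered pattern crosses it only a handful of times.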



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return new ByteBufferInputStream(buffer);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   What's the meaning of this for an uncompressed stream?



##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   Since this is unrelated, do we have to include it as part of this PR?



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   We decided not to get this info from the zstd library?



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I thought we wanted to call the underlying skipBytes API versus doing the skipping by reading into a skip buffer. I don't see that change. What am I missing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082623877


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   OK, that's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200358594


##########
clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java:
##########
@@ -367,32 +403,164 @@ public void testStreamingIteratorConsistency() {
         }
     }
 
-    @Test
-    public void testSkipKeyValueIteratorCorrectness() {
-        Header[] headers = {new RecordHeader("k1", "v1".getBytes()), new RecordHeader("k2", "v2".getBytes())};
+    @ParameterizedTest
+    @EnumSource(value = CompressionType.class)
+    public void testSkipKeyValueIteratorCorrectness(CompressionType compressionType) throws NoSuchAlgorithmException {
+        Header[] headers = {new RecordHeader("k1", "v1".getBytes()), new RecordHeader("k2", null)};
+        byte[] largeRecordValue = new byte[200 * 1024]; // 200KB
+        RANDOM.nextBytes(largeRecordValue);
 
         MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
-            CompressionType.LZ4, TimestampType.CREATE_TIME,
+            compressionType, TimestampType.CREATE_TIME,
+            // one sample with small value size
             new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
-            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
-            new SimpleRecord(3L, "c".getBytes(), "3".getBytes()),
-            new SimpleRecord(1000L, "abc".getBytes(), "0".getBytes()),
+            // one sample with null value
+            new SimpleRecord(2L, "b".getBytes(), null),
+            // one sample with null key
+            new SimpleRecord(3L, null, "3".getBytes()),
+            // one sample with null key and null value
+            new SimpleRecord(4L, null, (byte[]) null),
+            // one sample with large value size
+            new SimpleRecord(1000L, "abc".getBytes(), largeRecordValue),
+            // one sample with headers, one of the header has null value
             new SimpleRecord(9999L, "abc".getBytes(), "0".getBytes(), headers)
             );
+
         DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
-        try (CloseableIterator<Record> streamingIterator = batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)) {
-            assertEquals(Arrays.asList(
-                new PartialDefaultRecord(9, (byte) 0, 0L, 1L, -1, 1, 1),
-                new PartialDefaultRecord(9, (byte) 0, 1L, 2L, -1, 1, 1),
-                new PartialDefaultRecord(9, (byte) 0, 2L, 3L, -1, 1, 1),
-                new PartialDefaultRecord(12, (byte) 0, 3L, 1000L, -1, 3, 1),
-                new PartialDefaultRecord(25, (byte) 0, 4L, 9999L, -1, 3, 1)
-                ),
-                Utils.toList(streamingIterator)
-            );
+
+        try (BufferSupplier bufferSupplier = BufferSupplier.create();
+             CloseableIterator<Record> skipKeyValueIterator = batch.skipKeyValueIterator(bufferSupplier)) {
+
+            if (CompressionType.NONE == compressionType) {
+                // assert that for uncompressed data stream record iterator is not used
+                assertTrue(skipKeyValueIterator instanceof DefaultRecordBatch.RecordIterator);
+                // superficial validation for correctness. Deep validation is already performed in other tests
+                assertEquals(Utils.toList(records.records()).size(), Utils.toList(skipKeyValueIterator).size());
+            } else {
+                // assert that a streaming iterator is used for compressed records
+                assertTrue(skipKeyValueIterator instanceof DefaultRecordBatch.StreamRecordIterator);
+                // assert correctness for compressed records
+                assertIterableEquals(Arrays.asList(
+                        new PartialDefaultRecord(9, (byte) 0, 0L, 1L, -1, 1, 1),
+                        new PartialDefaultRecord(8, (byte) 0, 1L, 2L, -1, 1, -1),
+                        new PartialDefaultRecord(8, (byte) 0, 2L, 3L, -1, -1, 1),
+                        new PartialDefaultRecord(7, (byte) 0, 3L, 4L, -1, -1, -1),
+                        new PartialDefaultRecord(15 + largeRecordValue.length, (byte) 0, 4L, 1000L, -1, 3, largeRecordValue.length),
+                        new PartialDefaultRecord(23, (byte) 0, 5L, 9999L, -1, 3, 1)
+                    ), Utils.toList(skipKeyValueIterator));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public void testBufferReuseInSkipKeyValueIterator(CompressionType compressionType, int expectedNumBufferAllocations, byte[] recordValue) {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+            compressionType, TimestampType.CREATE_TIME,
+            new SimpleRecord(1000L, "a".getBytes(), "0".getBytes()),
+            new SimpleRecord(9999L, "b".getBytes(), recordValue)
+        );
+
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+
+        try (BufferSupplier bufferSupplier = spy(BufferSupplier.create());
+             CloseableIterator<Record> streamingIterator = batch.skipKeyValueIterator(bufferSupplier)) {
+
+            // Consume through the iterator
+            Utils.toList(streamingIterator);
+
+            // Close the iterator to release any buffers
+            streamingIterator.close();
+
+            // assert number of buffer allocations
+            verify(bufferSupplier, times(expectedNumBufferAllocations)).get(anyInt());
+            verify(bufferSupplier, times(expectedNumBufferAllocations)).release(any(ByteBuffer.class));
+        }
+    }
+    private static Stream<Arguments> testBufferReuseInSkipKeyValueIterator() throws NoSuchAlgorithmException {
+        byte[] smallRecordValue = "1".getBytes();
+        byte[] largeRecordValue = new byte[512 * 1024]; // 512KB
+        RANDOM.nextBytes(largeRecordValue);
+
+        return Stream.of(
+            /*
+             * 1 allocation per batch (i.e. per iterator instance) for buffer holding uncompressed data
+             * = 1 buffer allocations
+             */
+            Arguments.of(CompressionType.GZIP, 1, smallRecordValue),
+            Arguments.of(CompressionType.GZIP, 1, largeRecordValue),
+            Arguments.of(CompressionType.SNAPPY, 1, smallRecordValue),
+            Arguments.of(CompressionType.SNAPPY, 1, largeRecordValue),
+            /*
+             * 1 allocation per batch (i.e. per iterator instance) for buffer holding compressed data
+             * 1 allocation per batch (i.e. per iterator instance) for buffer holding uncompressed data
+             * = 2 buffer allocations
+             */
+            Arguments.of(CompressionType.LZ4, 2, smallRecordValue),
+            Arguments.of(CompressionType.LZ4, 2, largeRecordValue),
+            Arguments.of(CompressionType.ZSTD, 2, smallRecordValue),
+            Arguments.of(CompressionType.ZSTD, 2, largeRecordValue)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public void testZstdJniForSkipKeyValueIterator(int expectedJniCalls, byte[] recordValue) throws IOException {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+            CompressionType.ZSTD, TimestampType.CREATE_TIME,
+            new SimpleRecord(9L, "hakuna-matata".getBytes(), recordValue)
+        );
+
+        // Buffer containing compressed data
+        final ByteBuffer compressedBuf = records.buffer();
+        // Create a RecordBatch object
+        final DefaultRecordBatch batch = spy(new DefaultRecordBatch(compressedBuf.duplicate()));
+        final CompressionType mockCompression = mock(CompressionType.ZSTD.getClass());
+        doReturn(mockCompression).when(batch).compressionType();
+
+        // Buffer containing compressed records to be used for creating zstd-jni stream
+        ByteBuffer recordsBuffer = compressedBuf.duplicate();
+        recordsBuffer.position(RECORDS_OFFSET);
+
+        try (final BufferSupplier bufferSupplier = BufferSupplier.create();
+             final InputStream zstdStream = spy(ZstdFactory.wrapForInput(recordsBuffer, batch.magic(), bufferSupplier));
+             final InputStream chunkedStream = new ChunkedBytesStream(zstdStream, bufferSupplier, 16 * 1024, false)) {
+
+            when(mockCompression.wrapForInput(any(ByteBuffer.class), anyByte(), any(BufferSupplier.class))).thenReturn(chunkedStream);
+
+            try (CloseableIterator<Record> streamingIterator = batch.skipKeyValueIterator(bufferSupplier)) {
+                assertNotNull(streamingIterator);
+                Utils.toList(streamingIterator);
+                // verify the number of read() calls to zstd JNI stream. Each read() call is a JNI call.
+                verify(zstdStream, times(expectedJniCalls)).read(any(byte[].class), anyInt(), anyInt());
+                // verify that we don't use the underlying skip() functionality. The underlying skip() allocates
+                // 1 buffer per skip call from he buffer pool whereas our implementation

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1158577698


##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {
+    final private ByteBuffer buf;
+
+    public BytesStreamBufferSource(final ByteBuffer buffer) {
+        // we do not modify the markers of source buffer
+        buf = buffer.duplicate();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) {
+        if (toSkip <= 0) {
+            return 0;
+        }
+
+        int avail = Math.min(toSkip, buf.remaining());
+        buf.position(buf.position() + avail);
+        return avail;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(b, off, len);
+        return len;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        try {
+            return buf.get();

Review Comment:
   Would it be better to call `read` and check for `-1`? That would result in one exception (and resulting stacktrace) vs two.
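
A minimal sketch of that alternative (hypothetical class name), building readByte() on top of read() so that end-of-stream raises a single EOFException:

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

// Illustrative only; mirrors a ByteBuffer-backed stream like the one quoted above.
final class ByteBufferBytesStreamSketch {
    private final ByteBuffer buf;

    ByteBufferBytesStreamSketch(ByteBuffer buffer) {
        this.buf = buffer.duplicate();
    }

    int read() {
        // returns -1 at end of stream instead of throwing BufferUnderflowException
        return buf.hasRemaining() ? buf.get() & 0xFF : -1;
    }

    byte readByte() throws IOException {
        int b = read();
        if (b == -1)
            throw new EOFException("no more bytes available");
        return (byte) b;
    }
}
```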



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()

Review Comment:
   Incomplete sentence?



##########
clients/src/main/java/org/apache/kafka/common/utils/SkippableChunkedBytesStream.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SkippableChunkedBytesStream is a variant of ChunkedBytesStream which does not push skip() to the sourceStream.
+ * <p>
+ * Unlike BufferedInputStream.skip() and ChunkedBytesStream.skip(), this does not push skip() to sourceStream.
+ * We want to avoid pushing this to sourceStream because its implementation may be inefficient, e.g. the case of
+ * ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ *
+ * @see ChunkedBytesStream
+ */
+public class SkippableChunkedBytesStream extends ChunkedBytesStream {

Review Comment:
   Would it be simpler to make this behavior configurable in `ChunkedBytesStream`? Also, I thought the zstd issue was fixed?
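
For what it's worth, a rough sketch of the flag-based alternative (hypothetical class, not the PR's final code), where a constructor argument decides whether skip() is delegated to the source stream or handled by reading and discarding:

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

class ChunkedBytesStreamSketch extends FilterInputStream {
    private final boolean pushSkipToSourceStream;
    private final byte[] scratch = new byte[2 * 1024]; // stand-in for the pooled intermediate buffer

    ChunkedBytesStreamSketch(InputStream in, boolean pushSkipToSourceStream) {
        super(in);
        this.pushSkipToSourceStream = pushSkipToSourceStream;
    }

    @Override
    public long skip(long n) throws IOException {
        if (pushSkipToSourceStream)
            return in.skip(n);                            // delegate to the source stream
        long remaining = n;
        while (remaining > 0) {                           // read and discard via the scratch buffer
            int read = in.read(scratch, 0, (int) Math.min(scratch.length, remaining));
            if (read < 0)
                break;
            remaining -= read;
        }
        return n - remaining;
    }
}
```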



##########
clients/src/test/java/org/apache/kafka/common/utils/SkippableChunkedBytesStreamTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SkippableChunkedBytesStreamTest {
+    private static final Random RANDOM = new Random(1337);
+    private final BufferSupplier supplier = BufferSupplier.NO_CACHING;
+
+    @ParameterizedTest
+    @MethodSource("provideSourceSkipValuesForTest")
+    public void skip_testCorrectness(int bytesToPreRead, ByteBuffer inputBuf, int numBytesToSkip) throws IOException {
+        int expectedInpLeftAfterSkip = inputBuf.remaining() - bytesToPreRead - numBytesToSkip;
+        int expectedSkippedBytes = Math.min(inputBuf.remaining() - bytesToPreRead, numBytesToSkip);
+
+        try (BytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuf.duplicate()), supplier, 10)) {
+            int cnt = 0;
+            while (cnt++ < bytesToPreRead) {
+                is.readByte();
+            }
+
+            int res = is.skipBytes(numBytesToSkip);
+            assertEquals(expectedSkippedBytes, res);
+
+            // verify that we are able to read rest of the input
+            cnt = 0;
+            while (cnt++ < expectedInpLeftAfterSkip) {
+                is.readByte();
+            }
+        }
+    }
+
+    @Test
+    public void skip_testEndOfSource() throws IOException {

Review Comment:
   We don't typically use this kind of naming convention in our tests (i.e. using `_`).



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -64,17 +66,20 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        public BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
-                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
-                // number of bytes (potentially a single byte)
-                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
-                        16 * 1024);
+                // Set output buffer (uncompressed) to 16 KB and Set input buffer (compressed) to 8 KB (0.5 KB by default) to ensure reasonable performance in cases

Review Comment:
   16 KB comes from `getRecommendedDOutSize` now; it's a bit confusing to have that part of the comment here.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   It's worth adding a comment in the method itself - it will be hard to remember otherwise.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -90,8 +95,13 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
-            return SnappyFactory.wrapForInput(buffer);
+        public BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+            return new SkippableChunkedBytesStream(SnappyFactory.wrapForInput(buffer), decompressionBufferSupplier, getRecommendedDOutSize());
+        }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 8 * 1024; // 8KB

Review Comment:
   Why is this 8KB while the lz4 one is 2KB?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The

Review Comment:
   What is a `ByteReader`? Did you mean `BytesStream`?



##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {

Review Comment:
   Maybe call this `ByteBufferBytesStream`?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - many method are un-supported in this class because they aren't currently used in the caller code.
+ * - the implementation of this class is performance sensitive. Minor changes as usage of ByteBuffer instead of byte[]
+ *   can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream implements BytesStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Source stream containing compressed data.
+     */
+    private InputStream sourceStream;
+    /**
+     * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
+    protected int limit;
+    /**
+     *
+     */
+    protected int pos;
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for releasing the buffer from the '
+     * buffer supplier.
+     */
+    private ByteBuffer intermediateBufRef;
+
+
+    public ChunkedBytesStream(InputStream sourceStream, BufferSupplier bufferSupplier, int intermediateBufSize) {
+        this.bufferSupplier = bufferSupplier;
+        this.sourceStream = sourceStream;
+        intermediateBufRef = bufferSupplier.get(intermediateBufSize);
+        if (!intermediateBufRef.hasArray() || (intermediateBufRef.arrayOffset() != 0)) {
+            throw new IllegalArgumentException("provided ByteBuffer lacks array or has non-zero arrayOffset");
+        }
+        intermediateBuf = intermediateBufRef.array();
+    }
+
+    private byte[] getBufIfOpen() throws IOException {
+        byte[] buffer = intermediateBuf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                return -1;
+        }
+
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+
+    InputStream getInIfOpen() throws IOException {
+        InputStream input = sourceStream;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+
+    /**
+     * Fills the intermediate buffer with more data. The amount of new data read is equal to the remaining empty space
+     * in the buffer. For optimal performance, read as much data as possible in this call.
+     */
+    int fill() throws IOException {
+        byte[] buffer = getBufIfOpen();
+
+        // switch to writing mode
+        pos = 0;
+        limit = pos;
+        int bytesRead = getInIfOpen().read(buffer, pos, buffer.length - pos);
+
+        if (bytesRead > 0)
+            limit = bytesRead + pos;
+
+        return bytesRead;
+    }
+
+    @Override
+    public void close() throws IOException {
+        byte[] mybuf = intermediateBuf;
+        intermediateBuf = null;
+
+        InputStream input = sourceStream;
+        sourceStream = null;
+
+        if (mybuf != null)
+            bufferSupplier.release(intermediateBufRef);
+        if (input != null)
+            input.close();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int totalRead = 0;
+        int bytesRead = 0;
+        while (totalRead < len) {
+            bytesRead = 0;
+            int toRead = len - totalRead;
+            if (pos >= limit) {
+                if (toRead >= getBufIfOpen().length) {
+                    // don't use the intermediate buffer if we need to read more than its capacity
+                    bytesRead = getInIfOpen().read(b, off + totalRead, toRead);
+                    if (bytesRead < 0)
+                        break;
+                } else {
+                    fill();
+                    if (pos >= limit)
+                        break;
+                }
+            } else {
+                int avail = limit - pos;
+                toRead = (avail < toRead) ? avail : toRead;
+                System.arraycopy(getBufIfOpen(), pos, b, off + totalRead, toRead);
+                pos += toRead;
+                bytesRead = toRead;
+            }
+
+            totalRead += bytesRead;
+        }
+
+        if ((bytesRead <= 0) && (totalRead < len))
+            return -1;
+
+        return totalRead;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) throws IOException {
+        if (toSkip <= 0) {
+            return 0;
+        }
+        int totalSkipped = 0;
+
+        // Skip what exists in the intermediate buffer first
+        int avail = limit - pos;
+        int bytesToRead = (avail < (toSkip - totalSkipped)) ? avail : (toSkip - totalSkipped);
+        pos += bytesToRead;
+        totalSkipped += bytesToRead;
+
+        // Use sourceStream's skip() to skip the rest
+        while ((totalSkipped < toSkip) && ((bytesToRead = (int) getInIfOpen().skip(toSkip - totalSkipped)) > 0)) {
+            totalSkipped += bytesToRead;
+        }
+
+        return totalSkipped;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                throw new EOFException();
+        }
+        return getBufIfOpen()[pos++];
+    }
+
+    // visible for testing
+    public InputStream getSourceStream() {

Review Comment:
   Nit: we typically avoid the `get` prefix in Kafka.
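   For illustration, a minimal sketch of the accessor without the `get` prefix; the name `sourceStream()` is only an assumption here, since the quoted hunk is cut off before the final name is visible:

       // visible for testing
       public InputStream sourceStream() {
           return sourceStream;
       }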



##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStream.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This interface provides for reading bytes from an underlying source. The source could be a buffer or a stream.
+ * It extends the {@link Closeable} interface to ensure that the source is appropriately closed (if required).
+ */
+public interface BytesStream extends Closeable {
+    /**
+     * The interface is based on {@link InputStream#read()} and follows the same contract.

Review Comment:
   Could we include the description inline, along with a see-also reference, rather than having nothing useful here? Same for the other methods.
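   For illustration, one shape such a javadoc could take (the wording below is an assumption, not the text that eventually landed in the PR):

       /**
        * Reads the next byte of data from the underlying source. The byte is returned as an int in the
        * range 0 to 255, or -1 if the end of the source has been reached.
        *
        * @see InputStream#read()
        */
       int read() throws IOException;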



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - many methods are unsupported in this class because they aren't currently used in the caller code.

Review Comment:
   Is this still true?



##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -215,14 +215,14 @@ private Batch<T> readBatch(DefaultRecordBatch batch) {
             }
 
             List<T> records = new ArrayList<>(numRecords);
-            DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
+            BytesStream input = batch.recordInputStream(bufferSupplier);
             try {
                 for (int i = 0; i < numRecords; i++) {
                     T record = readRecord(input, batch.sizeInBytes());
                     records.add(record);
                 }
             } finally {
-                Utils.closeQuietly(input, "DataInputStream");
+                Utils.closeQuietly(input, "RecordBatchBytesStream");

Review Comment:
   It's a bit odd to include this string when the type above is `BytesStream`.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -157,7 +186,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
      *                                    batch. As such, a supplier that reuses buffers will have a significant
      *                                    performance impact.
      */
-    public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
+    public abstract BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
+
+    /**
+     * Recommended size of buffer for storing decompressed output.
+     */
+    public int getRecommendedDOutSize() {

Review Comment:
   We don't typically use `get` as a prefix for methods. Also, this name is a bit obscure; could it be `decompressedOutputSize` or something like that?
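   For illustration, a minimal sketch of the suggested rename, reusing the ZSTD override body quoted further down in this thread; whether `decompressedOutputSize` is the exact name that was merged is an assumption here:

       @Override
       public int decompressedOutputSize() {
           return 16 * 1024; // 16KB, as in the ZSTD override in this PR
       }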



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   Also, can you please remind me if the 16 KB number is different from what was there before for this case?



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##########
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
         this.buffer = buffer;
     }
 
+    @Override
+    public int available() throws IOException {

Review Comment:
   Did any tests fail as a result of this?





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1165568140


##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -215,14 +215,14 @@ private Batch<T> readBatch(DefaultRecordBatch batch) {
             }
 
             List<T> records = new ArrayList<>(numRecords);
-            DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
+            BytesStream input = batch.recordInputStream(bufferSupplier);
             try {
                 for (int i = 0; i < numRecords; i++) {
                     T record = readRecord(input, batch.sizeInBytes());
                     records.add(record);
                 }
             } finally {
-                Utils.closeQuietly(input, "DataInputStream");
+                Utils.closeQuietly(input, "RecordBatchBytesStream");

Review Comment:
   Changed it to "BytesStream for input containing records" to make it more explicit about what we are closing here.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172882018


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   Done in the latest commit.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172883282


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -157,7 +186,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
      *                                    batch. As such, a supplier that reuses buffers will have a significant
      *                                    performance impact.
      */
-    public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
+    public abstract BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
+
+    /**
+     * Recommended size of buffer for storing decompressed output.
+     */
+    public int getRecommendedDOutSize() {

Review Comment:
   Changed in the latest commit.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200352336


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
                 throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                         " bytes in record payload, but instead read " + (buffer.position() - recordStart));
 
-            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
+            int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+            return new DefaultRecord(totalSizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
+    public static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
 
-    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
+    private static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = (byte) input.read();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");
-
             return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", e);
         }
     }
 
-    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return buffer.get();
-    }
-
-    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarlong(buffer);
-    }
-
-    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarint(buffer);
-    }
-
-    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
-        boolean needMore = false;
-        int sizeInBytes = -1;
-        int bytesToSkip = -1;
-
-        while (true) {
-            if (needMore) {
-                readMore(buffer, input, bytesRemaining);
-                needMore = false;
-            }
-
-            if (bytesToSkip < 0) {
-                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-                    needMore = true;
-                } else {
-                    sizeInBytes = ByteUtils.readVarint(buffer);
-                    if (sizeInBytes <= 0)
-                        return sizeInBytes;
-                    else
-                        bytesToSkip = sizeInBytes;
 
+    /**
+     * Skips n bytes from the data input.

Review Comment:
   Ah! Thanks for catching. Fixed.
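   For context, a hedged sketch of what a skipBytes(InputStream, int) helper like the one called above could look like; this is inferred from the call sites in the hunk (assuming java.io.EOFException, java.io.IOException and java.io.InputStream imports), not necessarily the exact body that was merged:

       private static void skipBytes(InputStream in, int bytesToSkip) throws IOException {
           if (bytesToSkip <= 0)
               return;
           while (bytesToSkip > 0) {
               int skipped = (int) in.skip(bytesToSkip);
               if (skipped <= 0) {
                   // InputStream.skip() may legitimately skip nothing; fall back to read()
                   // so that we either make progress or detect end of stream
                   if (in.read() == -1)
                       throw new EOFException("Stream ended before skipping the requested bytes");
                   skipped = 1;
               }
               bytesToSkip -= skipped;
           }
       }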





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200353574


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:

Review Comment:
   `BufferedInputStream` is the correct one. Fixed.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200402627


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to
+ * the input stream. We may want to avoid pushing this to the input stream because its implementation may be inefficient,
+ * e.g. in the case of ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes such as usage of ByteBuffer instead of byte[]
+ * can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream extends FilterInputStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
+    /**
+     * The index one greater than the index of the last valid byte in
+     * the buffer.
+     * This value is always in the range <code>0</code> through <code>intermediateBuf.length</code>;
+     * elements <code>intermediateBuf[0]</code>  through <code>intermediateBuf[count-1]
+     * </code>contain buffered input data obtained
+     * from the underlying  input stream.
+     */
+    protected int count = 0;
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code>
+     * through <code>count</code>. If it is less
+     * than <code>count</code>, then  <code>intermediateBuf[pos]</code>
+     * is the next byte to be supplied as input;
+     * if it is equal to <code>count</code>, then
+     * the  next <code>read</code> or <code>skip</code>
+     * operation will require more bytes to be
+     * read from the contained  input stream.
+     */
+    protected int pos = 0;
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for releasing the buffer from the
+     * buffer supplier.
+     */
+    private final ByteBuffer intermediateBufRef;
+    /**
+     * Determines if the skip be pushed down
+     */
+    private final boolean pushSkipToSourceStream;

Review Comment:
   Your understanding is correct.
   
   Renamed the variable to `delegateSkipToSourceStream` and updated the comment.
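   For illustration, a hedged sketch of how skip() could honor such a flag, assuming the `count`, `pos` and `in` fields shown above plus a fill() helper like the one quoted earlier in this thread; this is not necessarily the exact code that was merged:

       @Override
       public long skip(long toSkip) throws IOException {
           if (toSkip <= 0)
               return 0;
           if (delegateSkipToSourceStream) {
               // consume whatever is already buffered, then let the source stream skip the rest
               long fromBuffer = Math.min(count - pos, toSkip);
               pos += (int) fromBuffer;
               long remaining = toSkip - fromBuffer;
               return fromBuffer + (remaining > 0 ? in.skip(remaining) : 0);
           }
           // otherwise drain through the intermediate buffer, avoiding source-stream skip()
           // implementations that allocate per call (e.g. ZstdInputStream)
           long skipped = 0;
           while (skipped < toSkip) {
               if (pos >= count) {
                   fill();
                   if (pos >= count)
                       break; // end of stream
               }
               int step = (int) Math.min(count - pos, toSkip - skipped);
               pos += step;
               skipped += step;
           }
           return skipped;
       }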
   





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1162921002


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The

Review Comment:
   Yes, this is fixed in the latest commit.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172892464


##########
clients/src/test/java/org/apache/kafka/common/utils/SkippableChunkedBytesStreamTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SkippableChunkedBytesStreamTest {
+    private static final Random RANDOM = new Random(1337);
+    private final BufferSupplier supplier = BufferSupplier.NO_CACHING;
+
+    @ParameterizedTest
+    @MethodSource("provideSourceSkipValuesForTest")
+    public void skip_testCorrectness(int bytesToPreRead, ByteBuffer inputBuf, int numBytesToSkip) throws IOException {
+        int expectedInpLeftAfterSkip = inputBuf.remaining() - bytesToPreRead - numBytesToSkip;
+        int expectedSkippedBytes = Math.min(inputBuf.remaining() - bytesToPreRead, numBytesToSkip);
+
+        try (BytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuf.duplicate()), supplier, 10)) {
+            int cnt = 0;
+            while (cnt++ < bytesToPreRead) {
+                is.readByte();
+            }
+
+            int res = is.skipBytes(numBytesToSkip);
+            assertEquals(expectedSkippedBytes, res);
+
+            // verify that we are able to read rest of the input
+            cnt = 0;
+            while (cnt++ < expectedInpLeftAfterSkip) {
+                is.readByte();
+            }
+        }
+    }
+
+    @Test
+    public void skip_testEndOfSource() throws IOException {

Review Comment:
   Fixed in the latest commit.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1176575715


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##########
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
         this.buffer = buffer;
     }
 
+    @Override
+    public int available() throws IOException {

Review Comment:
   I have added tests now which fail prior to the fix and pass after it. I caught this bug when the read() of an InputStream called in.available(), which was returning 0.
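   For context, a minimal sketch of the kind of override being discussed, assuming the stream is backed by the ByteBuffer field named `buffer` shown in the hunk (the exact body is not visible there):

       @Override
       public int available() throws IOException {
           // the InputStream default returns 0, which misleads callers that consult
           // available() before deciding how much to read
           return buffer.remaining();
       }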





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172884561


##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()

Review Comment:
   Fixed.



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - many methods are unsupported in this class because they aren't currently used in the caller code.

Review Comment:
   Fixed.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172881788


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -90,8 +95,13 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
-            return SnappyFactory.wrapForInput(buffer);
+        public BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+            return new SkippableChunkedBytesStream(SnappyFactory.wrapForInput(buffer), decompressionBufferSupplier, getRecommendedDOutSize());
+        }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 8 * 1024; // 8KB

Review Comment:
   I honestly don't remember now. I changed it back to 2KB and benchmarked again; it didn't change anything.





[GitHub] [kafka] showuon merged pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #13135:
URL: https://github.com/apache/kafka/pull/13135

