Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/02/08 13:08:47 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1100085805


##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(

Review Comment:
   Note to reviewer:
   
   We use `ChunkedDataInputStream` here instead of `SkippableChunkedDataInputStream` because we want to push the `skip()` implementation down to `KafkaLZ4BlockInputStream`, where it is optimized (unlike `ZstdInputStream#skip()`, for which we have to fall back to the custom skip implementation in `SkippableChunkedDataInputStream`).
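   
   Both `ChunkedDataInputStream` and `SkippableChunkedDataInputStream` are introduced in this PR, so their code is not visible in this hunk. As a purely illustrative sketch (not the PR's actual classes), the distinction is roughly: one wrapper forwards `skip()` to the underlying stream, the other emulates `skip()` by reading into its own scratch buffer.
   
   ```java
   import java.io.FilterInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   
   // Forwards skip() to the wrapped stream, e.g. an LZ4 stream that can skip
   // whole compressed blocks without decompressing them.
   class DelegatingSkipStream extends FilterInputStream {
       DelegatingSkipStream(InputStream in) {
           super(in);
       }
   
       @Override
       public long skip(long n) throws IOException {
           return in.skip(n);
       }
   }
   
   // Emulates skip() by reading and discarding, for wrapped streams whose own
   // skip() is not suitable (the comment above cites ZstdInputStream#skip()).
   class ReadThroughSkipStream extends FilterInputStream {
       private final byte[] scratch = new byte[2 * 1024]; // hypothetical 2 KB scratch buffer
   
       ReadThroughSkipStream(InputStream in) {
           super(in);
       }
   
       @Override
       public long skip(long n) throws IOException {
           long remaining = n;
           while (remaining > 0) {
               int read = in.read(scratch, 0, (int) Math.min(scratch.length, remaining));
               if (read < 0)
                   break;
               remaining -= read;
           }
           return n - remaining;
       }
   }
   ```
   
   With LZ4, pushing `skip()` down means compressed blocks can be skipped without being decompressed at all, which is the optimization referred to above.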



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -270,30 +270,28 @@ public int partitionLeaderEpoch() {
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
 
-    public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
+    public InputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        InputStream is = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return (is instanceof DataInput) ? is : new DataInputStream(is);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
-        final DataInputStream inputStream = recordInputStream(bufferSupplier);
+        final InputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
-            // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];

Review Comment:
   Note to reviewers
   
   Removing this intermediate buffer affects all compression types. For Zstd and the other types that were already using an intermediate buffer, this is an optimisation, since we reduce buffer allocation and data copies. For the types that were not previously using an intermediate buffer, we now introduce a 2 KB intermediate buffer (similar to this) via `ChunkedDataInputStream`.
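   
   For context, a minimal sketch (not the actual Kafka code) of what the removed skip-buffer pattern looks like: length-delimited fields were skipped by repeatedly reading them into a fixed 2 KB scratch array, i.e. decompressing and copying bytes only to discard them. With the new wrappers, skipping is delegated to the stream itself and the scratch space comes from the pooled `BufferSupplier`.
   
   ```java
   import java.io.EOFException;
   import java.io.IOException;
   import java.io.InputStream;
   
   final class SkipBufferSketch {
       // 2 KB, mirroring the size of the removed skipArray
       private static final int MAX_SKIP_BUFFER_SIZE = 2 * 1024;
   
       // Skips bytesToSkip bytes by reading them into a throwaway array.
       static void skipViaScratchArray(InputStream in, int bytesToSkip) throws IOException {
           byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
           while (bytesToSkip > 0) {
               int read = in.read(skipArray, 0, Math.min(skipArray.length, bytesToSkip));
               if (read < 0)
                   throw new EOFException("Stream ended with " + bytesToSkip + " bytes left to skip");
               bytesToSkip -= read;
           }
       }
   }
   ```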



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java:
##########
@@ -103,6 +104,12 @@ public void init() {
         }
     }
 
+    @TearDown
+    public void cleanUp() {
+        if (requestLocal != null)
+            requestLocal.close();

Review Comment:
   Note to reviewers
   
   This benchmark has a flaw. We use `@Setup` and `@TearDown` at the default level, `Level.Trial`, which means these methods are executed once before/after each run (trial) of the benchmark. A benchmark may be executed with multiple threads, so it is not guaranteed that each invocation of the benchmark will use the same BufferPool.
   
   I will fix this in a separate PR. It does not have a negative impact on the results; at worst, it will understate the effect of re-using buffers in the code.
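   
   A hypothetical sketch (the names are illustrative, not the actual fix) of how the lifecycle could be pinned to a thread so that each benchmark thread owns its own buffer supplier:
   
   ```java
   import org.apache.kafka.common.utils.BufferSupplier;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.TearDown;
   
   // One instance per benchmark thread, so buffer re-use is measured per thread
   // instead of several threads racing on a shared pool.
   @State(Scope.Thread)
   public class PerThreadBuffers {
       BufferSupplier bufferSupplier;
   
       @Setup(Level.Trial)
       public void setUp() {
           bufferSupplier = BufferSupplier.create();
       }
   
       @TearDown(Level.Trial)
       public void tearDown() {
           bufferSupplier.close();
       }
   }
   ```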



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -363,157 +361,79 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
     }
 
     public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
 
     private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = input.readByte();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");

Review Comment:
   Note for reviewer:
   It is OK to remove this validation because we perform a similar validation at the end of every record; see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L608
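   
   The hunk above also relies on a `skipBytes(input, n)` helper whose body is not shown in this diff. A minimal sketch of what such a helper over a `DataInput` might look like (not necessarily the PR's implementation), looping because `DataInput#skipBytes` may skip fewer bytes than requested:
   
   ```java
   import java.io.DataInput;
   import java.io.EOFException;
   import java.io.IOException;
   
   final class SkipUtil {
       static void skipBytes(DataInput input, int bytesToSkip) throws IOException {
           if (bytesToSkip <= 0)
               return; // nothing to skip, e.g. a null key/value encoded with a negative size
           while (bytesToSkip > 0) {
               int skipped = input.skipBytes(bytesToSkip);
               if (skipped <= 0)
                   throw new EOFException("Failed to skip the remaining " + bytesToSkip + " bytes");
               bytesToSkip -= skipped;
           }
       }
   }
   ```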



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   The 2 KB size is based on the size of `skipArray`, which has now been removed in this PR.



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##########
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
         this.buffer = buffer;
     }
 
+    @Override
+    public int available() throws IOException {

Review Comment:
   Note to reviewer
   
   This was a bug: previously, `available()` fell through to `InputStream#available()`, which always returns 0. This impacts cases where we rely on the value of `available()` to determine the end of input.
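   
   For a `ByteBuffer`-backed stream, the fix is essentially to report the buffer's remaining bytes. A sketch of the override (the exact PR code may differ):
   
   ```java
   @Override
   public int available() throws IOException {
       // Bytes readable without blocking are exactly the bytes left in the
       // buffer, rather than InputStream's default of 0.
       return buffer.remaining();
   }
   ```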


