Posted to jira@kafka.apache.org by "ijuma (via GitHub)" <gi...@apache.org> on 2023/04/05 14:48:00 UTC

[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1158577698


##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {
+    final private ByteBuffer buf;
+
+    public BytesStreamBufferSource(final ByteBuffer buffer) {
+        // we do not modify the markers of source buffer
+        buf = buffer.duplicate();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) {
+        if (toSkip <= 0) {
+            return 0;
+        }
+
+        int avail = Math.min(toSkip, buf.remaining());
+        buf.position(buf.position() + avail);
+        return avail;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(b, off, len);
+        return len;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        try {
+            return buf.get();

Review Comment:
   Would it be better to call `read` and check for `-1`? That would result in one exception (and resulting stacktrace) vs two.
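
   A minimal sketch of that suggestion, assuming a simplified stand-in class (`BufferBackedStream` is a hypothetical name, not the class in this PR). `readByte` delegates to `read` and throws `EOFException` on `-1`, so no `BufferUnderflowException` has to be caught and rethrown:

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical stand-in for the buffer-backed stream; only the two relevant methods are shown.
final class BufferBackedStream {
    private final ByteBuffer buf;

    BufferBackedStream(ByteBuffer buffer) {
        // duplicate() leaves the caller's position, limit and mark untouched
        this.buf = buffer.duplicate();
    }

    // Returns the next byte as an unsigned value in [0, 255], or -1 at the end of the buffer.
    int read() {
        if (!buf.hasRemaining()) {
            return -1;
        }
        return buf.get() & 0xFF;
    }

    // Delegates to read(), so EOFException is the only exception created at end of input.
    byte readByte() throws IOException {
        int b = read();
        if (b == -1) {
            throw new EOFException();
        }
        return (byte) b;
    }
}
```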



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()

Review Comment:
   Incomplete sentence?



##########
clients/src/main/java/org/apache/kafka/common/utils/SkippableChunkedBytesStream.java:
##########
@@ -0,0 +1,62 @@
+package org.apache.kafka.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SkippableChunkedBytesStream is a variant of ChunkedBytesStream which does not push skip() to the sourceStream.
+ * <p>
+ * Unlike BufferedInputStream.skip() and ChunkedBytesStream.skip(), this does not push skip() to sourceStream.
+ * We want to avoid pushing this to sourceStream because its implementation may be inefficient, e.g. the case of
+ * ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ *
+ * @see ChunkedBytesStream
+ */
+public class SkippableChunkedBytesStream extends ChunkedBytesStream {

Review Comment:
   Would it be simpler to make this behavior configurable in `ChunkedBytesStream`? Also, I thought the zstd issue was fixed?
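
   A rough sketch of the configurable variant, under stated assumptions: `ConfigurableSkipStream` and the `delegateSkipToSource` flag are hypothetical names, and the class is a simplified stand-in (plain `byte[]` chunk, no `BufferSupplier`, no close handling) rather than the code in this PR:

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical, simplified stand-in for a chunked stream with a configurable skip strategy.
final class ConfigurableSkipStream {
    private final InputStream source;
    private final byte[] chunk;
    private final boolean delegateSkipToSource; // hypothetical flag name
    private int pos;   // index of the next byte to read from chunk
    private int limit; // index one past the last valid byte in chunk

    ConfigurableSkipStream(InputStream source, int chunkSize, boolean delegateSkipToSource) {
        this.source = source;
        this.chunk = new byte[chunkSize];
        this.delegateSkipToSource = delegateSkipToSource;
    }

    // Refills the chunk from the source; leaves it empty at end of input.
    private void fill() throws IOException {
        pos = 0;
        limit = 0;
        int n = source.read(chunk, 0, chunk.length);
        if (n > 0) {
            limit = n;
        }
    }

    int read() throws IOException {
        if (pos >= limit) {
            fill();
            if (pos >= limit) {
                return -1;
            }
        }
        return chunk[pos++] & 0xFF;
    }

    int skipBytes(int toSkip) throws IOException {
        if (toSkip <= 0) {
            return 0;
        }
        int skipped = 0;
        while (skipped < toSkip) {
            if (pos >= limit) {
                if (delegateSkipToSource) {
                    // Buffered bytes are used up: let the source skip the rest.
                    long n = source.skip(toSkip - skipped);
                    if (n <= 0) {
                        break;
                    }
                    skipped += (int) n;
                    continue;
                }
                // Otherwise read the next chunk and discard from it instead of calling skip().
                fill();
                if (pos >= limit) {
                    break;
                }
            }
            int step = Math.min(limit - pos, toSkip - skipped);
            pos += step;
            skipped += step;
        }
        return skipped;
    }
}
```

   With the flag set to `false` this roughly matches the behaviour described for `SkippableChunkedBytesStream`; with `true` it roughly matches `ChunkedBytesStream.skipBytes`, so a single class could cover both cases.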



##########
clients/src/test/java/org/apache/kafka/common/utils/SkippableChunkedBytesStreamTest.java:
##########
@@ -0,0 +1,104 @@
+package org.apache.kafka.common.utils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SkippableChunkedBytesStreamTest {
+    private static final Random RANDOM = new Random(1337);
+    private final BufferSupplier supplier = BufferSupplier.NO_CACHING;
+
+    @ParameterizedTest
+    @MethodSource("provideSourceSkipValuesForTest")
+    public void skip_testCorrectness(int bytesToPreRead, ByteBuffer inputBuf, int numBytesToSkip) throws IOException {
+        int expectedInpLeftAfterSkip = inputBuf.remaining() - bytesToPreRead - numBytesToSkip;
+        int expectedSkippedBytes = Math.min(inputBuf.remaining() - bytesToPreRead, numBytesToSkip);
+
+        try (BytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuf.duplicate()), supplier, 10)) {
+            int cnt = 0;
+            while (cnt++ < bytesToPreRead) {
+                is.readByte();
+            }
+
+            int res = is.skipBytes(numBytesToSkip);
+            assertEquals(expectedSkippedBytes, res);
+
+            // verify that we are able to read rest of the input
+            cnt = 0;
+            while (cnt++ < expectedInpLeftAfterSkip) {
+                is.readByte();
+            }
+        }
+    }
+
+    @Test
+    public void skip_testEndOfSource() throws IOException {

Review Comment:
   We don't typically use this kind of naming convention in our tests (i.e. using `_`).



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -64,17 +66,20 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        public BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
-                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
-                // number of bytes (potentially a single byte)
-                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
-                        16 * 1024);
+                // Set output buffer (uncompressed) to 16 KB and Set input buffer (compressed) to 8 KB (0.5 KB by default) to ensure reasonable performance in cases

Review Comment:
   16 KB comes from `getRecommendedDOutSize` now, so it's a bit confusing to have that part of the comment here.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   It's worth adding a comment in the method itself - it will be hard to remember otherwise.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -90,8 +95,13 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
-            return SnappyFactory.wrapForInput(buffer);
+        public BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+            return new SkippableChunkedBytesStream(SnappyFactory.wrapForInput(buffer), decompressionBufferSupplier, getRecommendedDOutSize());
+        }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 8 * 1024; // 8KB

Review Comment:
   Why is this 8KB while the lz4 one is 2KB?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The

Review Comment:
   What is a `ByteReader`? Did you mean `BytesStream`?



##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {

Review Comment:
   Maybe call this `ByteBufferBytesStream`?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - many methods are unsupported in this class because they aren't currently used in the caller code.
+ * - the implementation of this class is performance sensitive. Minor changes, such as using ByteBuffer instead of
+ *   byte[], can significantly impact performance, so proceed with caution.
+ */
+public class ChunkedBytesStream implements BytesStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Source stream containing compressed data.
+     */
+    private InputStream sourceStream;
+    /**
+     * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
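+    /**
+     * Index one past the last valid byte in the intermediate buffer.
+     */
+    protected int limit;
+    /**
+     * Index of the next byte to read from the intermediate buffer.
+     */
+    protected int pos;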
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for releasing the buffer back to the
+     * buffer supplier.
+     */
+    private ByteBuffer intermediateBufRef;
+
+
+    public ChunkedBytesStream(InputStream sourceStream, BufferSupplier bufferSupplier, int intermediateBufSize) {
+        this.bufferSupplier = bufferSupplier;
+        this.sourceStream = sourceStream;
+        intermediateBufRef = bufferSupplier.get(intermediateBufSize);
+        if (!intermediateBufRef.hasArray() || (intermediateBufRef.arrayOffset() != 0)) {
+            throw new IllegalArgumentException("provided ByteBuffer lacks array or has non-zero arrayOffset");
+        }
+        intermediateBuf = intermediateBufRef.array();
+    }
+
+    private byte[] getBufIfOpen() throws IOException {
+        byte[] buffer = intermediateBuf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                return -1;
+        }
+
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+
+    InputStream getInIfOpen() throws IOException {
+        InputStream input = sourceStream;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+
+    /**
+     * Fills the intermediate buffer with more data. The amount of new data read is equal to the remaining empty space
+     * in the buffer. For optimal performance, read as much data as possible in this call.
+     */
+    int fill() throws IOException {
+        byte[] buffer = getBufIfOpen();
+
+        // switch to writing mode
+        pos = 0;
+        limit = pos;
+        int bytesRead = getInIfOpen().read(buffer, pos, buffer.length - pos);
+
+        if (bytesRead > 0)
+            limit = bytesRead + pos;
+
+        return bytesRead;
+    }
+
+    @Override
+    public void close() throws IOException {
+        byte[] mybuf = intermediateBuf;
+        intermediateBuf = null;
+
+        InputStream input = sourceStream;
+        sourceStream = null;
+
+        if (mybuf != null)
+            bufferSupplier.release(intermediateBufRef);
+        if (input != null)
+            input.close();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int totalRead = 0;
+        int bytesRead = 0;
+        while (totalRead < len) {
+            bytesRead = 0;
+            int toRead = len - totalRead;
+            if (pos >= limit) {
+                if (toRead >= getBufIfOpen().length) {
+                    // don't use intermediate buffer if we need to read more than its capacity
+                    bytesRead = getInIfOpen().read(b, off + totalRead, toRead);
+                    if (bytesRead < 0)
+                        break;
+                } else {
+                    fill();
+                    if (pos >= limit)
+                        break;
+                }
+            } else {
+                int avail = limit - pos;
+                toRead = (avail < toRead) ? avail : toRead;
+                System.arraycopy(getBufIfOpen(), pos, b, off + totalRead, toRead);
+                pos += toRead;
+                bytesRead = toRead;
+            }
+
+            totalRead += bytesRead;
+        }
+
+        if ((bytesRead <= 0) && (totalRead < len))
+            return -1;
+
+        return totalRead;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) throws IOException {
+        if (toSkip <= 0) {
+            return 0;
+        }
+        int totalSkipped = 0;
+
+        // Skip what exists in the intermediate buffer first
+        int avail = limit - pos;
+        int bytesToRead = (avail < (toSkip - totalSkipped)) ? avail : (toSkip - totalSkipped);
+        pos += bytesToRead;
+        totalSkipped += bytesToRead;
+
+        // Use sourceStream's skip() to skip the rest
+        while ((totalSkipped < toSkip) && ((bytesToRead = (int) getInIfOpen().skip(toSkip - totalSkipped)) > 0)) {
+            totalSkipped += bytesToRead;
+        }
+
+        return totalSkipped;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                throw new EOFException();
+        }
+        return getBufIfOpen()[pos++];
+    }
+
+    // visible for testing
+    public InputStream getSourceStream() {

Review Comment:
   Nit: we typically avoid the `get` prefix in Kafka.



##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStream.java:
##########
@@ -0,0 +1,48 @@
+package org.apache.kafka.common.utils;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This interface provides for reading bytes from an underlying source. The source could be a buffer or a stream.
+ * It extends the {@link Closeable} interface to ensure that the source is appropriately closed (if required).
+ */
+public interface BytesStream extends Closeable {
+    /**
+     * The interface is based on {@link InputStream#read()} and follows the same contract.

Review Comment:
   Can we put the description inline and add a see-also link, rather than having nothing useful here? Same for the other methods.
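
   A sketch of what inline descriptions with `@see` links could look like. `BytesStreamSketch` is a hypothetical name; the method set is taken from the implementations in this PR, and the wording paraphrases the `InputStream` and `DataInput` contracts rather than quoting the final Javadoc:

```java
import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for the interface under review, with the contract spelled out inline.
interface BytesStreamSketch extends Closeable {
    /**
     * Reads the next byte from the source. The byte is returned as an int in the range 0 to 255.
     * Returns -1 if the end of the source has been reached.
     *
     * @see InputStream#read()
     */
    int read() throws IOException;

    /**
     * Reads up to len bytes into b, starting at offset off. Returns the number of bytes read,
     * 0 if len is 0, or -1 if no byte is available because the end of the source has been reached.
     *
     * @see InputStream#read(byte[], int, int)
     */
    int read(byte[] b, int off, int len) throws IOException;

    /**
     * Attempts to skip toSkip bytes and returns the number of bytes actually skipped, which may
     * be smaller if the end of the source is reached first.
     *
     * @see DataInput#skipBytes(int)
     */
    int skipBytes(int toSkip) throws IOException;

    /**
     * Reads and returns one signed byte. Throws java.io.EOFException if the end of the source
     * has been reached.
     *
     * @see DataInput#readByte()
     */
    byte readByte() throws IOException;
}
```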



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this.
+ * - many methods are unsupported in this class because they aren't currently used in the caller code.

Review Comment:
   Is this still true?



##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -215,14 +215,14 @@ private Batch<T> readBatch(DefaultRecordBatch batch) {
             }
 
             List<T> records = new ArrayList<>(numRecords);
-            DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
+            BytesStream input = batch.recordInputStream(bufferSupplier);
             try {
                 for (int i = 0; i < numRecords; i++) {
                     T record = readRecord(input, batch.sizeInBytes());
                     records.add(record);
                 }
             } finally {
-                Utils.closeQuietly(input, "DataInputStream");
+                Utils.closeQuietly(input, "RecordBatchBytesStream");

Review Comment:
   It's a bit odd to include this string when the type above is `BytesStream`.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -157,7 +186,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
      *                                    batch. As such, a supplier that reuses buffers will have a significant
      *                                    performance impact.
      */
-    public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
+    public abstract BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
+
+    /**
+     * Recommended size of buffer for storing decompressed output.
+     */
+    public int getRecommendedDOutSize() {

Review Comment:
   We don't typically use `get` as a prefix for methods. Also, this name is a bit obscure; could it be `decompressedOutputSize` or something like that?
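
   A sketch of how the rename might look together with a comment inside each override. `CompressionKind` is a hypothetical, trimmed-down enum, and the method is abstract here only to keep the sketch short. The sizes are the ones in this diff; the reason for each is left as a placeholder because it is not stated in this thread:

```java
// Hypothetical, trimmed-down stand-in for the compression enum; only the size hint is modeled.
enum CompressionKind {
    SNAPPY {
        @Override
        int decompressedOutputSize() {
            // 8 KB. Placeholder: record here why this size was chosen.
            return 8 * 1024;
        }
    },
    LZ4 {
        @Override
        int decompressedOutputSize() {
            // 2 KB. Placeholder: record here why this size was chosen.
            return 2 * 1024;
        }
    },
    ZSTD {
        @Override
        int decompressedOutputSize() {
            // 16 KB. Placeholder: record here why this size was chosen.
            return 16 * 1024;
        }
    };

    /**
     * Recommended size, in bytes, of the buffer for storing decompressed output.
     */
    abstract int decompressedOutputSize();
}
```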



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   Also, can you please remind me if the 16 KB number is different from what was there before for this case?



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##########
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
         this.buffer = buffer;
     }
 
+    @Override
+    public int available() throws IOException {

Review Comment:
   Did any tests fail as a result of this?


