You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/02/14 16:19:35 UTC

[kafka] 02/02: KAFKA-12327: Remove MethodHandle usage in CompressionType (#10123)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 55c751ecd2899dd8bfd9da9de01df5e9a4c7690d
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun Feb 14 08:12:25 2021 -0800

    KAFKA-12327: Remove MethodHandle usage in CompressionType (#10123)
    
    We don't really need it and it causes problems in older Android versions
    and GraalVM native image usage (there are workarounds for the latter).
    
    Move the logic to separate classes that are only invoked when the
    relevant compression library is actually used. Place such classes
    in their own package and enforce via checkstyle that only these
    classes refer to compression library packages.
    
    To avoid cyclic dependencies, moved `BufferSupplier` to the `utils`
    package.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
 checkstyle/import-control.xml                      | 12 +++-
 .../kafka/clients/consumer/internals/Fetcher.java  |  2 +-
 .../KafkaLZ4BlockInputStream.java                  | 11 +--
 .../KafkaLZ4BlockOutputStream.java                 |  2 +-
 .../kafka/common/compress/SnappyFactory.java       | 50 +++++++++++++
 .../apache/kafka/common/compress/ZstdFactory.java  | 58 +++++++++++++++
 .../common/record/AbstractLegacyRecordBatch.java   |  1 +
 .../kafka/common/record/CompressionType.java       | 82 +++++-----------------
 .../kafka/common/record/DefaultRecordBatch.java    |  1 +
 .../kafka/common/record/FileLogInputStream.java    |  1 +
 .../apache/kafka/common/record/MemoryRecords.java  |  1 +
 .../kafka/common/record/MutableRecordBatch.java    |  1 +
 .../apache/kafka/common/record/RecordBatch.java    |  1 +
 .../common/{record => utils}/BufferSupplier.java   |  2 +-
 .../clients/consumer/internals/FetcherTest.java    |  2 +-
 .../common/{record => compress}/KafkaLZ4Test.java  |  5 +-
 .../kafka/common/record/BufferSupplierTest.java    |  1 +
 .../kafka/common/record/CompressionTypeTest.java   |  3 +
 .../common/record/DefaultRecordBatchTest.java      |  1 +
 .../kafka/common/record/MemoryRecordsTest.java     |  1 +
 core/src/main/scala/kafka/log/LogCleaner.scala     |  3 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |  3 +-
 core/src/main/scala/kafka/log/LogValidator.scala   |  5 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  3 +-
 gradle/spotbugs-exclude.xml                        |  2 +-
 .../kafka/jmh/record/BaseRecordBatchBenchmark.java |  2 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  2 +-
 .../kafka/raft/internals/RecordsBatchReader.java   |  2 +-
 .../raft/internals/RecordsBatchReaderTest.java     |  2 +-
 .../apache/kafka/snapshot/FileRawSnapshotTest.java |  2 +-
 .../apache/kafka/snapshot/SnapshotWriterTest.java  |  2 +-
 31 files changed, 171 insertions(+), 95 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b658370..bc0491e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -69,6 +69,15 @@
       <allow pkg="org.apache.kafka.common.metrics" />
     </subpackage>
 
+    <!-- Third-party compression libraries should only be referenced from this package -->
+    <subpackage name="compress">
+      <allow pkg="com.github.luben.zstd" />
+      <allow pkg="net.jpountz.lz4" />
+      <allow pkg="net.jpountz.xxhash" />
+      <allow pkg="org.apache.kafka.common.compress" />
+      <allow pkg="org.xerial.snappy" />
+    </subpackage>
+
     <subpackage name="message">
       <allow pkg="com.fasterxml.jackson" />
       <allow pkg="org.apache.kafka.common.protocol" />
@@ -144,7 +153,7 @@
     </subpackage>
 
     <subpackage name="record">
-      <allow pkg="net.jpountz" />
+      <allow pkg="org.apache.kafka.common.compress" />
       <allow pkg="org.apache.kafka.common.header" />
       <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.message" />
@@ -152,7 +161,6 @@
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.protocol.types" />
       <allow pkg="org.apache.kafka.common.errors" />
-      <allow pkg="com.github.luben.zstd" />
     </subpackage>
 
     <subpackage name="header">
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f71d2c4..78b26e6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -62,7 +62,7 @@ import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
similarity index 95%
rename from clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
rename to clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
index 850b1e9..85e7f7b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.record;
+package org.apache.kafka.common.compress;
 
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
@@ -22,16 +22,17 @@ import net.jpountz.lz4.LZ4SafeDecompressor;
 import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;
 
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.BufferSupplier;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.MAGIC;
 
 /**
  * A partial implementation of the v1.5.1 LZ4 Frame format.
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java
similarity index 99%
rename from clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
rename to clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java
index 591ab16..5c5aee4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.record;
+package org.apache.kafka.common.compress;
 
 import java.io.IOException;
 import java.io.OutputStream;
diff --git a/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java
new file mode 100644
index 0000000..b56273d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class SnappyFactory {
+
+    private SnappyFactory() { }
+
+    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
+        try {
+            return new SnappyOutputStream(buffer);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public static InputStream wrapForInput(ByteBuffer buffer) {
+        try {
+            return new SnappyInputStream(new ByteBufferInputStream(buffer));
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
new file mode 100644
index 0000000..8f4735e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.compress;
+
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
+import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ZstdFactory {
+
+    private ZstdFactory() { }
+
+    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
+        try {
+            // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
+            // in cases where the caller passes a small number of bytes to write (potentially a single byte).
+            return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(buffer, RecyclingBufferPool.INSTANCE), 16 * 1024);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        try {
+            // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
+            // in cases where the caller reads a small number of bytes (potentially a single byte).
+            return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
+                RecyclingBufferPool.INSTANCE), 16 * 1024);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 8363764..59b2c683 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index c2694ca..1b9754f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -16,9 +16,12 @@
  */
 package org.apache.kafka.common.record;
 
-import com.github.luben.zstd.BufferPool;
-import com.github.luben.zstd.RecyclingBufferPool;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream;
+import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream;
+import org.apache.kafka.common.compress.SnappyFactory;
+import org.apache.kafka.common.compress.ZstdFactory;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
@@ -26,9 +29,6 @@ import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.invoke.MethodHandle;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.MethodType;
 import java.nio.ByteBuffer;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
@@ -49,6 +49,7 @@ public enum CompressionType {
         }
     },
 
+    // Shipped with the JDK
     GZIP(1, "gzip", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
@@ -76,23 +77,21 @@ public enum CompressionType {
         }
     },
 
+    // We should only load classes from a given compression library when we actually use said compression library. This
+    // is because compression libraries include native code for a set of platforms and we want to avoid errors
+    // in case the platform is not supported and the compression library is not actually used.
+    // To ensure this, we only reference compression library code from classes that are only invoked when actual usage
+    // happens.
+
     SNAPPY(2, "snappy", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
-            try {
-                return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
-            } catch (Throwable e) {
-                throw new KafkaException(e);
-            }
+            return SnappyFactory.wrapForOutput(buffer);
         }
 
         @Override
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
-            try {
-                return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
-            } catch (Throwable e) {
-                throw new KafkaException(e);
-            }
+            return SnappyFactory.wrapForInput(buffer);
         }
     },
 
@@ -120,28 +119,12 @@ public enum CompressionType {
     ZSTD(4, "zstd", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
-            try {
-                // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-                // in cases where the caller passes a small number of bytes to write (potentially a single byte).
-                // It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries
-                return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer, RecyclingBufferPool.INSTANCE),
-                    16 * 1024);
-            } catch (Throwable e) {
-                throw new KafkaException(e);
-            }
+            return ZstdFactory.wrapForOutput(buffer);
         }
 
         @Override
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
-            try {
-                // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-                // in cases where the caller reads a small number of bytes (potentially a single byte).
-                // It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries.
-                return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer),
-                    RecyclingBufferPool.INSTANCE), 16 * 1024);
-            } catch (Throwable e) {
-                throw new KafkaException(e);
-            }
+            return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
     };
 
@@ -207,37 +190,4 @@ public enum CompressionType {
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
-
-    // We should only have a runtime dependency on compression algorithms in case the native libraries don't support
-    // some platforms.
-    //
-    // For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
-    // they're only loaded if used.
-    //
-    // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
-    // an error until KafkaLZ4BlockInputStream is initialized, which only happens if LZ4 is actually used.
-
-    private static class SnappyConstructors {
-        static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
-                MethodType.methodType(void.class, InputStream.class));
-        static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
-                MethodType.methodType(void.class, OutputStream.class));
-    }
-
-    private static class ZstdConstructors {
-        // It's ok to reference `BufferPool` since it doesn't load any native libraries
-        static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStreamNoFinalizer",
-            MethodType.methodType(void.class, InputStream.class, BufferPool.class));
-        static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStreamNoFinalizer",
-            MethodType.methodType(void.class, OutputStream.class, BufferPool.class));
-    }
-
-    private static MethodHandle findConstructor(String className, MethodType methodType) {
-        try {
-            return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
-        } catch (ReflectiveOperationException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 33709c0..62cab8f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 15c09de..10837d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch;
 import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 82a54af..7d14f67 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.network.TransferableChannel;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Time;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
index 8c0dc23..fc924b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 65a6a95..1cff7a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.CloseableIterator;
 
 import java.nio.ByteBuffer;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java
similarity index 99%
rename from clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
rename to clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java
index 1a6c92c..1688d10 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.common.record;
+package org.apache.kafka.common.utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7af9ede..2b9df62 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -65,7 +65,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
similarity index 98%
rename from clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
rename to clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
index 5f35f7d..a03c830 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
@@ -14,10 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.record;
+package org.apache.kafka.common.compress;
 
 import net.jpountz.xxhash.XXHashFactory;
 
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -34,7 +35,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.stream.Stream;
 
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java
index 9ead288..e580be5 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index af696c3..16b560d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream;
+import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.junit.jupiter.api.Test;
 
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 4e86906..9cd744a 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index cab667a..ebac0bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.extension.ExtensionContext;
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 3225d9d..df9722c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 import java.nio._
 import java.util.Date
 import java.util.concurrent.TimeUnit
-
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel}
@@ -32,7 +31,7 @@ import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageExcep
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index b43833d..37882ff 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -20,7 +20,6 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
-
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
@@ -30,7 +29,7 @@ import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 
 import scala.jdk.CollectionConverters._
 import scala.math._
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b2e6222..056be10 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -17,19 +17,18 @@
 package kafka.log
 
 import java.nio.ByteBuffer
-
 import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
 import kafka.common.{LongRef, RecordValidationException}
 import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
 import kafka.server.BrokerTopicStats
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
+import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ProduceResponse.RecordError
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index dab9eb1..1a953c5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -23,7 +23,6 @@ import java.nio.file.{Files, Paths}
 import java.util.concurrent.{Callable, Executors}
 import java.util.regex.Pattern
 import java.util.{Collections, Optional, Properties}
-
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
@@ -41,7 +40,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
 import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 3d0e0d2..ab60dfd 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -95,7 +95,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
 
     <!-- false positive in Java 11, related to https://github.com/spotbugs/spotbugs/issues/756 but more complex -->
     <Match>
-        <Class name="org.apache.kafka.common.record.KafkaLZ4BlockOutputStream"/>
+        <Class name="org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream"/>
         <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"/>
     </Match>
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
index 834652e..30f908e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -19,7 +19,7 @@ package org.apache.kafka.jmh.record;
 import kafka.server.BrokerTopicStats;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
-import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 09f8672..2823186 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
index 9a4c6b5..0817138 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
@@ -18,7 +18,7 @@ package org.apache.kafka.raft.internals;
 
 import org.apache.kafka.common.protocol.DataInputStreamReadable;
 import org.apache.kafka.common.protocol.Readable;
-import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index 7dc1769..78ffd51 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.raft.internals;
 
-import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
index 37dac9f..dc4f635 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.snapshot;
 
-import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
index 35652c7..27bdff2 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
@@ -22,7 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
-import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier;
+import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClientTestContext;