You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/27 07:18:26 UTC

kafka git commit: KAFKA-2882: Add constructor cache for Snappy and LZ4 Output/Input streams in Compressor.java

Repository: kafka
Updated Branches:
  refs/heads/trunk c18a1bd64 -> 4a0e011be


KAFKA-2882: Add constructor cache for Snappy and LZ4 Output/Input streams in Compressor.java

In the `wrapForOutput` and `wrapForInput` methods of `org.apache.kafka.common.record.Compressor`, `Class.forName("[compression codec]")` and `getConstructor` are invoked on every call. Reflection calls are expensive and impact performance at high volumes. This patch adds a cache for `Constructor` to reduce the reflection overhead.

In our production deployments, this has reduced producer CPU usage by about 20%.

Author: Maksim Logvinenko <ml...@gmail.com>

Reviewers: Ismael Juma

Closes #580 from logarithm/compressor-getclass-cache


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a0e011b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a0e011b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a0e011b

Branch: refs/heads/trunk
Commit: 4a0e011be3d038763d6326bb0092524f809c3f4d
Parents: c18a1bd
Author: Maksim Logvinenko <ml...@gmail.com>
Authored: Thu Nov 26 22:18:21 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 26 22:18:21 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/record/Compressor.java  | 87 ++++++++++++++++----
 1 file changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a0e011b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 27f757a..1aee389 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import java.lang.reflect.Constructor;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Utils;
 
@@ -46,6 +47,40 @@ public class Compressor {
         }
     }
 
+    // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
+    // cache the constructors to avoid invoking Class.forName and getConstructor for each batch
+    private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyOutputStream")
+                .getConstructor(OutputStream.class, Integer.TYPE);
+        }
+    });
+
+    private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
+                .getConstructor(OutputStream.class);
+        }
+    });
+
+    private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyInputStream")
+                .getConstructor(InputStream.class);
+        }
+    });
+
+    private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
+                .getConstructor(InputStream.class);
+        }
+    });
+
     private final CompressionType type;
     private final DataOutputStream appendStream;
     private final ByteBufferOutputStream bufferStream;
@@ -79,7 +114,7 @@ public class Compressor {
     public ByteBuffer buffer() {
         return bufferStream.buffer();
     }
-    
+
     public double compressionRate() {
         ByteBuffer buffer = bufferStream.buffer();
         if (this.writtenUncompressed == 0)
@@ -209,21 +244,15 @@ public class Compressor {
                 case GZIP:
                     return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                 case SNAPPY:
-                    // dynamically load the snappy class to avoid runtime dependency
-                    // on snappy if we are not using it
                     try {
-                        Class<?> outputStreamClass = Class.forName("org.xerial.snappy.SnappyOutputStream");
-                        OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
-                            .newInstance(buffer, bufferSize);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
                 case LZ4:
                     try {
-                        Class<?> outputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream");
-                        OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
-                            .newInstance(buffer);
+                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
@@ -244,22 +273,15 @@ public class Compressor {
                 case GZIP:
                     return new DataInputStream(new GZIPInputStream(buffer));
                 case SNAPPY:
-                    // dynamically load the snappy class to avoid runtime dependency
-                    // on snappy if we are not using it
                     try {
-                        Class<?> inputStreamClass = Class.forName("org.xerial.snappy.SnappyInputStream");
-                        InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
-                            .newInstance(buffer);
+                        InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
                         return new DataInputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
                 case LZ4:
-                    // dynamically load LZ4 class to avoid runtime dependency
                     try {
-                        Class<?> inputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream");
-                        InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
-                            .newInstance(buffer);
+                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer);
                         return new DataInputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
@@ -271,4 +293,33 @@ public class Compressor {
             throw new KafkaException(e);
         }
     }
+
+    private interface ConstructorSupplier {
+        Constructor get() throws ClassNotFoundException, NoSuchMethodException;
+    }
+
+    // this code is based on Guava's com.google.common.base.Suppliers.MemoizingSupplier
+    private static class MemoizingConstructorSupplier {
+        final ConstructorSupplier delegate;
+        transient volatile boolean initialized;
+        transient Constructor value;
+
+        public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
+            this.delegate = delegate;
+        }
+
+        public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
+            if (!initialized) {
+                synchronized (this) {
+                    if (!initialized) {
+                        Constructor constructor = delegate.get();
+                        value = constructor;
+                        initialized = true;
+                        return constructor;
+                    }
+                }
+            }
+            return value;
+        }
+    }
 }