You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/27 07:18:26 UTC
kafka git commit: KAFKA-2882: Add constructor cache for Snappy and
LZ4 Output/Input streams in Compressor.java
Repository: kafka
Updated Branches:
refs/heads/trunk c18a1bd64 -> 4a0e011be
KAFKA-2882: Add constructor cache for Snappy and LZ4 Output/Input streams in Compressor.java
In the `wrapForOutput` and `wrapForInput` methods of `org.apache.kafka.common.record.Compressor`, `Class.forName("[compression codec]")` and `getConstructor` are invoked on every `wrapForOutput` / `wrapForInput` call. Reflection calls are expensive and hurt performance at high volumes. This patch caches the looked-up `Constructor` objects to reduce the reflection overhead.
In our production deployments, this has reduced producer CPU usage by about 20%.
Author: Maksim Logvinenko <ml...@gmail.com>
Reviewers: Ismael Juma
Closes #580 from logarithm/compressor-getclass-cache
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a0e011b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a0e011b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a0e011b
Branch: refs/heads/trunk
Commit: 4a0e011be3d038763d6326bb0092524f809c3f4d
Parents: c18a1bd
Author: Maksim Logvinenko <ml...@gmail.com>
Authored: Thu Nov 26 22:18:21 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 26 22:18:21 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/common/record/Compressor.java | 87 ++++++++++++++++----
1 file changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a0e011b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 27f757a..1aee389 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import java.lang.reflect.Constructor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils;
@@ -46,6 +47,40 @@ public class Compressor {
}
}
+ // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
+ // caching constructors to avoid invoking of Class.forName method for each batch
+ private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.xerial.snappy.SnappyOutputStream")
+ .getConstructor(OutputStream.class, Integer.TYPE);
+ }
+ });
+
+ private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
+ .getConstructor(OutputStream.class);
+ }
+ });
+
+ private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.xerial.snappy.SnappyInputStream")
+ .getConstructor(InputStream.class);
+ }
+ });
+
+ private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
+ .getConstructor(InputStream.class);
+ }
+ });
+
private final CompressionType type;
private final DataOutputStream appendStream;
private final ByteBufferOutputStream bufferStream;
@@ -79,7 +114,7 @@ public class Compressor {
public ByteBuffer buffer() {
return bufferStream.buffer();
}
-
+
public double compressionRate() {
ByteBuffer buffer = bufferStream.buffer();
if (this.writtenUncompressed == 0)
@@ -209,21 +244,15 @@ public class Compressor {
case GZIP:
return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
case SNAPPY:
- // dynamically load the snappy class to avoid runtime dependency
- // on snappy if we are not using it
try {
- Class<?> outputStreamClass = Class.forName("org.xerial.snappy.SnappyOutputStream");
- OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
- .newInstance(buffer, bufferSize);
+ OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
try {
- Class<?> outputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream");
- OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
- .newInstance(buffer);
+ OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
@@ -244,22 +273,15 @@ public class Compressor {
case GZIP:
return new DataInputStream(new GZIPInputStream(buffer));
case SNAPPY:
- // dynamically load the snappy class to avoid runtime dependency
- // on snappy if we are not using it
try {
- Class<?> inputStreamClass = Class.forName("org.xerial.snappy.SnappyInputStream");
- InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
- .newInstance(buffer);
+ InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
- // dynamically load LZ4 class to avoid runtime dependency
try {
- Class<?> inputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream");
- InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
- .newInstance(buffer);
+ InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
@@ -271,4 +293,33 @@ public class Compressor {
throw new KafkaException(e);
}
}
+
+ private interface ConstructorSupplier {
+ Constructor get() throws ClassNotFoundException, NoSuchMethodException;
+ }
+
+ // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
+ private static class MemoizingConstructorSupplier {
+ final ConstructorSupplier delegate;
+ transient volatile boolean initialized;
+ transient Constructor value;
+
+ public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
+ this.delegate = delegate;
+ }
+
+ public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
+ if (!initialized) {
+ synchronized (this) {
+ if (!initialized) {
+ Constructor constructor = delegate.get();
+ value = constructor;
+ initialized = true;
+ return constructor;
+ }
+ }
+ }
+ return value;
+ }
+ }
}