Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/27 16:33:31 UTC

kafka git commit: MINOR: Use method handles instead of reflection for creating Snappy and LZ4 streams

Repository: kafka
Updated Branches:
  refs/heads/trunk d348ac92c -> d27e09e60


MINOR: Use method handles instead of reflection for creating Snappy and LZ4 streams

1. Use the initialization-on-demand holder idiom, which relies on the JVM's lazy class loading, instead of an explicit initialization check.
2. Method handles were designed to be faster than core reflection, particularly when the method handle can be stored in a static final field (the JVM can then optimise the call as if it were a regular method call). Since the code is of similar complexity (and simpler if we consider the whole PR), I am treating this as a clean-up rather than a performance improvement (which would require benchmarks). A brief sketch of the pattern appears after this list.
3. Remove the unused `ByteBufferReceive`.
4. I removed the snappy library from the classpath and verified that `CompressionTypeTest` (which uses LZ4) still passes. This shows that the right level of laziness is achieved: using one lazily loaded compression algorithm does not force the classes for the other to be loaded.
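
For illustration, a minimal, self-contained sketch of the pattern described in points 1 and 2 (an editorial example, not code from the patch; the class and field names are invented, though it binds the same public SnappyInputStream(InputStream) constructor that the patch uses). The nested holder class is initialised only on first access, so the Snappy classes are not loaded unless this path is exercised, and because the handle lives in a static final field the JVM can optimise invoke() much like a direct constructor call.

    import java.io.InputStream;
    import java.lang.invoke.MethodHandle;
    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.MethodType;

    public class LazySnappyExample {

        // Initialization-on-demand holder: the JVM does not initialise this class
        // (and therefore does not load SnappyInputStream) until the first time
        // Holder.INPUT_CTOR is referenced.
        private static class Holder {
            static final MethodHandle INPUT_CTOR = findConstructor(
                    "org.xerial.snappy.SnappyInputStream",
                    MethodType.methodType(void.class, InputStream.class));
        }

        private static MethodHandle findConstructor(String className, MethodType type) {
            try {
                return MethodHandles.publicLookup().findConstructor(Class.forName(className), type);
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
        }

        public static InputStream wrapForInput(InputStream in) {
            try {
                // The static final handle lets the JVM treat this much like a
                // regular constructor call; invoke() is declared to throw Throwable.
                return (InputStream) Holder.INPUT_CTOR.invoke(in);
            } catch (Throwable t) {
                throw new RuntimeException(t);
            }
        }
    }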

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2740 from ijuma/use-method-handles-for-compressed-stream-supplier


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d27e09e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d27e09e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d27e09e6

Branch: refs/heads/trunk
Commit: d27e09e60c53520bd24872f8b7dbfee87b523d04
Parents: d348ac9
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Mar 27 09:28:46 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Mar 27 09:28:46 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/network/ByteBufferReceive.java | 57 ------------
 .../kafka/common/record/CompressionType.java    | 94 +++++++-------------
 2 files changed, 33 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d27e09e6/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
deleted file mode 100644
index 5b0af02..0000000
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A receive backed by an array of ByteBuffers
- */
-public class ByteBufferReceive implements Receive {
-
-    private final String source;
-    private final ByteBuffer[] buffers;
-    private int remaining;
-
-    public ByteBufferReceive(String source, ByteBuffer... buffers) {
-        super();
-        this.source = source;
-        this.buffers = buffers;
-        for (ByteBuffer buffer : buffers)
-            remaining += buffer.remaining();
-    }
-
-    @Override
-    public String source() {
-        return source;
-    }
-
-    @Override
-    public boolean complete() {
-        return remaining > 0;
-    }
-
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        long read = channel.read(buffers);
-        remaining += read;
-        return read;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d27e09e6/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 093d5b3..a78c5a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Constructor;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -66,8 +68,8 @@ public enum CompressionType {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
             try {
-                return (OutputStream) SNAPPY_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer, bufferSize);
-            } catch (Exception e) {
+                return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer, bufferSize);
+            } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
@@ -75,8 +77,8 @@ public enum CompressionType {
         @Override
         public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
             try {
-                return (InputStream) SNAPPY_INPUT_STREAM_SUPPLIER.get().newInstance(buffer);
-            } catch (Exception e) {
+                return (InputStream) SnappyConstructors.INPUT.invoke(buffer);
+            } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
@@ -86,9 +88,9 @@ public enum CompressionType {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
             try {
-                return (OutputStream) LZ4_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer,
+                return (OutputStream) LZ4Constructors.OUTPUT.invoke(buffer,
                         messageVersion == RecordBatch.MAGIC_VALUE_V0);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
@@ -96,9 +98,9 @@ public enum CompressionType {
         @Override
         public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
             try {
-                return (InputStream) LZ4_INPUT_STREAM_SUPPLIER.get().newInstance(buffer,
+                return (InputStream) LZ4Constructors.INPUT.invoke(buffer,
                         messageVersion == RecordBatch.MAGIC_VALUE_V0);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
@@ -146,64 +148,34 @@ public enum CompressionType {
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
 
-    // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
-    // caching constructors to avoid invoking of Class.forName method for each batch
-    private static final MemoizingConstructorSupplier SNAPPY_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                    .getConstructor(OutputStream.class, Integer.TYPE);
-        }
-    });
+    // Dynamically load the Snappy and LZ4 classes so that we only have a runtime dependency on compression algorithms
+    // that are used. This is important for platforms that are not supported by the underlying libraries.
+    // Note that we are using the initialization-on-demand holder idiom, so it's important that the initialisation
+    // is done in separate classes (one per compression type).
 
-    private static final MemoizingConstructorSupplier LZ4_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
-                    .getConstructor(OutputStream.class, Boolean.TYPE);
-        }
-    });
+    private static class LZ4Constructors {
+        static final MethodHandle INPUT = findConstructor(
+                "org.apache.kafka.common.record.KafkaLZ4BlockInputStream",
+                MethodType.methodType(void.class, InputStream.class, Boolean.TYPE));
 
-    private static final MemoizingConstructorSupplier SNAPPY_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.xerial.snappy.SnappyInputStream")
-                    .getConstructor(InputStream.class);
-        }
-    });
-
-    private static final MemoizingConstructorSupplier LZ4_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
-                    .getConstructor(InputStream.class, Boolean.TYPE);
-        }
-    });
+        static final MethodHandle OUTPUT = findConstructor(
+                "org.apache.kafka.common.record.KafkaLZ4BlockOutputStream",
+                MethodType.methodType(void.class, OutputStream.class, Boolean.TYPE));
 
-    private interface ConstructorSupplier {
-        Constructor get() throws ClassNotFoundException, NoSuchMethodException;
     }
 
-    // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
-    private static class MemoizingConstructorSupplier {
-        final ConstructorSupplier delegate;
-        transient volatile boolean initialized;
-        transient Constructor value;
-
-        public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
-            this.delegate = delegate;
-        }
+    private static class SnappyConstructors {
+        static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
+                MethodType.methodType(void.class, InputStream.class));
+        static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
+                MethodType.methodType(void.class, OutputStream.class, Integer.TYPE));
+    }
 
-        public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
-            if (!initialized) {
-                synchronized (this) {
-                    if (!initialized) {
-                        value = delegate.get();
-                        initialized = true;
-                    }
-                }
-            }
-            return value;
+    private static MethodHandle findConstructor(String className, MethodType methodType) {
+        try {
+            return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
         }
     }
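
One editorial note on the hunks above: the catch clauses widen from Exception to Throwable because MethodHandle.invoke is declared with `throws Throwable`, unlike Constructor.newInstance. A tiny standalone illustration (the String.length binding is just an example, not from the patch):

    import java.lang.invoke.MethodHandle;
    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.MethodType;

    public class InvokeThrowsThrowable {
        public static void main(String[] args) {
            try {
                // findVirtual binds an instance method; invoke() is declared
                // "throws Throwable", so Throwable (not Exception) must be caught.
                MethodHandle length = MethodHandles.publicLookup().findVirtual(
                        String.class, "length", MethodType.methodType(int.class));
                System.out.println((int) length.invoke("kafka")); // prints 5
            } catch (Throwable t) {
                throw new RuntimeException(t);
            }
        }
    }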