You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/27 16:33:31 UTC
kafka git commit: MINOR: Use method handles instead of reflection for
creating Snappy and LZ4 streams
Repository: kafka
Updated Branches:
refs/heads/trunk d348ac92c -> d27e09e60
MINOR: Use method handles instead of reflection for creating Snappy and LZ4 streams
1. Use Initialization-on-demand holder idiom that relies on JVM lazy-loading instead of explicit initialization check.
2. Method handles were designed to be faster than Core Reflection, particularly if the method handle can be stored in a static final field (the JVM can then optimise the call as if it was a regular method call). Since the code is of similar complexity (and simpler if we consider the whole PR), I am treating this as a clean-up instead of a performance improvement (which would require doing benchmarks).
3. Remove unused `ByteBufferReceive`.
4. I removed the snappy library from the classpath and verified that `CompressionTypeTest` (which uses LZ4) still passes. This shows that the right level of laziness is achieved even if we use one of the lazily loaded compression algorithms.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2740 from ijuma/use-method-handles-for-compressed-stream-supplier
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d27e09e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d27e09e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d27e09e6
Branch: refs/heads/trunk
Commit: d27e09e60c53520bd24872f8b7dbfee87b523d04
Parents: d348ac9
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Mar 27 09:28:46 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Mar 27 09:28:46 2017 -0700
----------------------------------------------------------------------
.../kafka/common/network/ByteBufferReceive.java | 57 ------------
.../kafka/common/record/CompressionType.java | 94 +++++++-------------
2 files changed, 33 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d27e09e6/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
deleted file mode 100644
index 5b0af02..0000000
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A receive backed by an array of ByteBuffers
- */
-public class ByteBufferReceive implements Receive {
-
- private final String source;
- private final ByteBuffer[] buffers;
- private int remaining;
-
- public ByteBufferReceive(String source, ByteBuffer... buffers) {
- super();
- this.source = source;
- this.buffers = buffers;
- for (ByteBuffer buffer : buffers)
- remaining += buffer.remaining();
- }
-
- @Override
- public String source() {
- return source;
- }
-
- @Override
- public boolean complete() {
- return remaining > 0;
- }
-
- @Override
- public long readFrom(ScatteringByteChannel channel) throws IOException {
- long read = channel.read(buffers);
- remaining += read;
- return read;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d27e09e6/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 093d5b3..a78c5a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.reflect.Constructor;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -66,8 +68,8 @@ public enum CompressionType {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
try {
- return (OutputStream) SNAPPY_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer, bufferSize);
- } catch (Exception e) {
+ return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer, bufferSize);
+ } catch (Throwable e) {
throw new KafkaException(e);
}
}
@@ -75,8 +77,8 @@ public enum CompressionType {
@Override
public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
try {
- return (InputStream) SNAPPY_INPUT_STREAM_SUPPLIER.get().newInstance(buffer);
- } catch (Exception e) {
+ return (InputStream) SnappyConstructors.INPUT.invoke(buffer);
+ } catch (Throwable e) {
throw new KafkaException(e);
}
}
@@ -86,9 +88,9 @@ public enum CompressionType {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
try {
- return (OutputStream) LZ4_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer,
+ return (OutputStream) LZ4Constructors.OUTPUT.invoke(buffer,
messageVersion == RecordBatch.MAGIC_VALUE_V0);
- } catch (Exception e) {
+ } catch (Throwable e) {
throw new KafkaException(e);
}
}
@@ -96,9 +98,9 @@ public enum CompressionType {
@Override
public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
try {
- return (InputStream) LZ4_INPUT_STREAM_SUPPLIER.get().newInstance(buffer,
+ return (InputStream) LZ4Constructors.INPUT.invoke(buffer,
messageVersion == RecordBatch.MAGIC_VALUE_V0);
- } catch (Exception e) {
+ } catch (Throwable e) {
throw new KafkaException(e);
}
}
@@ -146,64 +148,34 @@ public enum CompressionType {
throw new IllegalArgumentException("Unknown compression name: " + name);
}
- // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
- // caching constructors to avoid invoking of Class.forName method for each batch
- private static final MemoizingConstructorSupplier SNAPPY_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.xerial.snappy.SnappyOutputStream")
- .getConstructor(OutputStream.class, Integer.TYPE);
- }
- });
+ // Dynamically load the Snappy and LZ4 classes so that we only have a runtime dependency on compression algorithms
+ // that are used. This is important for platforms that are not supported by the underlying libraries.
+ // Note that we are using the initialization-on-demand holder idiom, so it's important that the initialisation
+ // is done in separate classes (one per compression type).
- private static final MemoizingConstructorSupplier LZ4_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
- .getConstructor(OutputStream.class, Boolean.TYPE);
- }
- });
+ private static class LZ4Constructors {
+ static final MethodHandle INPUT = findConstructor(
+ "org.apache.kafka.common.record.KafkaLZ4BlockInputStream",
+ MethodType.methodType(void.class, InputStream.class, Boolean.TYPE));
- private static final MemoizingConstructorSupplier SNAPPY_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.xerial.snappy.SnappyInputStream")
- .getConstructor(InputStream.class);
- }
- });
-
- private static final MemoizingConstructorSupplier LZ4_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
- .getConstructor(InputStream.class, Boolean.TYPE);
- }
- });
+ static final MethodHandle OUTPUT = findConstructor(
+ "org.apache.kafka.common.record.KafkaLZ4BlockOutputStream",
+ MethodType.methodType(void.class, OutputStream.class, Boolean.TYPE));
- private interface ConstructorSupplier {
- Constructor get() throws ClassNotFoundException, NoSuchMethodException;
}
- // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
- private static class MemoizingConstructorSupplier {
- final ConstructorSupplier delegate;
- transient volatile boolean initialized;
- transient Constructor value;
-
- public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
- this.delegate = delegate;
- }
+ private static class SnappyConstructors {
+ static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
+ MethodType.methodType(void.class, InputStream.class));
+ static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
+ MethodType.methodType(void.class, OutputStream.class, Integer.TYPE));
+ }
- public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
- if (!initialized) {
- synchronized (this) {
- if (!initialized) {
- value = delegate.get();
- initialized = true;
- }
- }
- }
- return value;
+ private static MethodHandle findConstructor(String className, MethodType methodType) {
+ try {
+ return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
}
}