You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/29 13:17:33 UTC

[GitHub] jiazhai closed pull request #1307: Use reflection based CRC32 to pass direct memory pointer

jiazhai closed pull request #1307: Use reflection based CRC32 to pass direct memory pointer
URL: https://github.com/apache/bookkeeper/pull/1307
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of such a pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index 1f066405a..26508280e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -19,13 +19,30 @@
 */
 
 import io.netty.buffer.ByteBuf;
-import java.util.zip.CRC32;
+import io.netty.util.concurrent.FastThreadLocal;
 
+/**
+ * Digest manager for CRC32 checksum.
+ */
 class CRC32DigestManager extends DigestManager {
-    private final ThreadLocal<CRC32> crc = new ThreadLocal<CRC32>() {
+
+    /**
+     * Interface that abstracts different implementations of the CRC32 digest.
+     */
+    interface CRC32Digest {
+        long getValueAndReset();
+
+        void update(ByteBuf buf);
+    }
+
+    private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
         @Override
-        protected CRC32 initialValue() {
-            return new CRC32();
+        protected CRC32Digest initialValue() {
+            if (DirectMemoryCRC32Digest.isSupported()) {
+                return new DirectMemoryCRC32Digest();
+            } else {
+                return new StandardCRC32Digest();
+            }
         }
     };
 
@@ -40,12 +57,11 @@ int getMacCodeLength() {
 
     @Override
     void populateValueAndReset(ByteBuf buf) {
-        buf.writeLong(crc.get().getValue());
-        crc.get().reset();
+        buf.writeLong(crc.get().getValueAndReset());
     }
 
     @Override
     void update(ByteBuf data) {
-        crc.get().update(data.nioBuffer());
+        crc.get().update(data);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
new file mode 100644
index 000000000..a895153f5
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.bookkeeper.proto.checksum;
+
+import io.netty.buffer.ByteBuf;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.zip.CRC32;
+
+import org.apache.bookkeeper.proto.checksum.CRC32DigestManager.CRC32Digest;
+
+/**
+ * Specialized implementation of the CRC32 digest that uses reflection on the {@link CRC32} class to access
+ * its private "updateByteBuffer" method and pass it a direct memory pointer.
+ */
+class DirectMemoryCRC32Digest implements CRC32Digest {
+
+    public static boolean isSupported() {
+        return updateBytes != null;
+    }
+
+    private int crcValue;
+
+    @Override
+    public long getValueAndReset() {
+        long value = crcValue & 0xffffffffL;
+        crcValue = 0;
+        return value;
+    }
+
+    @Override
+    public void update(ByteBuf buf) {
+        int index = buf.readerIndex();
+        int length = buf.readableBytes();
+
+        try {
+            if (buf.hasMemoryAddress()) {
+                // Calculate CRC directly from the direct memory pointer
+                crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length);
+            } else if (buf.hasArray()) {
+                // Use the internal method to update from the buffer's backing array
+                crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length);
+            } else {
+                // Fall back to copying the data if the buffer is not contiguous
+                byte[] b = new byte[length];
+                buf.getBytes(index, b, 0, length);
+                crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length);
+            }
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final Method updateByteBuffer;
+    private static final Method updateBytes;
+
+    static {
+        // Access the private native methods of the CRC32 class to compute the CRC on the ByteBuf's direct
+        // memory, without needing to convert it to an NIO ByteBuffer.
+        Method updateByteBufferMethod = null;
+        Method updateBytesMethod = null;
+        try {
+            updateByteBufferMethod = CRC32.class.getDeclaredMethod("updateByteBuffer", int.class, long.class, int.class,
+                    int.class);
+            updateByteBufferMethod.setAccessible(true);
+
+            updateBytesMethod = CRC32.class.getDeclaredMethod("updateBytes", int.class, byte[].class, int.class,
+                    int.class);
+            updateBytesMethod.setAccessible(true);
+        } catch (NoSuchMethodException | SecurityException e) {
+            updateByteBufferMethod = null;
+            updateBytesMethod = null;
+        }
+
+        updateByteBuffer = updateByteBufferMethod;
+        updateBytes = updateBytesMethod;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
new file mode 100644
index 000000000..f103b14c4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.bookkeeper.proto.checksum;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.zip.CRC32;
+
+import org.apache.bookkeeper.proto.checksum.CRC32DigestManager.CRC32Digest;
+
+/**
+ * Regular implementation of CRC32 digest that makes use of {@link CRC32} class.
+ */
+class StandardCRC32Digest implements CRC32Digest {
+
+    private final CRC32 crc = new CRC32();
+
+    @Override
+    public long getValueAndReset() {
+        long value = crc.getValue();
+        crc.reset();
+        return value;
+    }
+
+    @Override
+    public void update(ByteBuf buf) {
+        crc.update(buf.nioBuffer());
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services