You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/24 10:17:41 UTC

[GitHub] eolivelli closed pull request #1361: Issue #791: Eliminate byte[] copies in AddEntry code path

eolivelli closed pull request #1361: Issue #791: Eliminate byte[] copies in AddEntry code path
URL: https://github.com/apache/bookkeeper/pull/1361
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 2d3446080..3c53a03ce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -172,7 +172,8 @@
             this.throttler = null;
         }
 
-        macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType));
+        macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType),
+                bk.getConf().getUseV2WireProtocol());
 
         // If the password is empty, pass the same random ledger key which is generated by the hash of the empty
         // password, so that the bookie can avoid processing the keys for each entry
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index b125d9721..10fc2537c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -26,6 +26,7 @@
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.UnsafeByteOperations;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
@@ -524,11 +525,19 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
                 .setVersion(ProtocolVersion.VERSION_THREE)
                 .setOperation(OperationType.WRITE_LAC)
                 .setTxnId(txnId);
+        ByteString body;
+        if (toSend.hasArray()) {
+            body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
+        } else if (toSend.size() == 1) {
+            body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
+        } else {
+            body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
+        }
         WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
                 .setLedgerId(ledgerId)
                 .setLac(lac)
-                .setMasterKey(ByteString.copyFrom(masterKey))
-                .setBody(ByteString.copyFrom(toSend.toArray()));
+                .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
+                .setBody(body);
 
         final Request writeLacRequest = Request.newBuilder()
                 .setHeader(headerBuilder)
@@ -578,12 +587,19 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
                 headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
             }
 
-            byte[] toSendArray = toSend.toArray();
+            ByteString body;
+            if (toSend.hasArray()) {
+                body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
+            } else if (toSend.size() == 1) {
+                body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
+            } else {
+                body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
+            }
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId)
-                    .setMasterKey(ByteString.copyFrom(masterKey))
-                    .setBody(ByteString.copyFrom(toSendArray));
+                    .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
+                    .setBody(body);
 
             if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
                 addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
index 138702895..d6d1949b4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
@@ -38,8 +38,8 @@ protected MutableInt initialValue() throws Exception {
         }
     };
 
-    public CRC32CDigestManager(long ledgerId) {
-        super(ledgerId);
+    public CRC32CDigestManager(long ledgerId, boolean useV2Protocol) {
+        super(ledgerId, useV2Protocol);
         if (!Sse42Crc32C.isSupported()) {
             log.error("Sse42Crc32C is not supported, will use less slower CRC32C implementation.");
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index 26508280e..b71ab5962 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -46,8 +46,8 @@ protected CRC32Digest initialValue() {
         }
     };
 
-    public CRC32DigestManager(long ledgerId) {
-        super(ledgerId);
+    public CRC32DigestManager(long ledgerId, boolean useV2Protocol) {
+        super(ledgerId, useV2Protocol);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 2f85e4f75..2627db822 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -43,7 +43,8 @@
     public static final int METADATA_LENGTH = 32;
     public static final int LAC_METADATA_LENGTH = 16;
 
-    long ledgerId;
+    final long ledgerId;
+    final boolean useV2Protocol;
 
     abstract int getMacCodeLength();
 
@@ -57,22 +58,28 @@ void update(byte[] data) {
 
     final int macCodeLength;
 
-    public DigestManager(long ledgerId) {
+    public DigestManager(long ledgerId, boolean useV2Protocol) {
         this.ledgerId = ledgerId;
+        this.useV2Protocol = useV2Protocol;
         macCodeLength = getMacCodeLength();
     }
 
     public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType)
             throws GeneralSecurityException {
+        return instantiate(ledgerId, passwd, digestType, false);
+    }
+
+    public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
+            boolean useV2Protocol) throws GeneralSecurityException {
         switch(digestType) {
         case HMAC:
-            return new MacDigestManager(ledgerId, passwd);
+            return new MacDigestManager(ledgerId, passwd, useV2Protocol);
         case CRC32:
-            return new CRC32DigestManager(ledgerId);
+            return new CRC32DigestManager(ledgerId, useV2Protocol);
         case CRC32C:
-            return new CRC32CDigestManager(ledgerId);
+            return new CRC32CDigestManager(ledgerId, useV2Protocol);
         case DUMMY:
-            return new DummyDigestManager(ledgerId);
+            return new DummyDigestManager(ledgerId, useV2Protocol);
         default:
             throw new GeneralSecurityException("Unknown checksum type: " + digestType);
         }
@@ -89,17 +96,40 @@ public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType
      */
     public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length,
             ByteBuf data) {
-        ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
-        headersBuffer.writeLong(ledgerId);
-        headersBuffer.writeLong(entryId);
-        headersBuffer.writeLong(lastAddConfirmed);
-        headersBuffer.writeLong(length);
-
-        update(headersBuffer);
-        update(data);
-        populateValueAndReset(headersBuffer);
-
-        return ByteBufList.get(headersBuffer, data);
+        if (this.useV2Protocol) {
+            /*
+             * For V2 protocol, use pooled direct ByteBuf's to avoid object allocation in DigestManager.
+             */
+            ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
+            headersBuffer.writeLong(ledgerId);
+            headersBuffer.writeLong(entryId);
+            headersBuffer.writeLong(lastAddConfirmed);
+            headersBuffer.writeLong(length);
+
+            update(headersBuffer);
+            update(data);
+            populateValueAndReset(headersBuffer);
+
+            return ByteBufList.get(headersBuffer, data);
+        } else {
+            /*
+             * For V3 protocol, use unpooled heap ByteBuf's (backed by accessible array): The one object
+             * allocation here saves us later allocations when converting to protobuf ByteString.
+             */
+            ByteBuf sendBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength + data.readableBytes());
+            sendBuffer.writeLong(ledgerId);
+            sendBuffer.writeLong(entryId);
+            sendBuffer.writeLong(lastAddConfirmed);
+            sendBuffer.writeLong(length);
+
+            update(sendBuffer);
+            update(data);
+            populateValueAndReset(sendBuffer);
+
+            sendBuffer.writeBytes(data, data.readerIndex(), data.readableBytes());
+
+            return ByteBufList.get(sendBuffer);
+        }
     }
 
     /**
@@ -110,7 +140,12 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
      */
 
     public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
-        ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+        ByteBuf headersBuffer;
+        if (this.useV2Protocol) {
+            headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+        } else {
+            headersBuffer = Unpooled.buffer(LAC_METADATA_LENGTH + macCodeLength);
+        }
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(lac);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
index aedc5ad67..1b771f078 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
@@ -27,8 +27,8 @@
  * This class provides a noop digest implementation.
  */
 public class DummyDigestManager extends DigestManager {
-    public DummyDigestManager(long ledgerId) {
-        super(ledgerId);
+    public DummyDigestManager(long ledgerId, boolean useV2Protocol) {
+        super(ledgerId, useV2Protocol);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index d897a9ec6..8d830a488 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Charsets.UTF_8;
 
 import io.netty.buffer.ByteBuf;
+
 import java.security.GeneralSecurityException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -61,8 +62,9 @@ protected Mac initialValue() {
         }
     };
 
-    public MacDigestManager(long ledgerId, byte[] passwd) throws GeneralSecurityException {
-        super(ledgerId);
+    public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol)
+            throws GeneralSecurityException {
+        super(ledgerId, useV2Protocol);
         this.passwd = Arrays.copyOf(passwd, passwd.length);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index 37d4c72c9..f9e4cffce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -199,6 +199,7 @@ public int getBytes(byte[] dst) {
     }
 
     /**
+     * Creates a copy of the readable content of the internal buffers and returns the copy.
      * @return an array containing all the internal buffers content
      */
     public byte[] toArray() {
@@ -207,6 +208,35 @@ public int getBytes(byte[] dst) {
         return a;
     }
 
+    /**
+     * Returns {@code true} if this buffer has a single backing byte array.
+     * If this method returns true, you can safely call {@link #array()} and
+     * {@link #arrayOffset()}.
+     * @return true, if this {@link ByteBufList} is backed by a single array
+     */
+    public boolean hasArray() {
+        return buffers.size() == 1 && buffers.get(0).hasArray();
+    }
+
+    /**
+     * Returns a reference to the array backing this {@link ByteBufList}.
+     * This method must only be called if {@link #hasArray()} returns {@code true}.
+     * @return the array backing this {@link ByteBufList}
+     */
+    public byte[] array() {
+        return buffers.get(0).array();
+    }
+
+    /**
+     * Returns the offset of the first byte within the backing byte array of
+     * this buffer.
+     * This method must only be called if {@link #hasArray()} returns {@code true}.
+     * @return the offset of the first byte within the backing byte array.
+     */
+    public int arrayOffset() {
+        return buffers.get(0).arrayOffset();
+    }
+
     /**
      * @return a single buffer with the content of both individual buffers
      */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services