You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/04/30 02:31:12 UTC

[bookkeeper] branch master updated: ISSUE #2067: reduce byte[] allocation in add entry

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new bd699e6  ISSUE #2067: reduce byte[] allocation in add entry
bd699e6 is described below

commit bd699e61d729f327b5de302089d11bc3dcadfeb8
Author: mtang01 <49...@users.noreply.github.com>
AuthorDate: Mon Apr 29 19:31:04 2019 -0700

    ISSUE #2067: reduce byte[] allocation in add entry
    
    Descriptions of the changes in this PR:
    This change removes a byte[] copy in the DigestManager digest calculation
    (computeDigestAndPackageForSending)
    that puts the CRC header and payload in a contiguous buffer. Instead,
    it uses protobuf ByteString.concat to concatenate the header and payload
    without copying when building the protobuf message.
    
    
    
    
    ### Motivation
    
    In the add entry code path, I see lots of byte[] allocations made just for the digest calculation. It is possible to avoid allocating these byte[] instances.
    
    ### Changes
    
    Don't allocate a ByteBuf to copy data. Keep the header and data separate, and use ByteString.concat when constructing the protobuf message.
    
    Master Issue: #2067
    
    
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #2068 from mtang01/add_entry_mem, closes #2067
---
 .../bookkeeper/proto/PerChannelBookieClient.java   | 12 ++++--
 .../bookkeeper/proto/checksum/DigestManager.java   | 46 ++++++----------------
 2 files changed, 21 insertions(+), 37 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index d411e98..1fdb403 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -753,13 +753,15 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
             }
 
-            ByteString body;
+            ByteString body = null;
             if (toSend.hasArray()) {
                 body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
-            } else if (toSend.size() == 1) {
-                body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
             } else {
-                body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
+                for (int i = 0; i < toSend.size(); i++) {
+                    ByteString piece = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer());
+                    // use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs
+                    body = (body == null) ? piece : body.concat(piece);
+                }
             }
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
                     .setLedgerId(ledgerId)
@@ -1982,6 +1984,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     }
 
     private final Recycler<AddCompletion> addCompletionRecycler = new Recycler<AddCompletion>() {
+            @Override
             protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
                 return new AddCompletion(handle);
             }
@@ -2184,6 +2187,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     }
 
     private final Recycler<V2CompletionKey> v2KeyRecycler = new Recycler<V2CompletionKey>() {
+            @Override
             protected V2CompletionKey newObject(
                     Recycler.Handle<V2CompletionKey> handle) {
                 return new V2CompletionKey(handle);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 2dabf82..034dd6e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.proto.checksum;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
-import io.netty.util.ReferenceCountUtil;
 
 import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
@@ -99,41 +98,22 @@ public abstract class DigestManager {
      */
     public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length,
             ByteBuf data) {
+        ByteBuf headersBuffer;
         if (this.useV2Protocol) {
-            /*
-             * For V2 protocol, use pooled direct ByteBuf's to avoid object allocation in DigestManager.
-             */
-            ByteBuf headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
-            headersBuffer.writeLong(ledgerId);
-            headersBuffer.writeLong(entryId);
-            headersBuffer.writeLong(lastAddConfirmed);
-            headersBuffer.writeLong(length);
-
-            update(headersBuffer);
-            update(data);
-            populateValueAndReset(headersBuffer);
-
-            return ByteBufList.get(headersBuffer, data);
+            headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
         } else {
-            /*
-             * For V3 protocol, use unpooled heap ByteBuf's (backed by accessible array): The one object
-             * allocation here saves us later allocations when converting to protobuf ByteString.
-             */
-            ByteBuf sendBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength + data.readableBytes());
-            sendBuffer.writeLong(ledgerId);
-            sendBuffer.writeLong(entryId);
-            sendBuffer.writeLong(lastAddConfirmed);
-            sendBuffer.writeLong(length);
-
-            update(sendBuffer);
-            update(data);
-            populateValueAndReset(sendBuffer);
-
-            sendBuffer.writeBytes(data, data.readerIndex(), data.readableBytes());
-            ReferenceCountUtil.release(data);
-
-            return ByteBufList.get(sendBuffer);
+            headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
         }
+        headersBuffer.writeLong(ledgerId);
+        headersBuffer.writeLong(entryId);
+        headersBuffer.writeLong(lastAddConfirmed);
+        headersBuffer.writeLong(length);
+
+        update(headersBuffer);
+        update(data);
+        populateValueAndReset(headersBuffer);
+
+        return ByteBufList.get(headersBuffer, data);
     }
 
     /**