Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/04/24 10:17:39 UTC

[bookkeeper] branch master updated: Issue #791: Eliminate byte[] copies in AddEntry code path

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e021ac3  Issue #791: Eliminate byte[] copies in AddEntry code path
e021ac3 is described below

commit e021ac31cb1acb8de6d4161a618e76127d04d1c8
Author: Nicolas Michael <nm...@salesforce.com>
AuthorDate: Tue Apr 24 12:17:32 2018 +0200

    Issue #791: Eliminate byte[] copies in AddEntry code path
    
    The AddEntry code path in the BookKeeper client copies the send buffer several times before sending it to the bookies. This costs CPU cycles for array copying, and it also drives up the object allocation rate and therefore the young GC frequency.
    
    Currently, the send buffer is prepared by DigestManager.computeDigestAndPackageForSending(), which wraps the netty ByteBuf containing the data together with a (direct) header buffer into a ByteBufList. So far, we haven't yet allocated or copied any new buffers. But in order to send the buffer to the bookies, PendingAddOp.safeRun() will invoke sendWriteRequest() for each bookie in the write set, which will eventually invoke PerChannelBookieClient.addEntry() for each bookie. In here, we [...]
        byte[] toSendArray = toSend.toArray();      // 1st byte[] allocation and copy
        .setBody(ByteString.copyFrom(toSendArray)); // 2nd byte[] allocation and copy
    If for example we're using a write set size of 3, then we allocate 6 byte arrays and copy the same buffer 6 times.
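The copy pattern above can be modeled with plain JDK arrays. This is a hypothetical sketch (LegacyCopyModel is not a BookKeeper class). Arrays.copyOf stands in for both ByteBufList.toArray() and ByteString.copyFrom(), so the model only counts allocations and copied bytes and does not exercise Netty or protobuf.

```java
import java.util.Arrays;

/**
 * Hypothetical model of the pre-change AddEntry send path. For every bookie in
 * the write set, the payload is copied once by toArray() and once more by
 * ByteString.copyFrom(). Plain byte[] copies stand in for the real calls.
 */
final class LegacyCopyModel {
    int allocations = 0;   // number of byte[] allocated
    long bytesCopied = 0;  // total bytes copied across all allocations

    private byte[] copy(byte[] src) {
        allocations++;
        bytesCopied += src.length;
        return Arrays.copyOf(src, src.length);
    }

    /** Simulates sending one packaged entry (header + data) to writeSetSize bookies. */
    void sendToWriteSet(byte[] headerAndData, int writeSetSize) {
        for (int bookie = 0; bookie < writeSetSize; bookie++) {
            byte[] toSendArray = copy(headerAndData); // models toSend.toArray()
            copy(toSendArray);                        // models ByteString.copyFrom(toSendArray)
        }
    }

    public static void main(String[] args) {
        LegacyCopyModel model = new LegacyCopyModel();
        model.sendToWriteSet(new byte[1000], 3);
        System.out.println(model.allocations + " allocations, " + model.bytesCopied + " bytes copied");
    }
}
```

With a 1000-byte packaged entry and a write set of 3, main prints `6 allocations, 6000 bytes copied`.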
    
    All 6 copies can be eliminated if DigestManager.computeDigestAndPackageForSending() returns a ByteBuf (or ByteBufList) backed by an accessible array, which we can use without copying it further. PerChannelBookieClient.addEntry() can then wrap this array in a protobuf ByteString without performing any additional copies.
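Wrapping instead of copying means the resulting ByteString shares storage with the send buffer. The following JDK-only sketch illustrates that difference. ByteBuffer.wrap() stands in for protobuf's UnsafeByteOperations.unsafeWrap(), which is the call the diff actually uses. WrapVsCopy is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * JDK-only illustration of copy-vs-wrap semantics. ByteBuffer.wrap() plays the
 * role of UnsafeByteOperations.unsafeWrap(): it shares the array, it does not copy it.
 */
final class WrapVsCopy {
    /** Returns a view over array[offset, offset + length) without copying. */
    static ByteBuffer wrapNoCopy(byte[] array, int offset, int length) {
        // slice() makes index 0 of the view correspond to array[offset]
        return ByteBuffer.wrap(array, offset, length).slice();
    }

    /** Returns an independent copy of array[offset, offset + length). */
    static byte[] copy(byte[] array, int offset, int length) {
        return Arrays.copyOfRange(array, offset, offset + length);
    }

    public static void main(String[] args) {
        byte[] sendBuffer = {1, 2, 3, 4};
        ByteBuffer wrapped = wrapNoCopy(sendBuffer, 0, sendBuffer.length);
        byte[] copied = copy(sendBuffer, 0, sendBuffer.length);

        sendBuffer[0] = 42; // mutate the original array after "sending"

        System.out.println("wrapped sees " + wrapped.get(0)); // shares storage
        System.out.println("copy sees " + copied[0]);         // independent storage
    }
}
```

main prints `wrapped sees 42` and then `copy sees 1`. Because a change to the original array is visible through the wrapper, a wrapped array must not be modified or reused until the request has been serialized.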
    
    This change eliminates all byte[] allocations and copies in PerChannelBookieClient (PCBC) by:
    1. allocating a new (unpooled) array-backed heap ByteBuf for header, digest, and data in DigestManager.computeDigestAndPackageForSending()
    2. copying the data payload into this newly allocated ByteBuf and returning it wrapped in a ByteBufList of size 1 (instead of header and body as separate buffers)
    3. enhancing ByteBufList to expose the underlying array if it consists of a single ByteBuf that is backed by an array
    4. extracting the underlying array from the ByteBufList in PerChannelBookieClient.addEntry() and wrapping it in a protobuf ByteString without copying it.
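The following JDK-only sketch covers steps 1 and 2. It assumes an 8-byte CRC32 digest stored as a long. The real digest length depends on the DigestManager subclass. SingleBufferPackager is hypothetical. It follows the field order and digest coverage of computeDigestAndPackageForSending() in the diff: four header longs, then a digest over header and payload, then the payload. java.nio.ByteBuffer is used in place of Netty's ByteBuf.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/**
 * Hypothetical sketch of packaging an entry into ONE array-backed buffer:
 * [ledgerId | entryId | lastAddConfirmed | length | digest | data].
 * Assumes an 8-byte CRC32 digest; this is not BookKeeper's DigestManager.
 */
final class SingleBufferPackager {
    static final int METADATA_LENGTH = 32; // four longs
    static final int DIGEST_LENGTH = 8;    // assumption: CRC32 value stored as a long

    static ByteBuffer packageForSending(long ledgerId, long entryId, long lastAddConfirmed,
                                        long length, byte[] data) {
        // Step 1: one heap allocation sized for header + digest + payload.
        ByteBuffer buf = ByteBuffer.allocate(METADATA_LENGTH + DIGEST_LENGTH + data.length);
        buf.putLong(ledgerId).putLong(entryId).putLong(lastAddConfirmed).putLong(length);

        // The digest covers the header fields followed by the payload.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), buf.arrayOffset(), METADATA_LENGTH);
        crc.update(data, 0, data.length);
        buf.putLong(crc.getValue());

        // Step 2: the single extra copy, payload into the same backing array.
        buf.put(data);
        buf.flip(); // make the written bytes readable from position 0
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = packageForSending(7L, 0L, -1L, 5L, "hello".getBytes());
        // Array-backed, so it can later be wrapped without copying (steps 3 and 4).
        System.out.println("hasArray=" + buf.hasArray() + ", bytes=" + buf.remaining());
    }
}
```

main prints `hasArray=true, bytes=45`, which is 32 header bytes, 8 digest bytes, and 5 payload bytes.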
    
    With this change, PerChannelBookieClient no longer allocates or copies any byte[] in this code path. DigestManager allocates a single unpooled heap buffer in (1), and we add one arraycopy in (2) compared to the original code path. This is traded against the allocations and copies we save later on. The later steps, in particular step (4), are free of object allocations and array copies.
    As a result, the object allocation rate in the AddEntry code path drops significantly. For a write set size of 3, the number of byte[] allocations in PerChannelBookieClient drops from 6 to 0. In my experiments with a write-heavy workload, this made young generation garbage collections roughly 4x less frequent overall.
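The trade-off in (2) can be made concrete with a rough count of bytes copied per AddEntry. Let W be the write set size, N the payload length, and H = METADATA_LENGTH + macCodeLength the header-plus-digest length. The old path copies the H + N packaged bytes twice per bookie, once in toArray() and once in ByteString.copyFrom(). The new path copies only the N payload bytes, once, into the send buffer, because the header fields and digest are written directly. The master key, which the diff also stops copying, is ignored here.

$$
C_{\mathrm{old}} = 2W\,(H + N), \qquad C_{\mathrm{new}} = N
$$

For example, take W = 3, N = 1024, and H = 40. The value of H assumes an 8-byte digest on top of the 32-byte METADATA_LENGTH. Then C_old = 2 · 3 · 1064 = 6384 bytes spread over 6 arrays, while C_new = 1024 bytes written into the one buffer allocated by DigestManager. Digest computation, which reads the payload in both paths, is not counted.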
    
    Author: Nicolas Michael <nm...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1361 from nicmichael/AddEntryNoCopy, closes #791
---
 .../org/apache/bookkeeper/client/LedgerHandle.java |  3 +-
 .../bookkeeper/proto/PerChannelBookieClient.java   | 26 ++++++--
 .../proto/checksum/CRC32CDigestManager.java        |  4 +-
 .../proto/checksum/CRC32DigestManager.java         |  4 +-
 .../bookkeeper/proto/checksum/DigestManager.java   | 71 ++++++++++++++++------
 .../proto/checksum/DummyDigestManager.java         |  4 +-
 .../proto/checksum/MacDigestManager.java           |  6 +-
 .../org/apache/bookkeeper/util/ByteBufList.java    | 30 +++++++++
 8 files changed, 116 insertions(+), 32 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 2d34460..3c53a03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -172,7 +172,8 @@ public class LedgerHandle implements WriteHandle {
             this.throttler = null;
         }
 
-        macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType));
+        macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType),
+                bk.getConf().getUseV2WireProtocol());
 
         // If the password is empty, pass the same random ledger key which is generated by the hash of the empty
         // password, so that the bookie can avoid processing the keys for each entry
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index b125d97..10fc253 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.UnsafeByteOperations;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
@@ -524,11 +525,19 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 .setVersion(ProtocolVersion.VERSION_THREE)
                 .setOperation(OperationType.WRITE_LAC)
                 .setTxnId(txnId);
+        ByteString body;
+        if (toSend.hasArray()) {
+            body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
+        } else if (toSend.size() == 1) {
+            body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
+        } else {
+            body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
+        }
         WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
                 .setLedgerId(ledgerId)
                 .setLac(lac)
-                .setMasterKey(ByteString.copyFrom(masterKey))
-                .setBody(ByteString.copyFrom(toSend.toArray()));
+                .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
+                .setBody(body);
 
         final Request writeLacRequest = Request.newBuilder()
                 .setHeader(headerBuilder)
@@ -578,12 +587,19 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
             }
 
-            byte[] toSendArray = toSend.toArray();
+            ByteString body;
+            if (toSend.hasArray()) {
+                body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
+            } else if (toSend.size() == 1) {
+                body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
+            } else {
+                body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
+            }
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId)
-                    .setMasterKey(ByteString.copyFrom(masterKey))
-                    .setBody(ByteString.copyFrom(toSendArray));
+                    .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
+                    .setBody(body);
 
             if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
                 addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
index 1387028..d6d1949 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
@@ -38,8 +38,8 @@ class CRC32CDigestManager extends DigestManager {
         }
     };
 
-    public CRC32CDigestManager(long ledgerId) {
-        super(ledgerId);
+    public CRC32CDigestManager(long ledgerId, boolean useV2Protocol) {
+        super(ledgerId, useV2Protocol);
         if (!Sse42Crc32C.isSupported()) {
             log.error("Sse42Crc32C is not supported, will use less slower CRC32C implementation.");
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index 2650828..b71ab59 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -46,8 +46,8 @@ class CRC32DigestManager extends DigestManager {
         }
     };
 
-    public CRC32DigestManager(long ledgerId) {
-        super(ledgerId);
+    public CRC32DigestManager(long ledgerId, boolean useV2Protocol) {
+        super(ledgerId, useV2Protocol);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 2f85e4f..2627db8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -43,7 +43,8 @@ public abstract class DigestManager {
     public static final int METADATA_LENGTH = 32;
     public static final int LAC_METADATA_LENGTH = 16;
 
-    long ledgerId;
+    final long ledgerId;
+    final boolean useV2Protocol;
 
     abstract int getMacCodeLength();
 
@@ -57,22 +58,28 @@ public abstract class DigestManager {
 
     final int macCodeLength;
 
-    public DigestManager(long ledgerId) {
+    public DigestManager(long ledgerId, boolean useV2Protocol) {
         this.ledgerId = ledgerId;
+        this.useV2Protocol = useV2Protocol;
         macCodeLength = getMacCodeLength();
     }
 
     public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType)
             throws GeneralSecurityException {
+        return instantiate(ledgerId, passwd, digestType, false);
+    }
+
+    public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
+            boolean useV2Protocol) throws GeneralSecurityException {
         switch(digestType) {
         case HMAC:
-            return new MacDigestManager(ledgerId, passwd);
+            return new MacDigestManager(ledgerId, passwd, useV2Protocol);
         case CRC32:
-            return new CRC32DigestManager(ledgerId);
+            return new CRC32DigestManager(ledgerId, useV2Protocol);
         case CRC32C:
-            return new CRC32CDigestManager(ledgerId);
+            return new CRC32CDigestManager(ledgerId, useV2Protocol);
         case DUMMY:
-            return new DummyDigestManager(ledgerId);
+            return new DummyDigestManager(ledgerId, useV2Protocol);
         default:
             throw new GeneralSecurityException("Unknown checksum type: " + digestType);
         }
@@ -89,17 +96,40 @@ public abstract class DigestManager {
      */
     public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length,
             ByteBuf data) {
-        ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
-        headersBuffer.writeLong(ledgerId);
-        headersBuffer.writeLong(entryId);
-        headersBuffer.writeLong(lastAddConfirmed);
-        headersBuffer.writeLong(length);
-
-        update(headersBuffer);
-        update(data);
-        populateValueAndReset(headersBuffer);
-
-        return ByteBufList.get(headersBuffer, data);
+        if (this.useV2Protocol) {
+            /*
+             * For V2 protocol, use pooled direct ByteBufs to avoid object allocation in DigestManager.
+             */
+            ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
+            headersBuffer.writeLong(ledgerId);
+            headersBuffer.writeLong(entryId);
+            headersBuffer.writeLong(lastAddConfirmed);
+            headersBuffer.writeLong(length);
+
+            update(headersBuffer);
+            update(data);
+            populateValueAndReset(headersBuffer);
+
+            return ByteBufList.get(headersBuffer, data);
+        } else {
+            /*
+             * For V3 protocol, use unpooled heap ByteBufs (backed by an accessible array): the one object
+             * allocation here saves us later allocations when converting to protobuf ByteString.
+             */
+            ByteBuf sendBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength + data.readableBytes());
+            sendBuffer.writeLong(ledgerId);
+            sendBuffer.writeLong(entryId);
+            sendBuffer.writeLong(lastAddConfirmed);
+            sendBuffer.writeLong(length);
+
+            update(sendBuffer);
+            update(data);
+            populateValueAndReset(sendBuffer);
+
+            sendBuffer.writeBytes(data, data.readerIndex(), data.readableBytes());
+
+            return ByteBufList.get(sendBuffer);
+        }
     }
 
     /**
@@ -110,7 +140,12 @@ public abstract class DigestManager {
      */
 
     public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
-        ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+        ByteBuf headersBuffer;
+        if (this.useV2Protocol) {
+            headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+        } else {
+            headersBuffer = Unpooled.buffer(LAC_METADATA_LENGTH + macCodeLength);
+        }
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(lac);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
index aedc5ad..1b771f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
@@ -27,8 +27,8 @@ import io.netty.buffer.ByteBuf;
  * This class provides a noop digest implementation.
  */
 public class DummyDigestManager extends DigestManager {
-    public DummyDigestManager(long ledgerId) {
-        super(ledgerId);
+    public DummyDigestManager(long ledgerId, boolean useV2Protocol) {
+        super(ledgerId, useV2Protocol);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index d897a9e..8d830a4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.proto.checksum;
 import static com.google.common.base.Charsets.UTF_8;
 
 import io.netty.buffer.ByteBuf;
+
 import java.security.GeneralSecurityException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -61,8 +62,9 @@ public class MacDigestManager extends DigestManager {
         }
     };
 
-    public MacDigestManager(long ledgerId, byte[] passwd) throws GeneralSecurityException {
-        super(ledgerId);
+    public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol)
+            throws GeneralSecurityException {
+        super(ledgerId, useV2Protocol);
         this.passwd = Arrays.copyOf(passwd, passwd.length);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index 37d4c72..f9e4cff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -199,6 +199,7 @@ public class ByteBufList extends AbstractReferenceCounted {
     }
 
     /**
+     * Creates a copy of the readable content of the internal buffers and returns the copy.
      * @return an array containing all the internal buffers content
      */
     public byte[] toArray() {
@@ -208,6 +209,35 @@ public class ByteBufList extends AbstractReferenceCounted {
     }
 
     /**
+     * Returns {@code true} if this buffer has a single backing byte array.
+     * If this method returns true, you can safely call {@link #array()} and
+     * {@link #arrayOffset()}.
+     * @return true, if this {@link ByteBufList} is backed by a single array
+     */
+    public boolean hasArray() {
+        return buffers.size() == 1 && buffers.get(0).hasArray();
+    }
+
+    /**
+     * Returns a reference to the array backing this {@link ByteBufList}.
+     * This method must only be called if {@link #hasArray()} returns {@code true}.
+     * @return the array backing this {@link ByteBufList}
+     */
+    public byte[] array() {
+        return buffers.get(0).array();
+    }
+
+    /**
+     * Returns the offset of the first byte within the backing byte array of
+     * this buffer.
+     * This method must only be called if {@link #hasArray()} returns {@code true}.
+     * @return the offset of the first byte within the backing byte array.
+     */
+    public int arrayOffset() {
+        return buffers.get(0).arrayOffset();
+    }
+
+    /**
      * @return a single buffer with the content of both individual buffers
      */
     @VisibleForTesting
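
ByteBufList.hasArray() and the three-way body selection added to PerChannelBookieClient can be modeled without Netty or protobuf. BodySelector is a hypothetical sketch in which java.nio.ByteBuffer stands in for ByteBuf. In this mapping, position() plays the role of readerIndex(), slice() the role of nioBuffer(), and copying into a fresh buffer the role of toArray().

In the diff, the offset passed to unsafeWrap() is arrayOffset() alone. That is correct for the freshly allocated send buffer, whose reader index is 0. A caller whose buffer had already been partially read would need arrayOffset() + readerIndex(), and the sketch accounts for this by adding position().

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical JDK-only model of ByteBufList.hasArray() and the body selection
 * in PerChannelBookieClient. ByteBuffer stands in for Netty's ByteBuf.
 */
final class BodySelector {
    enum Path { WRAP_ARRAY, WRAP_SINGLE_BUFFER, COPY_ALL }

    /** Mirrors ByteBufList.hasArray(): exactly one buffer, and it is array-backed. */
    static boolean hasArray(List<ByteBuffer> buffers) {
        return buffers.size() == 1 && buffers.get(0).hasArray();
    }

    static Path choosePath(List<ByteBuffer> buffers) {
        if (hasArray(buffers)) {
            return Path.WRAP_ARRAY;
        } else if (buffers.size() == 1) {
            return Path.WRAP_SINGLE_BUFFER;
        }
        return Path.COPY_ALL;
    }

    static ByteBuffer body(List<ByteBuffer> buffers) {
        switch (choosePath(buffers)) {
            case WRAP_ARRAY: {
                ByteBuffer b = buffers.get(0);
                // Readable bytes start at arrayOffset() + position(), not at arrayOffset().
                return ByteBuffer.wrap(b.array(), b.arrayOffset() + b.position(), b.remaining()).slice();
            }
            case WRAP_SINGLE_BUFFER:
                return buffers.get(0).slice(); // view without copying, like nioBuffer()
            default: {
                int total = 0;
                for (ByteBuffer b : buffers) {
                    total += b.remaining();
                }
                ByteBuffer copy = ByteBuffer.allocate(total); // like toArray(): one fresh copy
                for (ByteBuffer b : buffers) {
                    copy.put(b.duplicate()); // duplicate() leaves b's position untouched
                }
                copy.flip();
                return copy;
            }
        }
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        System.out.println(choosePath(Arrays.asList(heap)));
        System.out.println(choosePath(Arrays.asList(direct)));
        System.out.println(choosePath(Arrays.asList(heap, direct)));
    }
}
```

main prints `WRAP_ARRAY`, `WRAP_SINGLE_BUFFER`, and `COPY_ALL`, one per line. Only the last path still allocates and copies, which matches the fallback to toArray() in the diff.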
