You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2018/02/15 23:28:04 UTC

[bookkeeper] branch branch-4.6 updated: Replace DoubleByteBuf with CompositeByteBuf because of perf regression with Netty > 4.1.12

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.6 by this push:
     new 81e8d52  Replace DoubleByteBuf with CompositeByteBuf because of perf regression with Netty > 4.1.12
81e8d52 is described below

commit 81e8d520062260882a03672636d74c408d0c44b9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 15 15:27:33 2018 -0800

    Replace DoubleByteBuf with CompositeByteBuf because of perf regression with Netty > 4.1.12
    
    Starting with Netty 4.1.13, some internal behavior changed, which introduced a performance penalty when using `DoubleByteBuf`.
    
    The problem is that Netty falls back to calling `nioBuffers()`, which creates an array of unpooled direct `ByteBuffer` instances. That is pretty heavy, and the JVM incurs GC pauses of 1 to 3 seconds to reclaim them.
    
    The short-term fix is to go back to `CompositeByteBuf`, which doesn't have the problem (the `DirectByteBuffer` instances are pooled and reused). I have verified that GC is back to normal after this patch.
    
    In the mid term, since `CompositeByteBuf` allocates a bunch of objects (counted ~10 per entry), I have a change to replace it with a `ByteBufList` holder class. The change is almost done, but it will take a while to test it and verify corner cases.
    
    This should be considered for a 4.6.2 fix release.
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #1108 from merlimat/composite-byte-buf-master
    
    (cherry picked from commit 1b8ad9cba38ba7ecbfbe3836c6db98a8b319e094)
    Signed-off-by: Matteo Merli <mm...@apache.org>
---
 .../org/apache/bookkeeper/util/DoubleByteBuf.java  | 444 +--------------------
 .../apache/bookkeeper/util/DoubleByteBufTest.java  |   2 -
 2 files changed, 10 insertions(+), 436 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
index f906e97..4109933 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -17,447 +17,23 @@
 */
 package org.apache.bookkeeper.util;
 
-import io.netty.buffer.AbstractReferenceCountedByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.ResourceLeakDetectorFactory;
-import io.netty.util.ResourceLeakTracker;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import io.netty.buffer.CompositeByteBuf;
 
 /**
- * ByteBuf that holds 2 buffers. Similar to {@see CompositeByteBuf} but doesn't allocate list to hold them.
+ * Removed custom implementation of DoubleByteBuf, just relying on straight regular CompositeByteBuf.
  */
-@SuppressWarnings("unchecked")
-public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
-
-    private ByteBuf b1;
-    private ByteBuf b2;
-    private final Handle<DoubleByteBuf> recyclerHandle;
-
-    private static final Recycler<DoubleByteBuf> RECYCLER = new Recycler<DoubleByteBuf>() {
-        @Override
-        protected DoubleByteBuf newObject(Recycler.Handle<DoubleByteBuf> handle) {
-            return new DoubleByteBuf(handle);
-        }
-    };
-
-    private DoubleByteBuf(Handle<DoubleByteBuf> recyclerHandle) {
-        super(Integer.MAX_VALUE);
-        this.recyclerHandle = recyclerHandle;
-    }
-
-    public static ByteBuf get(ByteBuf b1, ByteBuf b2) {
-        DoubleByteBuf buf = RECYCLER.get();
-        buf.setRefCnt(1);
-
-        // Make sure the buffers are not deallocated as long as we hold them. Also, buffers can get retained/releases
-        // outside of DoubleByteBuf scope
-        buf.b1 = b1.retain();
-        buf.b2 = b2.retain();
-        buf.setIndex(0, b1.readableBytes() + b2.readableBytes());
-        return toLeakAwareBuffer(buf);
-    }
-
-    public ByteBuf getFirst() {
-        return b1;
-    }
-
-    public ByteBuf getSecond() {
-        return b2;
-    }
-
-    @Override
-    public boolean isDirect() {
-        return b1.isDirect() && b2.isDirect();
-    }
-
-    @Override
-    public boolean hasArray() {
-        // There's no single array available
-        return false;
-    }
-
-    @Override
-    public byte[] array() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int arrayOffset() {
-
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean hasMemoryAddress() {
-        return false;
-    }
-
-    @Override
-    public long memoryAddress() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int capacity() {
-        return b1.capacity() + b2.capacity();
-    }
-
-    @Override
-    public int writableBytes() {
-        return 0;
-    }
-
-    @Override
-    public DoubleByteBuf capacity(int newCapacity) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ByteBufAllocator alloc() {
-        return PooledByteBufAllocator.DEFAULT;
-    }
-
-    @Override
-    @Deprecated
-    public ByteOrder order() {
-        return ByteOrder.BIG_ENDIAN;
-    }
-
-    @Override
-    public byte getByte(int index) {
-        if (index < b1.writerIndex()) {
-            return b1.getByte(index);
-        } else {
-            return b2.getByte(index - b1.writerIndex());
-        }
-    }
-
-    @Override
-    protected byte _getByte(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected short _getShort(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected short _getShortLE(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected int _getUnsignedMediumLE(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected int _getIntLE(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected long _getLongLE(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void _setShortLE(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void _setMediumLE(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void _setIntLE(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void _setLongLE(int index, long value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected int _getUnsignedMedium(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected int _getInt(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected long _getLong(int index) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
-        return getBytes(index, Unpooled.wrappedBuffer(dst), dstIndex, length);
-    }
-
-    @Override
-    public ByteBuf getBytes(int index, ByteBuffer dst) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
-        checkDstIndex(index, length, dstIndex, dst.capacity());
-        if (length == 0) {
-            return this;
-        }
-
-        int b1Length = Math.min(length, b1.readableBytes() - index);
-        if (b1Length > 0) {
-            b1.getBytes(b1.readerIndex() + index, dst, dstIndex, b1Length);
-            dstIndex += b1Length;
-            length -= b1Length;
-            index = 0;
-        } else {
-            index -= b1.readableBytes();
-        }
-
-        if (length > 0) {
-            int b2Length = Math.min(length, b2.readableBytes() - index);
-            b2.getBytes(b2.readerIndex() + index, dst, dstIndex, b2Length);
-        }
-        return this;
-    }
-
-    @Override
-    public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf setByte(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void _setByte(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
+public class DoubleByteBuf extends CompositeByteBuf {
 
-    @Override
-    public DoubleByteBuf setShort(int index, int value) {
-        throw new UnsupportedOperationException();
+    public DoubleByteBuf(ByteBufAllocator alloc) {
+        super(alloc, true, 2);
     }
 
-    @Override
-    protected void _setShort(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf setMedium(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void _setMedium(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf setInt(int index, int value) {
-        return (DoubleByteBuf) super.setInt(index, value);
-    }
-
-    @Override
-    protected void _setInt(int index, int value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf setLong(int index, long value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void _setLong(int index, long value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf setBytes(int index, ByteBuffer src) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoubleByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int setBytes(int index, InputStream in, int length) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ByteBuf copy(int index, int length) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int nioBufferCount() {
-        return b1.nioBufferCount() + b2.nioBufferCount();
-    }
-
-    @Override
-    public ByteBuffer internalNioBuffer(int index, int length) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ByteBuffer nioBuffer(int index, int length) {
-        ByteBuffer dst = isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
-        ByteBuf b = Unpooled.wrappedBuffer(dst);
-        b.writerIndex(0);
-        getBytes(index, b, length);
-        return dst;
-    }
-
-    @Override
-    public ByteBuffer[] nioBuffers(int index, int length) {
-        return new ByteBuffer[] { nioBuffer(index, length) };
-    }
-
-    @Override
-    public DoubleByteBuf discardReadBytes() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String toString() {
-        String result = super.toString();
-        result = result.substring(0, result.length() - 1);
-        return result + ", components=2)";
-    }
-
-    @Override
-    public ByteBuffer[] nioBuffers() {
-        return nioBuffers(readerIndex(), readableBytes());
-    }
-
-    @Override
-    protected void deallocate() {
-        // Double release of buffer for the initial ref-count and the internal retain() when the DoubleByteBuf was
-        // created
-        b1.release(2);
-        b2.release(2);
-        b1 = b2 = null;
-        recyclerHandle.recycle(this);
-    }
-
-    @Override
-    public ByteBuf unwrap() {
-        return null;
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(DoubleByteBuf.class);
-
-    private static final ResourceLeakDetector<DoubleByteBuf> leakDetector = ResourceLeakDetectorFactory.instance()
-            .newResourceLeakDetector(DoubleByteBuf.class);
-    private static final Constructor<ByteBuf> simpleLeakAwareByteBufConstructor;
-    private static final Constructor<ByteBuf> advancedLeakAwareByteBufConstructor;
-
-    static {
-        Constructor<ByteBuf> tmpSimpleLeakAwareByteBufConstructor = null;
-        Constructor<ByteBuf> tmpAdvancedLeakAwareByteBufConstructor = null;
-        try {
-            Class<?> simpleLeakAwareByteBufClass = Class.forName("io.netty.buffer.SimpleLeakAwareByteBuf");
-            tmpSimpleLeakAwareByteBufConstructor = (Constructor<ByteBuf>) simpleLeakAwareByteBufClass
-                    .getDeclaredConstructor(ByteBuf.class, ResourceLeakTracker.class);
-            tmpSimpleLeakAwareByteBufConstructor.setAccessible(true);
-
-            Class<?> advancedLeakAwareByteBufClass = Class.forName("io.netty.buffer.AdvancedLeakAwareByteBuf");
-            tmpAdvancedLeakAwareByteBufConstructor = (Constructor<ByteBuf>) advancedLeakAwareByteBufClass
-                    .getDeclaredConstructor(ByteBuf.class, ResourceLeakTracker.class);
-            tmpAdvancedLeakAwareByteBufConstructor.setAccessible(true);
-        } catch (Throwable t) {
-            log.error("Failed to use reflection to enable leak detection", t);
-        } finally {
-            simpleLeakAwareByteBufConstructor = tmpSimpleLeakAwareByteBufConstructor;
-            advancedLeakAwareByteBufConstructor = tmpAdvancedLeakAwareByteBufConstructor;
-        }
-    }
-
-    private static ByteBuf toLeakAwareBuffer(DoubleByteBuf buf) {
-        try {
-            ResourceLeakTracker<DoubleByteBuf> leak;
-            switch (ResourceLeakDetector.getLevel()) {
-            case DISABLED:
-                break;
-
-            case SIMPLE:
-                leak = leakDetector.track(buf);
-                if (leak != null) {
-                    return simpleLeakAwareByteBufConstructor.newInstance(buf, leak);
-                }
-                break;
-            case ADVANCED:
-            case PARANOID:
-                leak = leakDetector.track(buf);
-                if (leak != null) {
-                    return advancedLeakAwareByteBufConstructor.newInstance(buf, leak);
-                }
-                break;
-            }
-            return buf;
-        } catch (Throwable t) {
-            // Catch reflection exception
-            throw new RuntimeException(t);
-        }
+    public static DoubleByteBuf get(ByteBuf b1, ByteBuf b2) {
+        DoubleByteBuf cbb = new DoubleByteBuf(b1.alloc());
+        cbb.addComponent(true, b1);
+        cbb.addComponent(true, b2);
+        return cbb;
     }
 }
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
index 06c7dae..b3114c1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -166,8 +166,6 @@ public class DoubleByteBufTest {
         assertTrue(b1.isDirect());
         assertTrue(b2.isDirect());
         assertTrue(b.isDirect());
-        ByteBuffer nioBuffer = b.nioBuffer();
-        assertTrue(nioBuffer.isDirect());
     }
 
     /**

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.