You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2018/02/15 23:28:04 UTC
[bookkeeper] branch branch-4.6 updated: Replace DoubleByteBuf with
CompositeByteBuf because of perf regression with Netty > 4.1.12
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push:
new 81e8d52 Replace DoubleByteBuf with CompositeByteBuf because of perf regression with Netty > 4.1.12
81e8d52 is described below
commit 81e8d520062260882a03672636d74c408d0c44b9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 15 15:27:33 2018 -0800
Replace DoubleByteBuf with CompositeByteBuf because of perf regression with Netty > 4.1.12
Starting from Netty-4.1.13 some internal behavior has changed and that has introduced a performance penalty when using the `DoubleByteBuf`.
The problem resides in the fact that netty falls back to calling `nioBuffers()` and that is creating an array of unpooled direct `ByteBuffers`. That is pretty heavy and the JVM is taking 1 to 3 seconds GC pause to reclaim them.
Short term fix is to go back to `CompositeByteBuf` which doesn't have the problem (the `DirectByteBuffer` instances are pooled and reused). I have verified that GC is back to normal after this patch.
More mid-term, since `CompositeByteBuf` allocates a bunch of objects (counted ~10 per entry) I have a change to replace it with a `ByteBufList` holder class. I have the change almost done but it will take a while to test it to verify corner cases.
This should be considered for a 4.6.2 fix release.
Author: Matteo Merli <mm...@apache.org>
Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
This closes #1108 from merlimat/composite-byte-buf-master
(cherry picked from commit 1b8ad9cba38ba7ecbfbe3836c6db98a8b319e094)
Signed-off-by: Matteo Merli <mm...@apache.org>
---
.../org/apache/bookkeeper/util/DoubleByteBuf.java | 444 +--------------------
.../apache/bookkeeper/util/DoubleByteBufTest.java | 2 -
2 files changed, 10 insertions(+), 436 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
index f906e97..4109933 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -17,447 +17,23 @@
*/
package org.apache.bookkeeper.util;
-import io.netty.buffer.AbstractReferenceCountedByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.ResourceLeakDetectorFactory;
-import io.netty.util.ResourceLeakTracker;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import io.netty.buffer.CompositeByteBuf;
/**
- * ByteBuf that holds 2 buffers. Similar to {@see CompositeByteBuf} but doesn't allocate list to hold them.
+ * Removed custom implementation of DoubleByteBuf, just relying on straight regular CompositeByteBuf.
*/
-@SuppressWarnings("unchecked")
-public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
-
- private ByteBuf b1;
- private ByteBuf b2;
- private final Handle<DoubleByteBuf> recyclerHandle;
-
- private static final Recycler<DoubleByteBuf> RECYCLER = new Recycler<DoubleByteBuf>() {
- @Override
- protected DoubleByteBuf newObject(Recycler.Handle<DoubleByteBuf> handle) {
- return new DoubleByteBuf(handle);
- }
- };
-
- private DoubleByteBuf(Handle<DoubleByteBuf> recyclerHandle) {
- super(Integer.MAX_VALUE);
- this.recyclerHandle = recyclerHandle;
- }
-
- public static ByteBuf get(ByteBuf b1, ByteBuf b2) {
- DoubleByteBuf buf = RECYCLER.get();
- buf.setRefCnt(1);
-
- // Make sure the buffers are not deallocated as long as we hold them. Also, buffers can get retained/releases
- // outside of DoubleByteBuf scope
- buf.b1 = b1.retain();
- buf.b2 = b2.retain();
- buf.setIndex(0, b1.readableBytes() + b2.readableBytes());
- return toLeakAwareBuffer(buf);
- }
-
- public ByteBuf getFirst() {
- return b1;
- }
-
- public ByteBuf getSecond() {
- return b2;
- }
-
- @Override
- public boolean isDirect() {
- return b1.isDirect() && b2.isDirect();
- }
-
- @Override
- public boolean hasArray() {
- // There's no single array available
- return false;
- }
-
- @Override
- public byte[] array() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int arrayOffset() {
-
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean hasMemoryAddress() {
- return false;
- }
-
- @Override
- public long memoryAddress() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int capacity() {
- return b1.capacity() + b2.capacity();
- }
-
- @Override
- public int writableBytes() {
- return 0;
- }
-
- @Override
- public DoubleByteBuf capacity(int newCapacity) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ByteBufAllocator alloc() {
- return PooledByteBufAllocator.DEFAULT;
- }
-
- @Override
- @Deprecated
- public ByteOrder order() {
- return ByteOrder.BIG_ENDIAN;
- }
-
- @Override
- public byte getByte(int index) {
- if (index < b1.writerIndex()) {
- return b1.getByte(index);
- } else {
- return b2.getByte(index - b1.writerIndex());
- }
- }
-
- @Override
- protected byte _getByte(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected short _getShort(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected short _getShortLE(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected int _getUnsignedMediumLE(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected int _getIntLE(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected long _getLongLE(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void _setShortLE(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void _setMediumLE(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void _setIntLE(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void _setLongLE(int index, long value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected int _getUnsignedMedium(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected int _getInt(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected long _getLong(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
- return getBytes(index, Unpooled.wrappedBuffer(dst), dstIndex, length);
- }
-
- @Override
- public ByteBuf getBytes(int index, ByteBuffer dst) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
- checkDstIndex(index, length, dstIndex, dst.capacity());
- if (length == 0) {
- return this;
- }
-
- int b1Length = Math.min(length, b1.readableBytes() - index);
- if (b1Length > 0) {
- b1.getBytes(b1.readerIndex() + index, dst, dstIndex, b1Length);
- dstIndex += b1Length;
- length -= b1Length;
- index = 0;
- } else {
- index -= b1.readableBytes();
- }
-
- if (length > 0) {
- int b2Length = Math.min(length, b2.readableBytes() - index);
- b2.getBytes(b2.readerIndex() + index, dst, dstIndex, b2Length);
- }
- return this;
- }
-
- @Override
- public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf setByte(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void _setByte(int index, int value) {
- throw new UnsupportedOperationException();
- }
+public class DoubleByteBuf extends CompositeByteBuf {
- @Override
- public DoubleByteBuf setShort(int index, int value) {
- throw new UnsupportedOperationException();
+ public DoubleByteBuf(ByteBufAllocator alloc) {
+ super(alloc, true, 2);
}
- @Override
- protected void _setShort(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf setMedium(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void _setMedium(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf setInt(int index, int value) {
- return (DoubleByteBuf) super.setInt(index, value);
- }
-
- @Override
- protected void _setInt(int index, int value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf setLong(int index, long value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void _setLong(int index, long value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf setBytes(int index, ByteBuffer src) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int setBytes(int index, InputStream in, int length) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ByteBuf copy(int index, int length) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int nioBufferCount() {
- return b1.nioBufferCount() + b2.nioBufferCount();
- }
-
- @Override
- public ByteBuffer internalNioBuffer(int index, int length) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ByteBuffer nioBuffer(int index, int length) {
- ByteBuffer dst = isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
- ByteBuf b = Unpooled.wrappedBuffer(dst);
- b.writerIndex(0);
- getBytes(index, b, length);
- return dst;
- }
-
- @Override
- public ByteBuffer[] nioBuffers(int index, int length) {
- return new ByteBuffer[] { nioBuffer(index, length) };
- }
-
- @Override
- public DoubleByteBuf discardReadBytes() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String toString() {
- String result = super.toString();
- result = result.substring(0, result.length() - 1);
- return result + ", components=2)";
- }
-
- @Override
- public ByteBuffer[] nioBuffers() {
- return nioBuffers(readerIndex(), readableBytes());
- }
-
- @Override
- protected void deallocate() {
- // Double release of buffer for the initial ref-count and the internal retain() when the DoubleByteBuf was
- // created
- b1.release(2);
- b2.release(2);
- b1 = b2 = null;
- recyclerHandle.recycle(this);
- }
-
- @Override
- public ByteBuf unwrap() {
- return null;
- }
-
- private static final Logger log = LoggerFactory.getLogger(DoubleByteBuf.class);
-
- private static final ResourceLeakDetector<DoubleByteBuf> leakDetector = ResourceLeakDetectorFactory.instance()
- .newResourceLeakDetector(DoubleByteBuf.class);
- private static final Constructor<ByteBuf> simpleLeakAwareByteBufConstructor;
- private static final Constructor<ByteBuf> advancedLeakAwareByteBufConstructor;
-
- static {
- Constructor<ByteBuf> tmpSimpleLeakAwareByteBufConstructor = null;
- Constructor<ByteBuf> tmpAdvancedLeakAwareByteBufConstructor = null;
- try {
- Class<?> simpleLeakAwareByteBufClass = Class.forName("io.netty.buffer.SimpleLeakAwareByteBuf");
- tmpSimpleLeakAwareByteBufConstructor = (Constructor<ByteBuf>) simpleLeakAwareByteBufClass
- .getDeclaredConstructor(ByteBuf.class, ResourceLeakTracker.class);
- tmpSimpleLeakAwareByteBufConstructor.setAccessible(true);
-
- Class<?> advancedLeakAwareByteBufClass = Class.forName("io.netty.buffer.AdvancedLeakAwareByteBuf");
- tmpAdvancedLeakAwareByteBufConstructor = (Constructor<ByteBuf>) advancedLeakAwareByteBufClass
- .getDeclaredConstructor(ByteBuf.class, ResourceLeakTracker.class);
- tmpAdvancedLeakAwareByteBufConstructor.setAccessible(true);
- } catch (Throwable t) {
- log.error("Failed to use reflection to enable leak detection", t);
- } finally {
- simpleLeakAwareByteBufConstructor = tmpSimpleLeakAwareByteBufConstructor;
- advancedLeakAwareByteBufConstructor = tmpAdvancedLeakAwareByteBufConstructor;
- }
- }
-
- private static ByteBuf toLeakAwareBuffer(DoubleByteBuf buf) {
- try {
- ResourceLeakTracker<DoubleByteBuf> leak;
- switch (ResourceLeakDetector.getLevel()) {
- case DISABLED:
- break;
-
- case SIMPLE:
- leak = leakDetector.track(buf);
- if (leak != null) {
- return simpleLeakAwareByteBufConstructor.newInstance(buf, leak);
- }
- break;
- case ADVANCED:
- case PARANOID:
- leak = leakDetector.track(buf);
- if (leak != null) {
- return advancedLeakAwareByteBufConstructor.newInstance(buf, leak);
- }
- break;
- }
- return buf;
- } catch (Throwable t) {
- // Catch reflection exception
- throw new RuntimeException(t);
- }
+ public static DoubleByteBuf get(ByteBuf b1, ByteBuf b2) {
+ DoubleByteBuf cbb = new DoubleByteBuf(b1.alloc());
+ cbb.addComponent(true, b1);
+ cbb.addComponent(true, b2);
+ return cbb;
}
}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
index 06c7dae..b3114c1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -166,8 +166,6 @@ public class DoubleByteBufTest {
assertTrue(b1.isDirect());
assertTrue(b2.isDirect());
assertTrue(b.isDirect());
- ByteBuffer nioBuffer = b.nioBuffer();
- assertTrue(nioBuffer.isDirect());
}
/**
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.