You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/17 19:09:34 UTC
[bookkeeper] branch branch-4.6 updated: DoubleByteBuf fix for Netty > 4.1.12
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push:
new c2e4f45 DoubleByteBuf fix for Netty > 4.1.12
c2e4f45 is described below
commit c2e4f452946931f07e9905ccd8814b097dc0ed2d
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Jan 17 10:34:53 2018 -0800
DoubleByteBuf fix for Netty > 4.1.12
This is a port from Pulsar: DoubleByteBuf has problems with Netty Native Transport.
Original author is sschepens
see https://github.com/apache/incubator-pulsar/pull/1056
Author: Enrico Olivelli <eo...@apache.org>
Reviewers: Ivan Kelly <iv...@apache.org>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
This closes #996 from eolivelli/fix/doublebytebuf
(cherry picked from commit ff4932b37f68303793acce91840a55fddfda231f)
Signed-off-by: Sijie Guo <si...@apache.org>
---
.../org/apache/bookkeeper/util/DoubleByteBuf.java | 30 +++++----
.../apache/bookkeeper/util/DoubleByteBufTest.java | 75 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 13 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
index 15eb263..d38205c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -17,6 +17,18 @@
*/
package org.apache.bookkeeper.util;
+import com.google.common.collect.ObjectArrays;
+import io.netty.buffer.AbstractReferenceCountedByteBuf;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetectorFactory;
+import io.netty.util.ResourceLeakTracker;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,17 +42,6 @@ import java.nio.channels.ScatteringByteChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.AbstractReferenceCountedByteBuf;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.ResourceLeakDetectorFactory;
-import io.netty.util.ResourceLeakTracker;
-
/**
* ByteBuf that holds 2 buffers. Similar to {@see CompositeByteBuf} but doesn't allocate list to hold them.
*/
@@ -366,7 +367,7 @@ public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
@Override
public ByteBuffer nioBuffer(int index, int length) {
- ByteBuffer dst = ByteBuffer.allocate(length);
+ ByteBuffer dst = isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
ByteBuf b = Unpooled.wrappedBuffer(dst);
b.writerIndex(0);
getBytes(index, b, length);
@@ -392,7 +393,10 @@ public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
@Override
public ByteBuffer[] nioBuffers() {
- return nioBuffers(readerIndex(), readableBytes());
+ if (b1.nioBufferCount() == 1 && b2.nioBufferCount() == 1) {
+ return new ByteBuffer[] { b1.nioBuffer(), b2.nioBuffer() };
+ }
+ return ObjectArrays.concat(b1.nioBuffers(), b2.nioBuffers(), ByteBuffer.class);
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
index 5d8b223..06c7dae 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -19,12 +19,15 @@ package org.apache.bookkeeper.util;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
public class DoubleByteBufTest {
@@ -118,4 +121,76 @@ public class DoubleByteBufTest {
assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer());
}
+
+ @Test
+ public void testNonDirectNioBuffer() {
+ ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+ ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
+ ByteBuf b = DoubleByteBuf.get(b1, b2);
+ assertFalse(b1.isDirect());
+ assertFalse(b2.isDirect());
+ assertFalse(b.isDirect());
+ ByteBuffer nioBuffer = b.nioBuffer();
+ assertFalse(nioBuffer.isDirect());
+ }
+
+ @Test
+ public void testNonDirectPlusDirectNioBuffer() {
+ ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+ ByteBuf b2 = Unpooled.directBuffer(2);
+ ByteBuf b = DoubleByteBuf.get(b1, b2);
+ assertFalse(b1.isDirect());
+ assertTrue(b2.isDirect());
+ assertFalse(b.isDirect());
+ ByteBuffer nioBuffer = b.nioBuffer();
+ assertFalse(nioBuffer.isDirect());
+ }
+
+ @Test
+ public void testDirectPlusNonDirectNioBuffer() {
+ ByteBuf b1 = Unpooled.directBuffer(2);
+ ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+ ByteBuf b = DoubleByteBuf.get(b1, b2);
+ assertTrue(b1.isDirect());
+ assertFalse(b2.isDirect());
+ assertFalse(b.isDirect());
+ ByteBuffer nioBuffer = b.nioBuffer();
+ assertFalse(nioBuffer.isDirect());
+ }
+
+ @Test
+ public void testDirectNioBuffer() {
+ ByteBuf b1 = Unpooled.directBuffer(2);
+ ByteBuf b2 = Unpooled.directBuffer(2);
+ ByteBuf b = DoubleByteBuf.get(b1, b2);
+ assertTrue(b1.isDirect());
+ assertTrue(b2.isDirect());
+ assertTrue(b.isDirect());
+ ByteBuffer nioBuffer = b.nioBuffer();
+ assertTrue(nioBuffer.isDirect());
+ }
+
+ /**
+ * Verify that readableBytes() returns writerIndex - readerIndex. In this case writerIndex is the end of the buffer
+ * and readerIndex is increased by 64.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReadableBytes() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b2.writerIndex(b2.capacity());
+ ByteBuf buf = DoubleByteBuf.get(b1, b2);
+
+ assertEquals(buf.readerIndex(), 0);
+ assertEquals(buf.writerIndex(), 256);
+ assertEquals(buf.readableBytes(), 256);
+
+ for (int i = 0; i < 4; ++i) {
+ buf.skipBytes(64);
+ assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.