You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/17 19:09:34 UTC

[bookkeeper] branch branch-4.6 updated: DoubleByteBuf fix for Netty > 4.1.12

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.6 by this push:
     new c2e4f45  DoubleByteBuf fix for Netty > 4.1.12
c2e4f45 is described below

commit c2e4f452946931f07e9905ccd8814b097dc0ed2d
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Jan 17 10:34:53 2018 -0800

    DoubleByteBuf fix for Netty > 4.1.12
    
    This is a port from Pulsar: DoubleByteBuf has problems with the Netty native transport.
    The original author is sschepens.
    
    see https://github.com/apache/incubator-pulsar/pull/1056
    
    Author: Enrico Olivelli <eo...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
    
    This closes #996 from eolivelli/fix/doublebytebuf
    
    (cherry picked from commit ff4932b37f68303793acce91840a55fddfda231f)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../org/apache/bookkeeper/util/DoubleByteBuf.java  | 30 +++++----
 .../apache/bookkeeper/util/DoubleByteBufTest.java  | 75 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 13 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
index 15eb263..d38205c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -17,6 +17,18 @@
 */
 package org.apache.bookkeeper.util;
 
+import com.google.common.collect.ObjectArrays;
+import io.netty.buffer.AbstractReferenceCountedByteBuf;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetectorFactory;
+import io.netty.util.ResourceLeakTracker;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -30,17 +42,6 @@ import java.nio.channels.ScatteringByteChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.AbstractReferenceCountedByteBuf;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.ResourceLeakDetectorFactory;
-import io.netty.util.ResourceLeakTracker;
-
 /**
  * ByteBuf that holds 2 buffers. Similar to {@see CompositeByteBuf} but doesn't allocate list to hold them.
  */
@@ -366,7 +367,7 @@ public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
 
     @Override
     public ByteBuffer nioBuffer(int index, int length) {
-        ByteBuffer dst = ByteBuffer.allocate(length);
+        ByteBuffer dst = isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
         ByteBuf b = Unpooled.wrappedBuffer(dst);
         b.writerIndex(0);
         getBytes(index, b, length);
@@ -392,7 +393,10 @@ public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
 
     @Override
     public ByteBuffer[] nioBuffers() {
-        return nioBuffers(readerIndex(), readableBytes());
+        if (b1.nioBufferCount() == 1 && b2.nioBufferCount() == 1) { // each half exposes exactly one NIO buffer
+            return new ByteBuffer[] { b1.nioBuffer(), b2.nioBuffer() }; // b1's buffer first, then b2's
+        } // NOTE(review): this buffer's readerIndex()/readableBytes() are no longer consulted here - confirm
+        return ObjectArrays.concat(b1.nioBuffers(), b2.nioBuffers(), ByteBuffer.class); // b1's buffers, then b2's
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
index 5d8b223..06c7dae 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -19,12 +19,15 @@ package org.apache.bookkeeper.util;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 public class DoubleByteBufTest {
@@ -118,4 +121,76 @@ public class DoubleByteBufTest {
 
         assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer());
     }
+
+    @Test
+    public void testNonDirectNioBuffer() {
+        ByteBuf heapFirst = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+        ByteBuf heapSecond = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
+        ByteBuf combined = DoubleByteBuf.get(heapFirst, heapSecond);
+        assertFalse(heapFirst.isDirect());
+        assertFalse(heapSecond.isDirect());
+        assertFalse(combined.isDirect());
+        ByteBuffer nio = combined.nioBuffer();
+        assertFalse(nio.isDirect()); // two heap halves are expected to yield a heap NIO buffer
+    }
+
+    @Test
+    public void testNonDirectPlusDirectNioBuffer() {
+        ByteBuf heapPart = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+        ByteBuf directPart = Unpooled.directBuffer(2);
+        ByteBuf combined = DoubleByteBuf.get(heapPart, directPart);
+        assertFalse(heapPart.isDirect());
+        assertTrue(directPart.isDirect());
+        assertFalse(combined.isDirect()); // heap + direct is expected to report non-direct
+        ByteBuffer nio = combined.nioBuffer();
+        assertFalse(nio.isDirect());
+    }
+
+    @Test
+    public void testDirectPlusNonDirectNioBuffer() {
+        ByteBuf directPart = Unpooled.directBuffer(2);
+        ByteBuf heapPart = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+        ByteBuf combined = DoubleByteBuf.get(directPart, heapPart);
+        assertTrue(directPart.isDirect());
+        assertFalse(heapPart.isDirect());
+        assertFalse(combined.isDirect()); // direct + heap is expected to report non-direct
+        ByteBuffer nio = combined.nioBuffer();
+        assertFalse(nio.isDirect());
+    }
+
+    @Test
+    public void testDirectNioBuffer() {
+        ByteBuf directFirst = Unpooled.directBuffer(2);
+        ByteBuf directSecond = Unpooled.directBuffer(2);
+        ByteBuf combined = DoubleByteBuf.get(directFirst, directSecond);
+        assertTrue(directFirst.isDirect());
+        assertTrue(directSecond.isDirect());
+        assertTrue(combined.isDirect());
+        ByteBuffer nio = combined.nioBuffer();
+        assertTrue(nio.isDirect()); // two direct halves are expected to yield a direct NIO buffer
+    }
+
+    /**
+     * Verify that readableBytes() returns writerIndex - readerIndex. The writerIndex is at the end of the
+     * combined buffer (256) while the readerIndex is advanced in four steps of 64 bytes.
+     *
+     * @throws Exception not expected; declared so that any unexpected failure propagates to JUnit
+     */
+    @Test
+    public void testReadableBytes() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBuf buf = DoubleByteBuf.get(b1, b2);
+
+        assertEquals(0, buf.readerIndex());
+        assertEquals(256, buf.writerIndex());
+        assertEquals(256, buf.readableBytes());
+
+        for (int i = 0; i < 4; ++i) {
+            buf.skipBytes(64);
+            assertEquals(256 - 64 * (i + 1), buf.readableBytes());
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
commits@bookkeeper.apache.org.