You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/01 14:09:43 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1136 Improve UnpooledUnsafeDirectByteBufWrapper

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 6df8c3a28 -> 938c7a1d9


ARTEMIS-1136 Improve UnpooledUnsafeDirectByteBufWrapper


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2cdc6257
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2cdc6257
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2cdc6257

Branch: refs/heads/master
Commit: 2cdc62572bb42de0f98d96e53fb47dc89012a30c
Parents: 6df8c3a
Author: Francesco Nigro <ni...@gmail.com>
Authored: Sun Apr 30 08:11:41 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 1 10:09:33 2017 -0400

----------------------------------------------------------------------
 .../UnpooledUnsafeDirectByteBufWrapper.java     | 288 ++++++++++++++++---
 1 file changed, 250 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2cdc6257/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
index 0da33c6..a4346dc 100644
--- a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
+++ b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.ReadOnlyBufferException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
@@ -29,28 +30,59 @@ import java.nio.channels.ScatteringByteChannel;
 import io.netty.util.internal.PlatformDependent;
 
 /**
- * A NIO direct {@link ByteBuffer} wrapper.
- * Only ByteBuffer's manipulation operations are supported.
+ * A wrapper giving direct access to a NIO {@link ByteBuffer}, a byte[] or a raw memory address.
+ * Only content manipulation operations are supported.
  * Is best suited only for encoding/decoding purposes.
  */
 public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
 
    private ByteBuffer buffer;
+   private int arrayOffset;
+   private byte[] array;
    private long memoryAddress;
 
-   /**
-    * Creates a new direct buffer by wrapping the specified initial buffer.
-    */
    public UnpooledUnsafeDirectByteBufWrapper() {
       super(0);
       this.buffer = null;
+      this.arrayOffset = -1;
+      this.array = null;
       this.memoryAddress = 0L;
    }
 
+   public void wrap(long address, int length) {
+      this.memoryAddress = address;
+      this.arrayOffset = -1;
+      this.array = null;
+      this.buffer = null;
+      clear();
+      maxCapacity(length);
+   }
+
+   public void wrap(byte[] array, int srcIndex, int length) {
+      if (array != null) {
+         this.memoryAddress = 0L;
+         this.arrayOffset = srcIndex;
+         this.array = array;
+         this.buffer = null;
+         clear();
+         maxCapacity(length);
+      } else {
+         reset();
+      }
+   }
+
    public void wrap(ByteBuffer buffer, int srcIndex, int length) {
       if (buffer != null) {
          this.buffer = buffer;
-         this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
+         if (buffer.isDirect()) {
+            this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
+            this.arrayOffset = -1;
+            this.array = null;
+         } else {
+            this.arrayOffset = srcIndex;
+            this.array = buffer.array();
+            this.memoryAddress = 0L;
+         }
          clear();
          maxCapacity(length);
       } else {
@@ -61,13 +93,15 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
    public void reset() {
       this.buffer = null;
       this.memoryAddress = 0L;
+      this.arrayOffset = -1;
+      this.array = null;
       clear();
       maxCapacity(0);
    }
 
    @Override
    public boolean isDirect() {
-      return true;
+      return buffer != null ? buffer.isDirect() : false;
    }
 
    @Override
@@ -95,22 +129,22 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
 
    @Override
    public boolean hasArray() {
-      return false;
+      return array != null;
    }
 
    @Override
    public byte[] array() {
-      throw new UnsupportedOperationException("direct buffer");
+      return array;
    }
 
    @Override
    public int arrayOffset() {
-      throw new UnsupportedOperationException("direct buffer");
+      return arrayOffset;
    }
 
    @Override
    public boolean hasMemoryAddress() {
-      return true;
+      return memoryAddress != 0;
    }
 
    @Override
@@ -120,70 +154,156 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
 
    @Override
    protected byte _getByte(int index) {
-      return UnsafeByteBufUtil.getByte(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getByte(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getByte(array, idx(index));
+      }
    }
 
    @Override
    protected short _getShort(int index) {
-      return UnsafeByteBufUtil.getShort(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getShort(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getShort(array, idx(index));
+      }
    }
 
    @Override
    protected short _getShortLE(int index) {
-      return UnsafeByteBufUtil.getShortLE(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getShortLE(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getShortLE(array, idx(index));
+      }
    }
 
    @Override
    protected int _getUnsignedMedium(int index) {
-      return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getUnsignedMedium(array, idx(index));
+      }
    }
 
    @Override
    protected int _getUnsignedMediumLE(int index) {
-      return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getUnsignedMediumLE(array, idx(index));
+      }
    }
 
    @Override
    protected int _getInt(int index) {
-      return UnsafeByteBufUtil.getInt(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getInt(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getInt(array, idx(index));
+      }
    }
 
    @Override
    protected int _getIntLE(int index) {
-      return UnsafeByteBufUtil.getIntLE(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getIntLE(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getIntLE(array, idx(index));
+      }
    }
 
    @Override
    protected long _getLong(int index) {
-      return UnsafeByteBufUtil.getLong(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getLong(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getLong(array, idx(index));
+      }
    }
 
    @Override
    protected long _getLongLE(int index) {
-      return UnsafeByteBufUtil.getLongLE(addr(index));
+      if (hasMemoryAddress()) {
+         return UnsafeByteBufUtil.getLongLE(addr(index));
+      } else {
+         return UnsafeByteBufUtil.getLongLE(array, idx(index));
+      }
    }
 
    @Override
    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
-      UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+      } else {
+         final int idx = idx(index);
+         checkDstIndex(idx, length, dstIndex, dst.capacity());
+         getBytes(array, idx, dst, dstIndex, length);
+      }
       return this;
    }
 
+   private static void getBytes(byte[] array, int idx, ByteBuf dst, int dstIndex, int length) {
+      if (dst.hasMemoryAddress()) {
+         PlatformDependent.copyMemory(array, idx, dst.memoryAddress() + dstIndex, length);
+      } else if (dst.hasArray()) {
+         System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dstIndex, length);
+      } else {
+         dst.setBytes(dstIndex, array, idx, length);
+      }
+   }
+
    @Override
    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
-      UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+      } else {
+         final int idx = idx(index);
+         checkDstIndex(idx, length, dstIndex, dst.length);
+         System.arraycopy(array, idx, dst, dstIndex, length);
+      }
       return this;
    }
 
    @Override
    public ByteBuf getBytes(int index, ByteBuffer dst) {
-      UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
+      } else {
+         final int idx = idx(index);
+         checkIndex(idx, dst.remaining());
+         getBytes(array, idx, dst);
+      }
       return this;
    }
 
+   private static void getBytes(byte[] array, int idx, ByteBuffer dst) {
+      if (dst.remaining() == 0) {
+         return;
+      }
+      if (dst.isDirect()) {
+         if (dst.isReadOnly()) {
+            // We need to check whether dst is read-only so that we do not write into it by using Unsafe.
+            throw new ReadOnlyBufferException();
+         }
+         // Copy to direct memory
+         final long dstAddress = PlatformDependent.directBufferAddress(dst);
+         PlatformDependent.copyMemory(array, idx, dstAddress + dst.position(), dst.remaining());
+         dst.position(dst.position() + dst.remaining());
+      } else if (dst.hasArray()) {
+         // Copy to array
+         System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining());
+         dst.position(dst.position() + dst.remaining());
+      } else {
+         dst.put(array, idx, dst.remaining());
+      }
+   }
+
    @Override
    public ByteBuf readBytes(ByteBuffer dst) {
-      int length = dst.remaining();
+      final int length = dst.remaining();
       checkReadableBytes(length);
       getBytes(readerIndex, dst);
       readerIndex += length;
@@ -192,67 +312,150 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
 
    @Override
    protected void _setByte(int index, int value) {
-      UnsafeByteBufUtil.setByte(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setByte(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setByte(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setShort(int index, int value) {
-      UnsafeByteBufUtil.setShort(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setShort(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setShort(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setShortLE(int index, int value) {
-      UnsafeByteBufUtil.setShortLE(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setShortLE(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setShortLE(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setMedium(int index, int value) {
-      UnsafeByteBufUtil.setMedium(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setMedium(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setMedium(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setMediumLE(int index, int value) {
-      UnsafeByteBufUtil.setMediumLE(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setMediumLE(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setMediumLE(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setInt(int index, int value) {
-      UnsafeByteBufUtil.setInt(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setInt(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setInt(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setIntLE(int index, int value) {
-      UnsafeByteBufUtil.setIntLE(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setIntLE(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setIntLE(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setLong(int index, long value) {
-      UnsafeByteBufUtil.setLong(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setLong(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setLong(array, idx(index), value);
+      }
    }
 
    @Override
    protected void _setLongLE(int index, long value) {
-      UnsafeByteBufUtil.setLongLE(addr(index), value);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setLongLE(addr(index), value);
+      } else {
+         UnsafeByteBufUtil.setLongLE(array, idx(index), value);
+      }
    }
 
    @Override
    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
-      UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+      } else {
+         final int idx = idx(index);
+         checkSrcIndex(idx, length, srcIndex, src.capacity());
+         setBytes(array, idx, src, srcIndex, length);
+      }
       return this;
    }
 
+   private static void setBytes(byte[] array, int idx, ByteBuf src, int srcIndex, int length) {
+      if (src.hasMemoryAddress()) {
+         PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, idx, length);
+      } else if (src.hasArray()) {
+         System.arraycopy(src.array(), src.arrayOffset() + srcIndex, array, idx, length);
+      } else {
+         src.getBytes(srcIndex, array, idx, length);
+      }
+   }
+
    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
-      UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+      } else {
+         final int idx = idx(index);
+         checkSrcIndex(idx, length, srcIndex, src.length);
+         System.arraycopy(src, srcIndex, array, idx, length);
+      }
       return this;
    }
 
    @Override
    public ByteBuf setBytes(int index, ByteBuffer src) {
-      UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
+      } else {
+         final int idx = idx(index);
+         checkSrcIndex(idx, src.remaining(), src.position(), src.capacity());
+         setBytes(array, idx(index), src);
+      }
       return this;
    }
 
+   private static void setBytes(byte[] array, int idx, ByteBuffer src) {
+      final int length = src.remaining();
+      if (length == 0) {
+         return;
+      }
+      if (src.isDirect()) {
+         // Copy from direct memory
+         final long srcAddress = PlatformDependent.directBufferAddress(src);
+         PlatformDependent.copyMemory(srcAddress + src.position(), array, idx, length);
+         src.position(src.position() + length);
+      } else if (src.hasArray()) {
+         // Copy from array
+         System.arraycopy(src.array(), src.arrayOffset() + src.position(), array, idx, length);
+         src.position(src.position() + length);
+      } else {
+         src.get(array, idx, src.remaining());
+      }
+   }
+
    @Override
    @Deprecated
    public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
@@ -303,7 +506,7 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
 
    @Override
    public int nioBufferCount() {
-      return 1;
+      return buffer == null ? 0 : 1;
    }
 
    @Override
@@ -347,6 +550,10 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
       return memoryAddress + index;
    }
 
+   private int idx(int index) {
+      return arrayOffset + index;
+   }
+
    @Override
    @Deprecated
    protected SwappedByteBuf newSwappedByteBuf() {
@@ -355,7 +562,12 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
 
    @Override
    public ByteBuf setZero(int index, int length) {
-      UnsafeByteBufUtil.setZero(this, addr(index), index, length);
+      if (hasMemoryAddress()) {
+         UnsafeByteBufUtil.setZero(this, addr(index), index, length);
+      } else {
+         //prefer Arrays::fill here?
+         UnsafeByteBufUtil.setZero(array, idx(index), length);
+      }
       return this;
    }
 


[2/2] activemq-artemis git commit: This closes #1238

Posted by cl...@apache.org.
This closes #1238


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/938c7a1d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/938c7a1d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/938c7a1d

Branch: refs/heads/master
Commit: 938c7a1d9916c2bdb3f4c7bca0c103f579a6f0b6
Parents: 6df8c3a 2cdc625
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon May 1 10:09:34 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 1 10:09:34 2017 -0400

----------------------------------------------------------------------
 .../UnpooledUnsafeDirectByteBufWrapper.java     | 288 ++++++++++++++++---
 1 file changed, 250 insertions(+), 38 deletions(-)
----------------------------------------------------------------------