You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/01 14:09:43 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1136 Improve
UnpooledUnsafeDirectByteBufWrapper
Repository: activemq-artemis
Updated Branches:
refs/heads/master 6df8c3a28 -> 938c7a1d9
ARTEMIS-1136 Improve UnpooledUnsafeDirectByteBufWrapper
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2cdc6257
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2cdc6257
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2cdc6257
Branch: refs/heads/master
Commit: 2cdc62572bb42de0f98d96e53fb47dc89012a30c
Parents: 6df8c3a
Author: Francesco Nigro <ni...@gmail.com>
Authored: Sun Apr 30 08:11:41 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 1 10:09:33 2017 -0400
----------------------------------------------------------------------
.../UnpooledUnsafeDirectByteBufWrapper.java | 288 ++++++++++++++++---
1 file changed, 250 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2cdc6257/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
index 0da33c6..a4346dc 100644
--- a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
+++ b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.ReadOnlyBufferException;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
@@ -29,28 +30,59 @@ import java.nio.channels.ScatteringByteChannel;
import io.netty.util.internal.PlatformDependent;
/**
- * A NIO direct {@link ByteBuffer} wrapper.
- * Only ByteBuffer's manipulation operations are supported.
+ * A NIO {@link ByteBuffer}, byte[] and address direct access wrapper.
+ * Only content manipulation operations are supported.
* Is best suited only for encoding/decoding purposes.
*/
public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
private ByteBuffer buffer;
+ private int arrayOffset;
+ private byte[] array;
private long memoryAddress;
- /**
- * Creates a new direct buffer by wrapping the specified initial buffer.
- */
public UnpooledUnsafeDirectByteBufWrapper() {
super(0);
this.buffer = null;
+ this.arrayOffset = -1;
+ this.array = null;
this.memoryAddress = 0L;
}
+ public void wrap(long address, int length) {
+ this.memoryAddress = address;
+ this.arrayOffset = -1;
+ this.array = null;
+ this.buffer = null;
+ clear();
+ maxCapacity(length);
+ }
+
+ public void wrap(byte[] array, int srcIndex, int length) {
+ if (array != null) {
+ this.memoryAddress = 0L;
+ this.arrayOffset = srcIndex;
+ this.array = array;
+ this.buffer = null;
+ clear();
+ maxCapacity(length);
+ } else {
+ reset();
+ }
+ }
+
public void wrap(ByteBuffer buffer, int srcIndex, int length) {
if (buffer != null) {
this.buffer = buffer;
- this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
+ if (buffer.isDirect()) {
+ this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
+ this.arrayOffset = -1;
+ this.array = null;
+ } else {
+ this.arrayOffset = srcIndex;
+ this.array = buffer.array();
+ this.memoryAddress = 0L;
+ }
clear();
maxCapacity(length);
} else {
@@ -61,13 +93,15 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
public void reset() {
this.buffer = null;
this.memoryAddress = 0L;
+ this.arrayOffset = -1;
+ this.array = null;
clear();
maxCapacity(0);
}
@Override
public boolean isDirect() {
- return true;
+ return buffer != null ? buffer.isDirect() : false;
}
@Override
@@ -95,22 +129,22 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
public boolean hasArray() {
- return false;
+ return array != null;
}
@Override
public byte[] array() {
- throw new UnsupportedOperationException("direct buffer");
+ return array;
}
@Override
public int arrayOffset() {
- throw new UnsupportedOperationException("direct buffer");
+ return arrayOffset;
}
@Override
public boolean hasMemoryAddress() {
- return true;
+ return memoryAddress != 0;
}
@Override
@@ -120,70 +154,156 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
protected byte _getByte(int index) {
- return UnsafeByteBufUtil.getByte(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getByte(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getByte(array, idx(index));
+ }
}
@Override
protected short _getShort(int index) {
- return UnsafeByteBufUtil.getShort(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getShort(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getShort(array, idx(index));
+ }
}
@Override
protected short _getShortLE(int index) {
- return UnsafeByteBufUtil.getShortLE(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getShortLE(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getShortLE(array, idx(index));
+ }
}
@Override
protected int _getUnsignedMedium(int index) {
- return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getUnsignedMedium(array, idx(index));
+ }
}
@Override
protected int _getUnsignedMediumLE(int index) {
- return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getUnsignedMediumLE(array, idx(index));
+ }
}
@Override
protected int _getInt(int index) {
- return UnsafeByteBufUtil.getInt(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getInt(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getInt(array, idx(index));
+ }
}
@Override
protected int _getIntLE(int index) {
- return UnsafeByteBufUtil.getIntLE(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getIntLE(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getIntLE(array, idx(index));
+ }
}
@Override
protected long _getLong(int index) {
- return UnsafeByteBufUtil.getLong(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getLong(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getLong(array, idx(index));
+ }
}
@Override
protected long _getLongLE(int index) {
- return UnsafeByteBufUtil.getLongLE(addr(index));
+ if (hasMemoryAddress()) {
+ return UnsafeByteBufUtil.getLongLE(addr(index));
+ } else {
+ return UnsafeByteBufUtil.getLongLE(array, idx(index));
+ }
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
- UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+ } else {
+ final int idx = idx(index);
+ checkDstIndex(idx, length, dstIndex, dst.capacity());
+ getBytes(array, idx, dst, dstIndex, length);
+ }
return this;
}
+ private static void getBytes(byte[] array, int idx, ByteBuf dst, int dstIndex, int length) {
+ if (dst.hasMemoryAddress()) {
+ PlatformDependent.copyMemory(array, idx, dst.memoryAddress() + dstIndex, length);
+ } else if (dst.hasArray()) {
+ System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dstIndex, length);
+ } else {
+ dst.setBytes(dstIndex, array, idx, length);
+ }
+ }
+
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
- UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+ } else {
+ final int idx = idx(index);
+ checkDstIndex(idx, length, dstIndex, dst.length);
+ System.arraycopy(array, idx, dst, dstIndex, length);
+ }
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
- UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
+ } else {
+ final int idx = idx(index);
+ checkIndex(idx, dst.remaining());
+ getBytes(array, idx, dst);
+ }
return this;
}
+ private static void getBytes(byte[] array, int idx, ByteBuffer dst) {
+ if (dst.remaining() == 0) {
+ return;
+ }
+ if (dst.isDirect()) {
+ if (dst.isReadOnly()) {
+ // We need to check if dst is ready-only so we not write something in it by using Unsafe.
+ throw new ReadOnlyBufferException();
+ }
+ // Copy to direct memory
+ final long dstAddress = PlatformDependent.directBufferAddress(dst);
+ PlatformDependent.copyMemory(array, idx, dstAddress + dst.position(), dst.remaining());
+ dst.position(dst.position() + dst.remaining());
+ } else if (dst.hasArray()) {
+ // Copy to array
+ System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining());
+ dst.position(dst.position() + dst.remaining());
+ } else {
+ dst.put(array, idx, dst.remaining());
+ }
+ }
+
@Override
public ByteBuf readBytes(ByteBuffer dst) {
- int length = dst.remaining();
+ final int length = dst.remaining();
checkReadableBytes(length);
getBytes(readerIndex, dst);
readerIndex += length;
@@ -192,67 +312,150 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
protected void _setByte(int index, int value) {
- UnsafeByteBufUtil.setByte(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setByte(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setByte(array, idx(index), value);
+ }
}
@Override
protected void _setShort(int index, int value) {
- UnsafeByteBufUtil.setShort(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setShort(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setShort(array, idx(index), value);
+ }
}
@Override
protected void _setShortLE(int index, int value) {
- UnsafeByteBufUtil.setShortLE(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setShortLE(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setShortLE(array, idx(index), value);
+ }
}
@Override
protected void _setMedium(int index, int value) {
- UnsafeByteBufUtil.setMedium(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setMedium(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setMedium(array, idx(index), value);
+ }
}
@Override
protected void _setMediumLE(int index, int value) {
- UnsafeByteBufUtil.setMediumLE(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setMediumLE(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setMediumLE(array, idx(index), value);
+ }
}
@Override
protected void _setInt(int index, int value) {
- UnsafeByteBufUtil.setInt(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setInt(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setInt(array, idx(index), value);
+ }
}
@Override
protected void _setIntLE(int index, int value) {
- UnsafeByteBufUtil.setIntLE(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setIntLE(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setIntLE(array, idx(index), value);
+ }
}
@Override
protected void _setLong(int index, long value) {
- UnsafeByteBufUtil.setLong(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setLong(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setLong(array, idx(index), value);
+ }
}
@Override
protected void _setLongLE(int index, long value) {
- UnsafeByteBufUtil.setLongLE(addr(index), value);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setLongLE(addr(index), value);
+ } else {
+ UnsafeByteBufUtil.setLongLE(array, idx(index), value);
+ }
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
- UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+ } else {
+ final int idx = idx(index);
+ checkSrcIndex(idx, length, srcIndex, src.capacity());
+ setBytes(array, idx, src, srcIndex, length);
+ }
return this;
}
+ private static void setBytes(byte[] array, int idx, ByteBuf src, int srcIndex, int length) {
+ if (src.hasMemoryAddress()) {
+ PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, idx, length);
+ } else if (src.hasArray()) {
+ System.arraycopy(src.array(), src.arrayOffset() + srcIndex, array, idx, length);
+ } else {
+ src.getBytes(srcIndex, array, idx, length);
+ }
+ }
+
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
- UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+ } else {
+ final int idx = idx(index);
+ checkSrcIndex(idx, length, srcIndex, src.length);
+ System.arraycopy(src, srcIndex, array, idx, length);
+ }
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
- UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
+ } else {
+ final int idx = idx(index);
+ checkSrcIndex(idx, src.remaining(), src.position(), src.capacity());
+ setBytes(array, idx(index), src);
+ }
return this;
}
+ private static void setBytes(byte[] array, int idx, ByteBuffer src) {
+ final int length = src.remaining();
+ if (length == 0) {
+ return;
+ }
+ if (src.isDirect()) {
+ // Copy from direct memory
+ final long srcAddress = PlatformDependent.directBufferAddress(src);
+ PlatformDependent.copyMemory(srcAddress + src.position(), array, idx, length);
+ src.position(src.position() + length);
+ } else if (src.hasArray()) {
+ // Copy from array
+ System.arraycopy(src.array(), src.arrayOffset() + src.position(), array, idx, length);
+ src.position(src.position() + length);
+ } else {
+ src.get(array, idx, src.remaining());
+ }
+ }
+
@Override
@Deprecated
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
@@ -303,7 +506,7 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
public int nioBufferCount() {
- return 1;
+ return buffer == null ? 0 : 1;
}
@Override
@@ -347,6 +550,10 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
return memoryAddress + index;
}
+ private int idx(int index) {
+ return arrayOffset + index;
+ }
+
@Override
@Deprecated
protected SwappedByteBuf newSwappedByteBuf() {
@@ -355,7 +562,12 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
public ByteBuf setZero(int index, int length) {
- UnsafeByteBufUtil.setZero(this, addr(index), index, length);
+ if (hasMemoryAddress()) {
+ UnsafeByteBufUtil.setZero(this, addr(index), index, length);
+ } else {
+ //prefer Arrays::fill here?
+ UnsafeByteBufUtil.setZero(array, idx(index), length);
+ }
return this;
}
[2/2] activemq-artemis git commit: This closes #1238
Posted by cl...@apache.org.
This closes #1238
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/938c7a1d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/938c7a1d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/938c7a1d
Branch: refs/heads/master
Commit: 938c7a1d9916c2bdb3f4c7bca0c103f579a6f0b6
Parents: 6df8c3a 2cdc625
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon May 1 10:09:34 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 1 10:09:34 2017 -0400
----------------------------------------------------------------------
.../UnpooledUnsafeDirectByteBufWrapper.java | 288 ++++++++++++++++---
1 file changed, 250 insertions(+), 38 deletions(-)
----------------------------------------------------------------------