You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/06/14 22:32:13 UTC
[qpid-protonj2] branch main updated: PROTON-2564 Reduece memory alloactions on send and receive paths
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 10747f4a PROTON-2564 Reduece memory alloactions on send and receive paths
10747f4a is described below
commit 10747f4a58819a29ba5fdcc09755b3f0e1440fd3
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Tue Jun 14 17:58:27 2022 -0400
PROTON-2564 Reduece memory alloactions on send and receive paths
Reduces the amount of memory allocations in the engine and client on
both the send and receive paths during normal operations.
---
.../protonj2/client/impl/ClientMessageSupport.java | 9 ++-
.../protonj2/client/util/FifoDeliveryQueue.java | 75 +++++--------------
.../protonj2/client/impl/ClientMessageTest.java | 1 +
.../codec/decoders/messaging/DataTypeDecoder.java | 6 +-
.../ApplicationPropertiesTypeEncoder.java | 12 ++-
.../codec/encoders/messaging/DataTypeEncoder.java | 9 +--
.../messaging/DeliveryAnnotationsTypeEncoder.java | 12 ++-
.../encoders/messaging/FooterTypeEncoder.java | 12 ++-
.../messaging/MessageAnnotationsTypeEncoder.java | 12 ++-
.../codec/encoders/primitives/MapTypeEncoder.java | 22 ++----
.../apache/qpid/protonj2/types/messaging/Data.java | 86 ++++++++++++++++------
.../qpid/protonj2/codec/benchmark/Benchmark.java | 7 +-
.../codec/messaging/DataTypeCodecTest.java | 2 +-
.../qpid/protonj2/types/messaging/DataTest.java | 52 +++++++++++++
14 files changed, 182 insertions(+), 135 deletions(-)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java
index a7cba17d..cf4ad91f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java
@@ -54,6 +54,11 @@ public abstract class ClientMessageSupport {
private static final Encoder DEFAULT_ENCODER = CodecFactory.getDefaultEncoder();
private static final Decoder DEFAULT_DECODER = CodecFactory.getDefaultDecoder();
+ private static final ThreadLocal<EncoderState> THREAD_LOCAL_ENCODER_STATE =
+ ThreadLocal.withInitial(() -> DEFAULT_ENCODER.newEncoderState());
+ private static final ThreadLocal<DecoderState> THREAD_LOCAL_DECODER_STATE =
+ ThreadLocal.withInitial(() -> DEFAULT_DECODER.newDecoderState());
+
//----- Message Conversion
/**
@@ -92,7 +97,7 @@ public abstract class ClientMessageSupport {
//----- Message Encoding
public static ProtonBuffer encodeMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
- return encodeMessage(DEFAULT_ENCODER, DEFAULT_ENCODER.newEncoderState(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations);
+ return encodeMessage(DEFAULT_ENCODER, THREAD_LOCAL_ENCODER_STATE.get(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations);
}
public static ProtonBuffer encodeMessage(Encoder encoder, ProtonBufferAllocator allocator, AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
@@ -136,7 +141,7 @@ public abstract class ClientMessageSupport {
//----- Message Decoding
public static Message<?> decodeMessage(ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException {
- return decodeMessage(DEFAULT_DECODER, DEFAULT_DECODER.newDecoderState(), buffer, daConsumer);
+ return decodeMessage(DEFAULT_DECODER, THREAD_LOCAL_DECODER_STATE.get(), buffer, daConsumer);
}
public static Message<?> decodeMessage(Decoder decoder, ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
index 634b1ec6..91fbaebe 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
@@ -20,8 +20,6 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.protonj2.client.Delivery;
import org.apache.qpid.protonj2.client.impl.ClientDelivery;
@@ -40,9 +38,6 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
private volatile int state = STOPPED;
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition condition = lock.newCondition();
-
private final Deque<ClientDelivery> queue;
/**
@@ -57,38 +52,30 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
@Override
public void enqueueFirst(ClientDelivery envelope) {
- lock.lock();
- try {
+ synchronized (queue) {
queue.addFirst(envelope);
- condition.signal();
- } finally {
- lock.unlock();
+ queue.notify();
}
}
@Override
public void enqueue(ClientDelivery envelope) {
- lock.lock();
- try {
+ synchronized (queue) {
queue.addLast(envelope);
- condition.signal();
- } finally {
- lock.unlock();
+ queue.notify();
}
}
-
@Override
public ClientDelivery dequeue(long timeout) throws InterruptedException {
- lock.lock();
- try {
+ synchronized (queue) {
// Wait until the receiver is ready to deliver messages.
while (timeout != 0 && isRunning() && queue.isEmpty()) {
if (timeout == -1) {
- condition.await();
+ queue.wait();
} else {
long start = System.currentTimeMillis();
- condition.await(timeout, TimeUnit.MILLISECONDS);
+ queue.wait(TimeUnit.MILLISECONDS.toMillis(timeout));
timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
}
}
@@ -98,33 +85,25 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
}
return queue.pollFirst();
- } finally {
- lock.unlock();
}
}
@Override
public ClientDelivery dequeueNoWait() {
- lock.lock();
- try {
+ synchronized (queue) {
if (!isRunning()) {
return null;
}
return queue.pollFirst();
- } finally {
- lock.unlock();
}
}
@Override
public void start() {
if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
- lock.lock();
- try {
- condition.signalAll();
- } finally {
- lock.unlock();
+ synchronized (queue) {
+ queue.notifyAll();
}
}
}
@@ -132,11 +111,8 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
@Override
public void stop() {
if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
- lock.lock();
- try {
- condition.signalAll();
- } finally {
- lock.unlock();
+ synchronized (queue) {
+ queue.notifyAll();
}
}
}
@@ -144,11 +120,8 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
@Override
public void close() {
if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
- lock.lock();
- try {
- condition.signalAll();
- } finally {
- lock.unlock();
+ synchronized (queue) {
+ queue.notifyAll();
}
}
}
@@ -165,41 +138,29 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
@Override
public boolean isEmpty() {
- lock.lock();
- try {
+ synchronized (queue) {
return queue.isEmpty();
- } finally {
- lock.unlock();
}
}
@Override
public int size() {
- lock.lock();
- try {
+ synchronized (queue) {
return queue.size();
- } finally {
- lock.unlock();
}
}
@Override
public void clear() {
- lock.lock();
- try {
+ synchronized (queue) {
queue.clear();
- } finally {
- lock.unlock();
}
}
@Override
public String toString() {
- lock.lock();
- try {
+ synchronized (queue) {
return queue.toString();
- } finally {
- lock.unlock();
}
}
}
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java
index 1a695b2c..c41a16f5 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java
@@ -366,6 +366,7 @@ class ClientMessageTest {
message.bodySections().forEach(section -> {
assertTrue(section instanceof Data);
final Data dataView = (Data) section;
+ assertEquals(counter.get(), dataView.getBuffer().getArray()[0]);
assertEquals(counter.getAndIncrement(), dataView.getBinary().getArray()[0]);
});
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java
index 5b274670..b0221db5 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java
@@ -39,7 +39,7 @@ import org.apache.qpid.protonj2.types.messaging.Data;
*/
public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> {
- private static final Data EMPTY_DATA = new Data((Binary) null);
+ private static final Data EMPTY_DATA = new Data((ProtonBuffer) null);
@Override
public Class<Data> getTypeClass() {
@@ -86,7 +86,7 @@ public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> {
data.setWriteIndex(size);
buffer.setReadIndex(position + size);
- return new Data(new Binary(data));
+ return new Data(data);
}
@Override
@@ -130,7 +130,7 @@ public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> {
throw new DecodeException("Expected Binary type but found encoding: " + encodingCode);
}
- return new Data(new Binary(ProtonStreamUtils.readBytes(stream, size)));
+ return new Data(ProtonStreamUtils.readBytes(stream, size));
}
@Override
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java
index 912a6ef9..f80b5626 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java
@@ -16,8 +16,6 @@
*/
package org.apache.qpid.protonj2.codec.encoders.messaging;
-import java.util.Map;
-
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.codec.EncoderState;
import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class ApplicationPropertiesTypeEncoder extends AbstractDescribedMap
}
@Override
- public void writeMapEntries(ProtonBuffer buffer, EncoderState state, ApplicationProperties value) {
+ public void writeMapEntries(ProtonBuffer buffer, EncoderState state, ApplicationProperties properties) {
// Write the Map elements and then compute total size written.
- for (Map.Entry<String, Object> entry : value.getValue().entrySet()) {
- state.getEncoder().writeString(buffer, state, entry.getKey());
- state.getEncoder().writeObject(buffer, state, entry.getValue());
- }
+ properties.getValue().forEach((key, value) -> {
+ state.getEncoder().writeString(buffer, state, key);
+ state.getEncoder().writeObject(buffer, state, value);
+ });
}
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
index a6ec3284..8038fc6e 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
@@ -20,7 +20,6 @@ import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.codec.EncoderState;
import org.apache.qpid.protonj2.codec.EncodingCodes;
import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedTypeEncoder;
-import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.UnsignedLong;
import org.apache.qpid.protonj2.types.messaging.Data;
@@ -51,7 +50,7 @@ public final class DataTypeEncoder extends AbstractDescribedTypeEncoder<Data> {
buffer.writeByte(EncodingCodes.SMALLULONG);
buffer.writeByte(Data.DESCRIPTOR_CODE.byteValue());
- state.getEncoder().writeBinary(buffer, state, value.getValue());
+ state.getEncoder().writeBinary(buffer, state, value.getBuffer());
}
@Override
@@ -85,9 +84,9 @@ public final class DataTypeEncoder extends AbstractDescribedTypeEncoder<Data> {
buffer.writeByte(EncodingCodes.VBIN32);
for (Object value : values) {
- final Binary binary = ((Data) value).getBinary();
- buffer.writeInt(binary.getLength());
- buffer.writeBytes(binary.getArray(), binary.getArrayOffset(), binary.getLength());
+ final ProtonBuffer binary = ((Data) value).getBuffer();
+ buffer.writeInt(binary.getReadableBytes());
+ buffer.writeBytes(binary.getArray(), binary.getArrayOffset(), binary.getReadableBytes());
}
}
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java
index 774f8e6a..769a3ca6 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java
@@ -16,8 +16,6 @@
*/
package org.apache.qpid.protonj2.codec.encoders.messaging;
-import java.util.Map;
-
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.codec.EncoderState;
import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class DeliveryAnnotationsTypeEncoder extends AbstractDescribedMapTy
}
@Override
- public void writeMapEntries(ProtonBuffer buffer, EncoderState state, DeliveryAnnotations value) {
+ public void writeMapEntries(ProtonBuffer buffer, EncoderState state, DeliveryAnnotations annotations) {
// Write the Map elements and then compute total size written.
- for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) {
- state.getEncoder().writeSymbol(buffer, state, entry.getKey());
- state.getEncoder().writeObject(buffer, state, entry.getValue());
- }
+ annotations.getValue().forEach((key, value) -> {
+ state.getEncoder().writeSymbol(buffer, state, key);
+ state.getEncoder().writeObject(buffer, state, value);
+ });
}
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java
index 5bb2fd29..c584e537 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java
@@ -16,8 +16,6 @@
*/
package org.apache.qpid.protonj2.codec.encoders.messaging;
-import java.util.Map;
-
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.codec.EncoderState;
import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class FooterTypeEncoder extends AbstractDescribedMapTypeEncoder<Obj
}
@Override
- public void writeMapEntries(ProtonBuffer buffer, EncoderState state, Footer value) {
+ public void writeMapEntries(ProtonBuffer buffer, EncoderState state, Footer footers) {
// Write the Map elements and then compute total size written.
- for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) {
- state.getEncoder().writeObject(buffer, state, entry.getKey());
- state.getEncoder().writeObject(buffer, state, entry.getValue());
- }
+ footers.getValue().forEach((key, value) -> {
+ state.getEncoder().writeObject(buffer, state, key);
+ state.getEncoder().writeObject(buffer, state, value);
+ });
}
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java
index b4233aa7..02b692f1 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java
@@ -16,8 +16,6 @@
*/
package org.apache.qpid.protonj2.codec.encoders.messaging;
-import java.util.Map;
-
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.codec.EncoderState;
import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class MessageAnnotationsTypeEncoder extends AbstractDescribedMapTyp
}
@Override
- public void writeMapEntries(ProtonBuffer buffer, EncoderState state, MessageAnnotations value) {
+ public void writeMapEntries(ProtonBuffer buffer, EncoderState state, MessageAnnotations annotations) {
// Write the Map elements and then compute total size written.
- for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) {
- state.getEncoder().writeSymbol(buffer, state, entry.getKey());
- state.getEncoder().writeObject(buffer, state, entry.getValue());
- }
+ annotations.getValue().forEach((key, value) -> {
+ state.getEncoder().writeSymbol(buffer, state, key);
+ state.getEncoder().writeObject(buffer, state, value);
+ });
}
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java
index 0f34276c..80360148 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java
@@ -17,8 +17,6 @@
package org.apache.qpid.protonj2.codec.encoders.primitives;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.codec.EncodeException;
@@ -62,25 +60,21 @@ public final class MapTypeEncoder extends AbstractPrimitiveTypeEncoder<Map> {
buffer.writeInt(value.size() * 2);
// Write the list elements and then compute total size written.
- Set<Map.Entry> entries = value.entrySet();
- for (Entry entry : entries) {
- Object entryKey = entry.getKey();
- Object entryValue = entry.getValue();
-
- TypeEncoder keyEncoder = state.getEncoder().getTypeEncoder(entryKey);
+ value.forEach((key, entry) -> {
+ TypeEncoder keyEncoder = state.getEncoder().getTypeEncoder(key);
if (keyEncoder == null) {
- throw new EncodeException("Cannot find encoder for type " + entryKey);
+ throw new EncodeException("Cannot find encoder for type " + key);
}
- keyEncoder.writeType(buffer, state, entryKey);
+ keyEncoder.writeType(buffer, state, key);
- TypeEncoder valueEncoder = state.getEncoder().getTypeEncoder(entryValue);
+ TypeEncoder valueEncoder = state.getEncoder().getTypeEncoder(entry);
if (valueEncoder == null) {
- throw new EncodeException("Cannot find encoder for type " + entryValue);
+ throw new EncodeException("Cannot find encoder for type " + entry);
}
- valueEncoder.writeType(buffer, state, entryValue);
- }
+ valueEncoder.writeType(buffer, state, entry);
+ });
// Move back and write the size
int endIndex = buffer.getWriteIndex();
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
index ff872ba7..1856828d 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
@@ -17,6 +17,7 @@
package org.apache.qpid.protonj2.types.messaging;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
+import org.apache.qpid.protonj2.buffer.ProtonByteBufferAllocator;
import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.UnsignedLong;
@@ -26,30 +27,51 @@ public final class Data implements Section<byte[]> {
public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000075L);
public static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:data:binary");
- private final Binary value;
+ private final ProtonBuffer buffer;
- public Data(Binary value) {
- this.value = value;
+ private Binary cachedBinary;
+
+ public Data(Binary binary) {
+ this.buffer = binary != null ? binary.asProtonBuffer() : null;
+ this.cachedBinary = binary;
}
- public Data(ProtonBuffer value) {
- this.value = value != null ? new Binary(value) : null;
+ public Data(ProtonBuffer buffer) {
+ this.buffer = buffer;
}
public Data(byte[] value) {
- this.value = value != null ? new Binary(value) : null;
+ this.buffer = value != null ? ProtonByteBufferAllocator.DEFAULT.wrap(value) : null;
}
public Data(byte[] value, int offset, int length) {
- this.value = value != null ? new Binary(value, offset, length) : null;
+ this.buffer = value != null ? ProtonByteBufferAllocator.DEFAULT.wrap(value, offset, length) : null;
}
public Data copy() {
- return new Data(value == null ? null : value.copy());
+ return new Data(buffer == null ? null : buffer.copy());
}
public Binary getBinary() {
- return value;
+ if (cachedBinary != null || buffer == null) {
+ return cachedBinary;
+ } else {
+ return cachedBinary = new Binary(buffer);
+ }
+ }
+
+ /**
+ * Returns the {@link ProtonBuffer} that contains the bytes carried in the {@link Data} section.
+ * If the section carries no bytes then this method returns null. This method allows the {@link Data}
+ * section to be considered a carrier of {@link ProtonBuffer} types instead of the {@link Binary}
+ * value it will encode as part of its body and avoids creation of a Binary object when one is not
+ * needed. If a Binary instance is required then calling the {@link #getBinary()} method will create
+ * an instance that wraps the internal {@link ProtonBuffer}.
+ *
+ * @return the {@link ProtonBuffer} that back this Data section.
+ */
+ public ProtonBuffer getBuffer() {
+ return buffer;
}
/**
@@ -61,16 +83,42 @@ public final class Data implements Section<byte[]> {
*/
@Override
public byte[] getValue() {
- if (value != null && value.hasArray() && value.getArrayOffset() == 0 && value.getLength() == value.getArray().length) {
- return value.getArray();
+ if (buffer != null && buffer.hasArray() && buffer.getArrayOffset() == 0 && buffer.getReadableBytes() == buffer.getArray().length) {
+ return buffer.getArray();
} else {
- return value != null ? value.arrayCopy() : null;
+ byte[] dataCopy = null;
+ if (buffer != null) {
+ dataCopy = new byte[buffer.getReadableBytes()];
+ buffer.getBytes(buffer.getReadIndex(), dataCopy);
+ }
+
+ return dataCopy;
}
}
@Override
public String toString() {
- return "Data{ " + value + " }";
+ if (buffer == null) {
+ return "";
+ }
+
+ StringBuilder str = new StringBuilder();
+
+ str.append("Data{ ");
+
+ for (int i = 0; i < buffer.getReadableBytes(); i++) {
+ byte c = buffer.getByte(i);
+
+ if (c > 31 && c < 127 && c != '\\') {
+ str.append((char) c);
+ } else {
+ str.append(String.format("\\x%02x", c));
+ }
+ }
+
+ str.append(" }");
+
+ return str.toString();
}
@Override
@@ -82,7 +130,7 @@ public final class Data implements Section<byte[]> {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((value == null) ? 0 : value.hashCode());
+ result = prime * result + ((buffer == null) ? 0 : buffer.hashCode());
return result;
}
@@ -99,14 +147,10 @@ public final class Data implements Section<byte[]> {
}
Data other = (Data) obj;
- if (value == null) {
- if (other.value != null) {
- return false;
- }
- } else if (!value.equals(other.value)) {
- return false;
+ if (buffer == null) {
+ return other.buffer == null;
}
- return true;
+ return buffer.equals(other.buffer);
}
}
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java
index 5e4eff69..669690ab 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java
@@ -34,7 +34,6 @@ import org.apache.qpid.protonj2.codec.Decoder;
import org.apache.qpid.protonj2.codec.DecoderState;
import org.apache.qpid.protonj2.codec.Encoder;
import org.apache.qpid.protonj2.codec.EncoderState;
-import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.UnsignedByte;
import org.apache.qpid.protonj2.types.UnsignedInteger;
@@ -367,9 +366,9 @@ public class Benchmark implements Runnable {
}
private void benchmarkData() throws IOException {
- Data data1 = new Data(new Binary(new byte[] {1, 2, 3}));
- Data data2 = new Data(new Binary(new byte[] {4, 5, 6}));
- Data data3 = new Data(new Binary(new byte[] {7, 8, 9}));
+ Data data1 = new Data(new byte[] {1, 2, 3});
+ Data data2 = new Data(new byte[] {4, 5, 6});
+ Data data3 = new Data(new byte[] {7, 8, 9});
resultSet.start();
for (int i = 0; i < ITERATIONS; i++) {
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java
index fd878ee5..5efa51a9 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java
@@ -98,7 +98,7 @@ public class DataTypeCodecTest extends CodecTestSupport {
ProtonBuffer buffer = ProtonByteBufferAllocator.DEFAULT.allocate();
InputStream stream = new ProtonBufferInputStream(buffer);
- Data data = new Data(new Binary(new byte[] { 1, 2, 3}));
+ Data data = new Data(new byte[] { 1, 2, 3});
for (int i = 0; i < size; ++i) {
encoder.writeObject(buffer, encoderState, data);
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java
index 4e1f2b7b..d57111a1 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java
@@ -42,6 +42,11 @@ public class DataTest {
assertNull(new Data((byte[]) null).getValue());
}
+ @Test
+ public void testCopyFromEmptyProtonBuffer() {
+ assertNull(new Data((ProtonBuffer) null).copy().getBinary());
+ }
+
@Test
public void testCopyFromEmpty() {
assertNull(new Data((Binary) null).copy().getBinary());
@@ -78,6 +83,25 @@ public class DataTest {
assertEquals(new Data((Binary) null).hashCode(), new Data((ProtonBuffer) null).hashCode());
}
+ @Test
+ public void testHashCodeWithProtonBuffer() {
+ byte[] bytes = new byte[] { 1 };
+ Data data = new Data(bytes);
+ Data copy = data.copy();
+
+ assertNotNull(copy.getValue());
+ assertNotSame(data.getValue(), copy.getValue());
+
+ assertEquals(data.hashCode(), copy.hashCode());
+
+ Data second = new Data(new byte[] { 1, 2, 3 });
+
+ assertNotEquals(data.hashCode(), second.hashCode());
+
+ assertNotEquals(new Data((ProtonBuffer) null).hashCode(), data.hashCode());
+ assertEquals(new Data((ProtonBuffer) null).hashCode(), new Data((ProtonBuffer) null).hashCode());
+ }
+
@Test
public void testEquals() {
byte[] bytes = new byte[] { 1 };
@@ -107,6 +131,34 @@ public class DataTest {
assertEquals(new Data((Binary) null), new Data((ProtonBuffer) null));
}
+ @Test
+ public void testEqualsWithoutBinary() {
+ byte[] bytes = new byte[] { 1 };
+ Data data = new Data(bytes);
+ Data copy = data.copy();
+
+ assertNotNull(copy.getValue());
+ assertNotSame(data.getValue(), copy.getValue());
+
+ assertEquals(data, data);
+ assertEquals(data, copy);
+
+ Data second = new Data(ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 1, 2, 3 }));
+ Data third = new Data(new byte[] { 1, 2, 3 }, 0, 3);
+ Data fourth = new Data(new byte[] { 1, 2, 3 }, 0, 1);
+ Data fifth = new Data(null, 0, 0);
+
+ assertNotEquals(data, second);
+ assertNotEquals(data, third);
+ assertNotEquals(data, fifth);
+ assertEquals(data, fourth);
+ assertFalse(data.equals(null));
+ assertNotEquals(data, "not a data");
+ assertNotEquals(data, new Data((byte[]) null));
+ assertNotEquals(new Data((ProtonBuffer) null), data);
+ assertEquals(new Data((byte[]) null), new Data((ProtonBuffer) null));
+ }
+
@Test
public void testGetValueWhenUsingAnArrayView() {
Data view = new Data(new byte[] { 1, 2, 3 }, 0, 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org