You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/06/14 22:32:13 UTC

[qpid-protonj2] branch main updated: PROTON-2564 Reduce memory allocations on send and receive paths

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 10747f4a PROTON-2564 Reduce memory allocations on send and receive paths
10747f4a is described below

commit 10747f4a58819a29ba5fdcc09755b3f0e1440fd3
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Tue Jun 14 17:58:27 2022 -0400

    PROTON-2564 Reduce memory allocations on send and receive paths
    
    Reduces the amount of memory allocations in the engine and client on
    both the send and receive paths during normal operations.
---
 .../protonj2/client/impl/ClientMessageSupport.java |  9 ++-
 .../protonj2/client/util/FifoDeliveryQueue.java    | 75 +++++--------------
 .../protonj2/client/impl/ClientMessageTest.java    |  1 +
 .../codec/decoders/messaging/DataTypeDecoder.java  |  6 +-
 .../ApplicationPropertiesTypeEncoder.java          | 12 ++-
 .../codec/encoders/messaging/DataTypeEncoder.java  |  9 +--
 .../messaging/DeliveryAnnotationsTypeEncoder.java  | 12 ++-
 .../encoders/messaging/FooterTypeEncoder.java      | 12 ++-
 .../messaging/MessageAnnotationsTypeEncoder.java   | 12 ++-
 .../codec/encoders/primitives/MapTypeEncoder.java  | 22 ++----
 .../apache/qpid/protonj2/types/messaging/Data.java | 86 ++++++++++++++++------
 .../qpid/protonj2/codec/benchmark/Benchmark.java   |  7 +-
 .../codec/messaging/DataTypeCodecTest.java         |  2 +-
 .../qpid/protonj2/types/messaging/DataTest.java    | 52 +++++++++++++
 14 files changed, 182 insertions(+), 135 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java
index a7cba17d..cf4ad91f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java
@@ -54,6 +54,11 @@ public abstract class ClientMessageSupport {
     private static final Encoder DEFAULT_ENCODER = CodecFactory.getDefaultEncoder();
     private static final Decoder DEFAULT_DECODER = CodecFactory.getDefaultDecoder();
 
+    private static final ThreadLocal<EncoderState> THREAD_LOCAL_ENCODER_STATE =
+        ThreadLocal.withInitial(() -> DEFAULT_ENCODER.newEncoderState());
+    private static final ThreadLocal<DecoderState> THREAD_LOCAL_DECODER_STATE =
+            ThreadLocal.withInitial(() -> DEFAULT_DECODER.newDecoderState());
+
     //----- Message Conversion
 
     /**
@@ -92,7 +97,7 @@ public abstract class ClientMessageSupport {
     //----- Message Encoding
 
     public static ProtonBuffer encodeMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
-        return encodeMessage(DEFAULT_ENCODER, DEFAULT_ENCODER.newEncoderState(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations);
+        return encodeMessage(DEFAULT_ENCODER, THREAD_LOCAL_ENCODER_STATE.get(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations);
     }
 
     public static ProtonBuffer encodeMessage(Encoder encoder, ProtonBufferAllocator allocator, AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
@@ -136,7 +141,7 @@ public abstract class ClientMessageSupport {
     //----- Message Decoding
 
     public static Message<?> decodeMessage(ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException {
-        return decodeMessage(DEFAULT_DECODER, DEFAULT_DECODER.newDecoderState(), buffer, daConsumer);
+        return decodeMessage(DEFAULT_DECODER, THREAD_LOCAL_DECODER_STATE.get(), buffer, daConsumer);
     }
 
     public static Message<?> decodeMessage(Decoder decoder, ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
index 634b1ec6..91fbaebe 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
@@ -20,8 +20,6 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.qpid.protonj2.client.Delivery;
 import org.apache.qpid.protonj2.client.impl.ClientDelivery;
@@ -40,9 +38,6 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
 
     private volatile int state = STOPPED;
 
-    private final ReentrantLock lock = new ReentrantLock();
-    private final Condition condition = lock.newCondition();
-
     private final Deque<ClientDelivery> queue;
 
     /**
@@ -57,38 +52,30 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
 
     @Override
     public void enqueueFirst(ClientDelivery envelope) {
-        lock.lock();
-        try {
+        synchronized (queue) {
             queue.addFirst(envelope);
-            condition.signal();
-        } finally {
-            lock.unlock();
+            queue.notify();
         }
     }
 
     @Override
     public void enqueue(ClientDelivery envelope) {
-        lock.lock();
-        try {
+        synchronized (queue) {
             queue.addLast(envelope);
-            condition.signal();
-        } finally {
-            lock.unlock();
+            queue.notify();
         }
     }
 
-
     @Override
     public ClientDelivery dequeue(long timeout) throws InterruptedException {
-        lock.lock();
-        try {
+        synchronized (queue) {
             // Wait until the receiver is ready to deliver messages.
             while (timeout != 0 && isRunning() && queue.isEmpty()) {
                 if (timeout == -1) {
-                    condition.await();
+                    queue.wait();
                 } else {
                     long start = System.currentTimeMillis();
-                    condition.await(timeout, TimeUnit.MILLISECONDS);
+                    queue.wait(TimeUnit.MILLISECONDS.toMillis(timeout));
                     timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
                 }
             }
@@ -98,33 +85,25 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
             }
 
             return queue.pollFirst();
-        } finally {
-            lock.unlock();
         }
     }
 
     @Override
     public ClientDelivery dequeueNoWait() {
-        lock.lock();
-        try {
+        synchronized (queue) {
             if (!isRunning()) {
                 return null;
             }
 
             return queue.pollFirst();
-        } finally {
-            lock.unlock();
         }
     }
 
     @Override
     public void start() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
-            lock.lock();
-            try {
-                condition.signalAll();
-            } finally {
-                lock.unlock();
+            synchronized (queue) {
+                queue.notifyAll();
             }
         }
     }
@@ -132,11 +111,8 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
     @Override
     public void stop() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
-            lock.lock();
-            try {
-                condition.signalAll();
-            } finally {
-                lock.unlock();
+            synchronized (queue) {
+                queue.notifyAll();
             }
         }
     }
@@ -144,11 +120,8 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
     @Override
     public void close() {
         if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
-            lock.lock();
-            try {
-                condition.signalAll();
-            } finally {
-                lock.unlock();
+            synchronized (queue) {
+                queue.notifyAll();
             }
         }
     }
@@ -165,41 +138,29 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
 
     @Override
     public boolean isEmpty() {
-        lock.lock();
-        try {
+        synchronized (queue) {
             return queue.isEmpty();
-        } finally {
-            lock.unlock();
         }
     }
 
     @Override
     public int size() {
-        lock.lock();
-        try {
+        synchronized (queue) {
             return queue.size();
-        } finally {
-            lock.unlock();
         }
     }
 
     @Override
     public void clear() {
-        lock.lock();
-        try {
+        synchronized (queue) {
             queue.clear();
-        } finally {
-            lock.unlock();
         }
     }
 
     @Override
     public String toString() {
-        lock.lock();
-        try {
+        synchronized (queue) {
             return queue.toString();
-        } finally {
-            lock.unlock();
         }
     }
 }
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java
index 1a695b2c..c41a16f5 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java
@@ -366,6 +366,7 @@ class ClientMessageTest {
         message.bodySections().forEach(section -> {
             assertTrue(section instanceof Data);
             final Data dataView = (Data) section;
+            assertEquals(counter.get(), dataView.getBuffer().getArray()[0]);
             assertEquals(counter.getAndIncrement(), dataView.getBinary().getArray()[0]);
         });
     }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java
index 5b274670..b0221db5 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java
@@ -39,7 +39,7 @@ import org.apache.qpid.protonj2.types.messaging.Data;
  */
 public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> {
 
-    private static final Data EMPTY_DATA = new Data((Binary) null);
+    private static final Data EMPTY_DATA = new Data((ProtonBuffer) null);
 
     @Override
     public Class<Data> getTypeClass() {
@@ -86,7 +86,7 @@ public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> {
         data.setWriteIndex(size);
         buffer.setReadIndex(position + size);
 
-        return new Data(new Binary(data));
+        return new Data(data);
     }
 
     @Override
@@ -130,7 +130,7 @@ public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> {
                 throw new DecodeException("Expected Binary type but found encoding: " + encodingCode);
         }
 
-        return new Data(new Binary(ProtonStreamUtils.readBytes(stream, size)));
+        return new Data(ProtonStreamUtils.readBytes(stream, size));
     }
 
     @Override
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java
index 912a6ef9..f80b5626 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java
@@ -16,8 +16,6 @@
  */
 package org.apache.qpid.protonj2.codec.encoders.messaging;
 
-import java.util.Map;
-
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.codec.EncoderState;
 import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class ApplicationPropertiesTypeEncoder extends AbstractDescribedMap
     }
 
     @Override
-    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, ApplicationProperties value) {
+    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, ApplicationProperties properties) {
         // Write the Map elements and then compute total size written.
-        for (Map.Entry<String, Object> entry : value.getValue().entrySet()) {
-            state.getEncoder().writeString(buffer, state, entry.getKey());
-            state.getEncoder().writeObject(buffer, state, entry.getValue());
-        }
+        properties.getValue().forEach((key, value) -> {
+            state.getEncoder().writeString(buffer, state, key);
+            state.getEncoder().writeObject(buffer, state, value);
+        });
     }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
index a6ec3284..8038fc6e 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
@@ -20,7 +20,6 @@ import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.codec.EncoderState;
 import org.apache.qpid.protonj2.codec.EncodingCodes;
 import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedTypeEncoder;
-import org.apache.qpid.protonj2.types.Binary;
 import org.apache.qpid.protonj2.types.Symbol;
 import org.apache.qpid.protonj2.types.UnsignedLong;
 import org.apache.qpid.protonj2.types.messaging.Data;
@@ -51,7 +50,7 @@ public final class DataTypeEncoder extends AbstractDescribedTypeEncoder<Data> {
         buffer.writeByte(EncodingCodes.SMALLULONG);
         buffer.writeByte(Data.DESCRIPTOR_CODE.byteValue());
 
-        state.getEncoder().writeBinary(buffer, state, value.getValue());
+        state.getEncoder().writeBinary(buffer, state, value.getBuffer());
     }
 
     @Override
@@ -85,9 +84,9 @@ public final class DataTypeEncoder extends AbstractDescribedTypeEncoder<Data> {
 
         buffer.writeByte(EncodingCodes.VBIN32);
         for (Object value : values) {
-            final Binary binary = ((Data) value).getBinary();
-            buffer.writeInt(binary.getLength());
-            buffer.writeBytes(binary.getArray(), binary.getArrayOffset(), binary.getLength());
+            final ProtonBuffer binary = ((Data) value).getBuffer();
+            buffer.writeInt(binary.getReadableBytes());
+            buffer.writeBytes(binary.getArray(), binary.getArrayOffset(), binary.getReadableBytes());
         }
     }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java
index 774f8e6a..769a3ca6 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java
@@ -16,8 +16,6 @@
  */
 package org.apache.qpid.protonj2.codec.encoders.messaging;
 
-import java.util.Map;
-
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.codec.EncoderState;
 import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class DeliveryAnnotationsTypeEncoder extends AbstractDescribedMapTy
     }
 
     @Override
-    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, DeliveryAnnotations value) {
+    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, DeliveryAnnotations annotations) {
         // Write the Map elements and then compute total size written.
-        for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) {
-            state.getEncoder().writeSymbol(buffer, state, entry.getKey());
-            state.getEncoder().writeObject(buffer, state, entry.getValue());
-        }
+        annotations.getValue().forEach((key, value) -> {
+            state.getEncoder().writeSymbol(buffer, state, key);
+            state.getEncoder().writeObject(buffer, state, value);
+        });
     }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java
index 5bb2fd29..c584e537 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java
@@ -16,8 +16,6 @@
  */
 package org.apache.qpid.protonj2.codec.encoders.messaging;
 
-import java.util.Map;
-
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.codec.EncoderState;
 import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class FooterTypeEncoder extends AbstractDescribedMapTypeEncoder<Obj
     }
 
     @Override
-    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, Footer value) {
+    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, Footer footers) {
         // Write the Map elements and then compute total size written.
-        for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) {
-            state.getEncoder().writeObject(buffer, state, entry.getKey());
-            state.getEncoder().writeObject(buffer, state, entry.getValue());
-        }
+        footers.getValue().forEach((key, value) -> {
+            state.getEncoder().writeObject(buffer, state, key);
+            state.getEncoder().writeObject(buffer, state, value);
+        });
     }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java
index b4233aa7..02b692f1 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java
@@ -16,8 +16,6 @@
  */
 package org.apache.qpid.protonj2.codec.encoders.messaging;
 
-import java.util.Map;
-
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.codec.EncoderState;
 import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder;
@@ -60,11 +58,11 @@ public final class MessageAnnotationsTypeEncoder extends AbstractDescribedMapTyp
     }
 
     @Override
-    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, MessageAnnotations value) {
+    public void writeMapEntries(ProtonBuffer buffer, EncoderState state, MessageAnnotations annotations) {
         // Write the Map elements and then compute total size written.
-        for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) {
-            state.getEncoder().writeSymbol(buffer, state, entry.getKey());
-            state.getEncoder().writeObject(buffer, state, entry.getValue());
-        }
+        annotations.getValue().forEach((key, value) -> {
+            state.getEncoder().writeSymbol(buffer, state, key);
+            state.getEncoder().writeObject(buffer, state, value);
+        });
     }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java
index 0f34276c..80360148 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java
@@ -17,8 +17,6 @@
 package org.apache.qpid.protonj2.codec.encoders.primitives;
 
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.codec.EncodeException;
@@ -62,25 +60,21 @@ public final class MapTypeEncoder extends AbstractPrimitiveTypeEncoder<Map> {
         buffer.writeInt(value.size() * 2);
 
         // Write the list elements and then compute total size written.
-        Set<Map.Entry> entries = value.entrySet();
-        for (Entry entry : entries) {
-            Object entryKey = entry.getKey();
-            Object entryValue = entry.getValue();
-
-            TypeEncoder keyEncoder = state.getEncoder().getTypeEncoder(entryKey);
+        value.forEach((key, entry) -> {
+            TypeEncoder keyEncoder = state.getEncoder().getTypeEncoder(key);
             if (keyEncoder == null) {
-                throw new EncodeException("Cannot find encoder for type " + entryKey);
+                throw new EncodeException("Cannot find encoder for type " + key);
             }
 
-            keyEncoder.writeType(buffer, state, entryKey);
+            keyEncoder.writeType(buffer, state, key);
 
-            TypeEncoder valueEncoder = state.getEncoder().getTypeEncoder(entryValue);
+            TypeEncoder valueEncoder = state.getEncoder().getTypeEncoder(entry);
             if (valueEncoder == null) {
-                throw new EncodeException("Cannot find encoder for type " + entryValue);
+                throw new EncodeException("Cannot find encoder for type " + entry);
             }
 
-            valueEncoder.writeType(buffer, state, entryValue);
-        }
+            valueEncoder.writeType(buffer, state, entry);
+        });
 
         // Move back and write the size
         int endIndex = buffer.getWriteIndex();
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
index ff872ba7..1856828d 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.protonj2.types.messaging;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
+import org.apache.qpid.protonj2.buffer.ProtonByteBufferAllocator;
 import org.apache.qpid.protonj2.types.Binary;
 import org.apache.qpid.protonj2.types.Symbol;
 import org.apache.qpid.protonj2.types.UnsignedLong;
@@ -26,30 +27,51 @@ public final class Data implements Section<byte[]> {
     public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000075L);
     public static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:data:binary");
 
-    private final Binary value;
+    private final ProtonBuffer buffer;
 
-    public Data(Binary value) {
-        this.value = value;
+    private Binary cachedBinary;
+
+    public Data(Binary binary) {
+        this.buffer = binary != null ? binary.asProtonBuffer() : null;
+        this.cachedBinary = binary;
     }
 
-    public Data(ProtonBuffer value) {
-        this.value = value != null ? new Binary(value) : null;
+    public Data(ProtonBuffer buffer) {
+        this.buffer = buffer;
     }
 
     public Data(byte[] value) {
-        this.value = value != null ? new Binary(value) : null;
+        this.buffer = value != null ? ProtonByteBufferAllocator.DEFAULT.wrap(value) : null;
     }
 
     public Data(byte[] value, int offset, int length) {
-        this.value = value != null ? new Binary(value, offset, length) : null;
+        this.buffer = value != null ? ProtonByteBufferAllocator.DEFAULT.wrap(value, offset, length) : null;
     }
 
     public Data copy() {
-        return new Data(value == null ? null : value.copy());
+        return new Data(buffer == null ? null : buffer.copy());
     }
 
     public Binary getBinary() {
-        return value;
+        if (cachedBinary != null || buffer == null) {
+            return cachedBinary;
+        } else {
+            return cachedBinary = new Binary(buffer);
+        }
+    }
+
+    /**
+     * Returns the {@link ProtonBuffer} that contains the bytes carried in the {@link Data} section.
+     * If the section carries no bytes then this method returns null.  This method allows the {@link Data}
+     * section to be considered a carrier of {@link ProtonBuffer} types instead of the {@link Binary}
+     * value it will encode as part of its body and avoids creation of a Binary object when one is not
+     * needed. If a Binary instance is required then calling the {@link #getBinary()} method will create
+     * an instance that wraps the internal {@link ProtonBuffer}.
+     *
+     * @return the {@link ProtonBuffer} that backs this Data section.
+     */
+    public ProtonBuffer getBuffer() {
+        return buffer;
     }
 
     /**
@@ -61,16 +83,42 @@ public final class Data implements Section<byte[]> {
      */
     @Override
     public byte[] getValue() {
-        if (value != null && value.hasArray() && value.getArrayOffset() == 0 && value.getLength() == value.getArray().length) {
-            return value.getArray();
+        if (buffer != null && buffer.hasArray() && buffer.getArrayOffset() == 0 && buffer.getReadableBytes() == buffer.getArray().length) {
+            return buffer.getArray();
         } else {
-            return value != null ? value.arrayCopy() : null;
+            byte[] dataCopy = null;
+            if (buffer != null) {
+                dataCopy = new byte[buffer.getReadableBytes()];
+                buffer.getBytes(buffer.getReadIndex(), dataCopy);
+            }
+
+            return dataCopy;
         }
     }
 
     @Override
     public String toString() {
-        return "Data{ " + value + " }";
+        if (buffer == null) {
+            return "";
+        }
+
+        StringBuilder str = new StringBuilder();
+
+        str.append("Data{ ");
+
+        for (int i = 0; i < buffer.getReadableBytes(); i++) {
+            byte c = buffer.getByte(i);
+
+            if (c > 31 && c < 127 && c != '\\') {
+                str.append((char) c);
+            } else {
+                str.append(String.format("\\x%02x", c));
+            }
+        }
+
+        str.append(" }");
+
+        return str.toString();
     }
 
     @Override
@@ -82,7 +130,7 @@ public final class Data implements Section<byte[]> {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((value == null) ? 0 : value.hashCode());
+        result = prime * result + ((buffer == null) ? 0 : buffer.hashCode());
         return result;
     }
 
@@ -99,14 +147,10 @@ public final class Data implements Section<byte[]> {
         }
 
         Data other = (Data) obj;
-        if (value == null) {
-            if (other.value != null) {
-                return false;
-            }
-        } else if (!value.equals(other.value)) {
-            return false;
+        if (buffer == null) {
+            return other.buffer == null;
         }
 
-        return true;
+        return buffer.equals(other.buffer);
     }
 }
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java
index 5e4eff69..669690ab 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java
@@ -34,7 +34,6 @@ import org.apache.qpid.protonj2.codec.Decoder;
 import org.apache.qpid.protonj2.codec.DecoderState;
 import org.apache.qpid.protonj2.codec.Encoder;
 import org.apache.qpid.protonj2.codec.EncoderState;
-import org.apache.qpid.protonj2.types.Binary;
 import org.apache.qpid.protonj2.types.Symbol;
 import org.apache.qpid.protonj2.types.UnsignedByte;
 import org.apache.qpid.protonj2.types.UnsignedInteger;
@@ -367,9 +366,9 @@ public class Benchmark implements Runnable {
     }
 
     private void benchmarkData() throws IOException {
-        Data data1 = new Data(new Binary(new byte[] {1, 2, 3}));
-        Data data2 = new Data(new Binary(new byte[] {4, 5, 6}));
-        Data data3 = new Data(new Binary(new byte[] {7, 8, 9}));
+        Data data1 = new Data(new byte[] {1, 2, 3});
+        Data data2 = new Data(new byte[] {4, 5, 6});
+        Data data3 = new Data(new byte[] {7, 8, 9});
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java
index fd878ee5..5efa51a9 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java
@@ -98,7 +98,7 @@ public class DataTypeCodecTest extends CodecTestSupport {
         ProtonBuffer buffer = ProtonByteBufferAllocator.DEFAULT.allocate();
         InputStream stream = new ProtonBufferInputStream(buffer);
 
-        Data data = new Data(new Binary(new byte[] { 1, 2, 3}));
+        Data data = new Data(new byte[] { 1, 2, 3});
 
         for (int i = 0; i < size; ++i) {
             encoder.writeObject(buffer, encoderState, data);
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java
index 4e1f2b7b..d57111a1 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java
@@ -42,6 +42,11 @@ public class DataTest {
         assertNull(new Data((byte[]) null).getValue());
     }
 
+    @Test
+    public void testCopyFromEmptyProtonBuffer() {
+        assertNull(new Data((ProtonBuffer) null).copy().getBinary());
+    }
+
     @Test
     public void testCopyFromEmpty() {
         assertNull(new Data((Binary) null).copy().getBinary());
@@ -78,6 +83,25 @@ public class DataTest {
         assertEquals(new Data((Binary) null).hashCode(), new Data((ProtonBuffer) null).hashCode());
     }
 
+    @Test
+    public void testHashCodeWithProtonBuffer() {
+        byte[] bytes = new byte[] { 1 };
+        Data data = new Data(bytes);
+        Data copy = data.copy();
+
+        assertNotNull(copy.getValue());
+        assertNotSame(data.getValue(), copy.getValue());
+
+        assertEquals(data.hashCode(), copy.hashCode());
+
+        Data second = new Data(new byte[] { 1, 2, 3 });
+
+        assertNotEquals(data.hashCode(), second.hashCode());
+
+        assertNotEquals(new Data((ProtonBuffer) null).hashCode(), data.hashCode());
+        assertEquals(new Data((ProtonBuffer) null).hashCode(), new Data((ProtonBuffer) null).hashCode());
+    }
+
     @Test
     public void testEquals() {
         byte[] bytes = new byte[] { 1 };
@@ -107,6 +131,34 @@ public class DataTest {
         assertEquals(new Data((Binary) null), new Data((ProtonBuffer) null));
     }
 
+    @Test
+    public void testEqualsWithoutBinary() {
+        byte[] bytes = new byte[] { 1 };
+        Data data = new Data(bytes);
+        Data copy = data.copy();
+
+        assertNotNull(copy.getValue());
+        assertNotSame(data.getValue(), copy.getValue());
+
+        assertEquals(data, data);
+        assertEquals(data, copy);
+
+        Data second = new Data(ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 1, 2, 3 }));
+        Data third = new Data(new byte[] { 1, 2, 3 }, 0, 3);
+        Data fourth = new Data(new byte[] { 1, 2, 3 }, 0, 1);
+        Data fifth = new Data(null, 0, 0);
+
+        assertNotEquals(data, second);
+        assertNotEquals(data, third);
+        assertNotEquals(data, fifth);
+        assertEquals(data, fourth);
+        assertFalse(data.equals(null));
+        assertNotEquals(data, "not a data");
+        assertNotEquals(data, new Data((byte[]) null));
+        assertNotEquals(new Data((ProtonBuffer) null), data);
+        assertEquals(new Data((byte[]) null), new Data((ProtonBuffer) null));
+    }
+
     @Test
     public void testGetValueWhenUsingAnArrayView() {
         Data view = new Data(new byte[] { 1, 2, 3 }, 0, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org