You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/18 15:21:36 UTC
[02/10] qpid-broker-j git commit: QPID-7832: [Java Broker] Refactor
store/protocol API using Collection<QpidByteBuffer>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 9a48b84..6d9c190 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -23,28 +23,26 @@ package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.URISyntaxException;
-import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.converter.MessageConversionException;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
-import org.apache.qpid.server.protocol.v0_8.AMQShortString;
-import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
-import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.url.AMQBindingURL;
@PluggableService
@@ -93,7 +91,7 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
}
@Override
- public Collection<QpidByteBuffer> getContent(final int offset, final int length)
+ public QpidByteBuffer getContent(final int offset, final int length)
{
return message_0_8.getContent(offset, length);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
index e46d732..645d102 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
@@ -578,8 +578,12 @@ public class PropertyConverter_0_10_to_0_8Test extends QpidTestCase
if (content != null)
{
when(storedMessage.getContentSize()).thenReturn(content.length);
- when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap(
- content)));
+ when(storedMessage.getContent(0, content.length)).thenReturn(QpidByteBuffer.wrap(content));
+ }
+ else
+ {
+ when(storedMessage.getContentSize()).thenReturn(0);
+ when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
}
return new MessageTransferMessage(storedMessage, null);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
index 8bb4a47..76bc200 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
@@ -24,7 +24,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -472,8 +471,12 @@ public class PropertyConverter_0_8_to_0_10Test extends QpidTestCase
if (content != null)
{
when(storedMessage.getContentSize()).thenReturn(content.length);
- when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap(
- content)));
+ when(storedMessage.getContent(0, content.length)).thenReturn(QpidByteBuffer.wrap(content));
+ }
+ else
+ {
+ when(storedMessage.getContentSize()).thenReturn(0);
+ when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
}
return new AMQMessage(storedMessage);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index dcdd430..e8a02fd 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -32,8 +32,6 @@ import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.get
import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getTtl;
import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getUserId;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -117,9 +115,9 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
@Override
- public Collection<QpidByteBuffer> getContent(final int offset, final int length)
+ public QpidByteBuffer getContent(final int offset, final int length)
{
- return Collections.singleton(QpidByteBuffer.wrap(convertedContent, offset, length));
+ return QpidByteBuffer.wrap(convertedContent, offset, length);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
index df1a786..4fa5734 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
@@ -30,8 +30,6 @@ import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -39,8 +37,6 @@ import java.util.UUID;
import com.google.common.collect.Lists;
import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -340,12 +336,12 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
return expected;
}
- private List<EncodingRetainingSection<?>> getEncodingRetainingSections(final Collection<QpidByteBuffer> content,
+ private List<EncodingRetainingSection<?>> getEncodingRetainingSections(final QpidByteBuffer content,
final int expectedNumberOfSections)
throws Exception
{
SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
- final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(content));
+ final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(content);
assertEquals("Unexpected number of sections", expectedNumberOfSections, sections.size());
return sections;
}
@@ -376,15 +372,8 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
when(_handle.getContent(offsetCaptor.capture(),
- sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
- {
- @Override
- public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
- {
- final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
- return Collections.singleton(view);
- }
- });
+ sizeCaptor.capture())).then(invocation -> combined.view(offsetCaptor.getValue(),
+ sizeCaptor.getValue()));
}
private Byte getJmsMessageTypeAnnotation(final Message_1_0 convertedMessage)
@@ -466,7 +455,7 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
{
final AMQMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
EncodingRetainingSection<?> encodingRetainingSection = sections.get(0);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
index c9d17cc..cc2ada2 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
@@ -27,9 +27,9 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -67,7 +67,6 @@ import org.apache.qpid.server.typedmessage.mimecontentconverter.JmsMapMessageToM
import org.apache.qpid.server.typedmessage.mimecontentconverter.JmsStreamMessageToList;
import org.apache.qpid.server.typedmessage.mimecontentconverter.ListToJmsStreamMessage;
import org.apache.qpid.server.typedmessage.mimecontentconverter.MapToJmsMapMessage;
-import org.apache.qpid.server.util.ByteBufferUtils;
import org.apache.qpid.test.utils.QpidTestCase;
public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
@@ -127,7 +126,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected mime type",
"application/java-object-stream",
@@ -143,7 +142,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
assertArrayEquals("Unexpected content size",
@@ -235,7 +234,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
@@ -265,7 +264,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
}
@@ -279,7 +278,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
}
@@ -295,7 +294,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
Map<String, Object> convertedMap = new JmsMapMessageToMap().toObject(getBytes(content));
assertEquals("Unexpected size", originalMap.size(), convertedMap.size());
@@ -316,7 +315,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "amqp/map", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
Map<String, Object> convertedMap = new AmqpMapToMapConverter().toObject(getBytes(content));
assertEquals("Unexpected size", originalMap.size(), convertedMap.size());
@@ -352,7 +351,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
List<Object> convertedList = new JmsStreamMessageToList().toObject(getBytes(content));
assertEquals("Unexpected size", originalList.size(), convertedList.size());
@@ -371,7 +370,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
List<Object> convertedList = new AmqpListToListConverter().toObject(getBytes(content));
assertEquals("Unexpected size", originalList.size(), convertedList.size());
@@ -424,7 +423,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected content", expected, new JmsStreamMessageToList().toObject(getBytes(content)));
}
@@ -437,7 +436,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
List<Object> convertedList = new AmqpListToListConverter().toObject(getBytes(content));
assertEquals("Unexpected size", originalList.size(), convertedList.size());
@@ -497,7 +496,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", data, getBytes(content));
}
@@ -517,7 +516,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", data, getBytes(content));
}
@@ -550,7 +549,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", data, getBytes(content));
}
@@ -570,7 +569,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", data, getBytes(content));
}
@@ -640,7 +639,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "amqp/map", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", bytes, getBytes(content));
}
@@ -657,7 +656,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", bytes, getBytes(content));
}
@@ -674,7 +673,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", bytes, getBytes(content));
}
@@ -691,7 +690,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", bytes, getBytes(content));
}
@@ -708,7 +707,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
"application/java-object-stream",
convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", expected, getBytes(content));
}
@@ -726,7 +725,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
"application/java-object-stream",
convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", expected, getBytes(content));
}
@@ -743,7 +742,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
"application/octet-stream",
convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", expected, getBytes(content));
}
@@ -758,7 +757,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
"application/octet-stream",
convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertArrayEquals("Unexpected content", expected, getBytes(content));
}
@@ -778,7 +777,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected mime type",
"application/java-object-stream",
@@ -792,7 +791,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
assertArrayEquals("Unexpected content size",
@@ -891,7 +890,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
@@ -926,21 +925,21 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
}
- private byte[] getBytes(final Collection<QpidByteBuffer> content) throws Exception
+ private byte[] getBytes(final QpidByteBuffer content) throws Exception
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- for (QpidByteBuffer buf : content)
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ InputStream contentInputStream = content.asInputStream())
{
- ByteStreams.copy(buf.asInputStream(), bos);
- buf.dispose();
+ ByteStreams.copy(contentInputStream, bos);
+ content.dispose();
+ return bos.toByteArray();
}
- return bos.toByteArray();
}
private Message_1_0 createTestMessage(final EncodingRetainingSection encodingRetainingSection)
@@ -1005,19 +1004,19 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
if (section != null)
{
- final QpidByteBuffer combined = QpidByteBuffer.wrap(ByteBufferUtils.combine(section.getEncodedForm()));
+ // TODO this seems to leak QBBs
+ final QpidByteBuffer combined = section.getEncodedForm();
when(storedMessage.getContentSize()).thenReturn((int) section.getEncodedSize());
final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
when(storedMessage.getContent(offsetCaptor.capture(),
- sizeCaptor.capture())).then(invocation ->
- {
- final QpidByteBuffer view = combined.view(
- offsetCaptor.getValue(),
- sizeCaptor.getValue());
- return Collections.singleton(view);
- });
+ sizeCaptor.capture())).then(invocation -> combined.view(offsetCaptor.getValue(),
+ sizeCaptor.getValue()));
+ }
+ else
+ {
+ when(storedMessage.getContent(0,0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
}
return new Message_1_0(storedMessage);
}
@@ -1045,7 +1044,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
assertEquals("Unexpected mime type",
expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
- final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+ final QpidByteBuffer content = convertedMessage.getContent();
assertArrayEquals("Unexpected content", data, getBytes(content));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
index 0520baf..f9443e0 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
@@ -472,8 +472,12 @@ public class PropertyConverter_0_8_to_1_0Test extends QpidTestCase
if (content != null)
{
when(storedMessage.getContentSize()).thenReturn(content.length);
- when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap(
- content)));
+ when(storedMessage.getContent(0, content.length)).thenReturn(QpidByteBuffer.wrap(content));
+ }
+ else
+ {
+ when(storedMessage.getContentSize()).thenReturn(0);
+ when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
}
return new AMQMessage(storedMessage);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
index 059e268..6f7e75c 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
@@ -33,13 +33,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
@@ -922,6 +921,8 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase
final long arrivalTime)
{
final StoredMessage<MessageMetaData_1_0> storedMessage = mock(StoredMessage.class);
+ when(storedMessage.getContentSize()).thenReturn(0);
+ when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
MessageMetaData_1_0 metaData = new MessageMetaData_1_0(header.createEncodingRetainingSection(),
deliveryAnnotations.createEncodingRetainingSection(),
messageAnnotations.createEncodingRetainingSection(),
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
index e417f9a..d5a4de6 100644
--- a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
+++ b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store.derby;
+import java.io.InputStream;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -137,9 +138,9 @@ public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore
}
@Override
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ protected InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
{
- return DerbyUtils.getBlobAsBytes(rs, col);
+ return DerbyUtils.getBlobAsInputStream(rs, col);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
----------------------------------------------------------------------
diff --git a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
index 9af478d..62c9889 100644
--- a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
+++ b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.derby;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
@@ -151,10 +152,10 @@ public class DerbyUtils
return new String(bytes, StandardCharsets.UTF_8);
}
- protected static byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ protected static InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
{
Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+ return dataAsBlob.getBinaryStream();
}
public static boolean tableExists(final String tableName, final Connection conn) throws SQLException
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 099029c..9c6cfa9 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.store.jdbc;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -31,7 +30,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -831,11 +829,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
final int bodySize = 1 + metaData.getStorableSize();
byte[] underlying = new byte[bodySize];
underlying[0] = (byte) metaData.getType().ordinal();
- QpidByteBuffer buf = QpidByteBuffer.wrap(underlying);
- buf.position(1);
- buf = buf.slice();
-
- metaData.writeToBuffer(buf);
+ try (QpidByteBuffer buf = QpidByteBuffer.wrap(underlying))
+ {
+ buf.position(1);
+ try (QpidByteBuffer bufSlice = buf.slice())
+ {
+ metaData.writeToBuffer(buf);
+ }
+ }
try(ByteArrayInputStream bis = new ByteArrayInputStream(underlying))
{
stmt.setBinaryStream(2, bis, underlying.length);
@@ -938,7 +939,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if (rs.next())
{
- return getStorableMessageMetaData(messageId, getBlobAsBytes(rs, 1));
+ try (InputStream blobAsInputStream = getBlobAsInputStream(rs, 1))
+ {
+ return getStorableMessageMetaData(messageId, blobAsInputStream);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error reading meta data from the store for message with id " + messageId, e);
+ }
}
else
{
@@ -949,20 +957,18 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- private StorableMessageMetaData getStorableMessageMetaData(final long messageId, final byte[] blobAsBytes)
+ private StorableMessageMetaData getStorableMessageMetaData(final long messageId, final InputStream stream)
throws SQLException
{
- try(InputStream stream = new ByteArrayInputStream(blobAsBytes))
+ try
{
int typeOrdinal = stream.read() & 0xff;
MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(typeOrdinal);
- List<QpidByteBuffer> bufs = QpidByteBuffer.asQpidByteBuffers(stream);
- StorableMessageMetaData metaData = type.createMetaData(bufs);
- for (final QpidByteBuffer buf : bufs)
+
+ try (QpidByteBuffer buf = QpidByteBuffer.asQpidByteBuffer(stream))
{
- buf.dispose();
+ return type.createMetaData(buf);
}
- return metaData;
}
catch (IOException | RuntimeException e)
{
@@ -970,41 +976,30 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
+ protected abstract InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException;
private void addContent(final Connection conn, long messageId,
- Collection<QpidByteBuffer> contentBody)
+ QpidByteBuffer contentBody)
{
getLogger().debug("Adding content for message {}", messageId);
- int size = 0;
-
- for(QpidByteBuffer buf : contentBody)
- {
- size += buf.remaining();
- }
- byte[] data = new byte[size];
- ByteBuffer dst = ByteBuffer.wrap(data);
- for(QpidByteBuffer buf : contentBody)
- {
- buf.copyTo(dst);
- }
-
- try(PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getMessageContentTableName()
- + "( message_id, content ) values (?, ?)"))
+ try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getMessageContentTableName()
+ + "( message_id, content ) values (?, ?)");
+ QpidByteBuffer bodyDuplicate = contentBody.duplicate();
+ InputStream inputStream = bodyDuplicate.asInputStream())
{
stmt.setLong(1, messageId);
- stmt.setBinaryStream(2, new ByteArrayInputStream(data), data.length);
+ stmt.setBinaryStream(2, inputStream, contentBody.remaining());
stmt.executeUpdate();
}
- catch (SQLException e)
+ catch (SQLException | IOException e)
{
JdbcUtils.closeConnection(conn, getLogger());
throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
}
}
- Collection<QpidByteBuffer> getAllContent(long messageId) throws StoreException
+ QpidByteBuffer getAllContent(long messageId) throws StoreException
{
getLogger().debug("Message Id: {} Getting content body", messageId);
@@ -1017,18 +1012,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if (rs.next())
{
- byte[] data = getBlobAsBytes(rs, 1);
- int offset = 0;
- int length = data.length;
- Collection<QpidByteBuffer> buffers = QpidByteBuffer.allocateDirectCollection(length);
- for(QpidByteBuffer buf : buffers)
+ try (InputStream blobAsInputStream = getBlobAsInputStream(rs, 1))
{
- int bufSize = buf.remaining();
- buf.put(data, offset, bufSize);
- buf.flip();
- offset+=bufSize;
+ return QpidByteBuffer.asQpidByteBuffer(blobAsInputStream);
}
- return buffers;
}
else
{
@@ -1036,7 +1023,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- catch (SQLException e)
+ catch (SQLException | IOException e)
{
throw new StoreException("Error retrieving content for message " + messageId + ": " + e.getMessage(), e);
}
@@ -1262,7 +1249,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private static class MessageDataRef<T extends StorableMessageMetaData>
{
private volatile T _metaData;
- private volatile Collection<QpidByteBuffer> _data;
+ private volatile QpidByteBuffer _data;
private volatile boolean _isHardRef;
private MessageDataRef(final T metaData, boolean isHardRef)
@@ -1270,7 +1257,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
this(metaData, null, isHardRef);
}
- private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef)
+ private MessageDataRef(final T metaData, QpidByteBuffer data, boolean isHardRef)
{
_metaData = metaData;
_data = data;
@@ -1282,12 +1269,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
return _metaData;
}
- public Collection<QpidByteBuffer> getData()
+ public QpidByteBuffer getData()
{
return _data;
}
- public void setData(final Collection<QpidByteBuffer> data)
+ public void setData(final QpidByteBuffer data)
{
_data = data;
}
@@ -1322,11 +1309,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
if(_data != null)
{
- for(QpidByteBuffer buf : _data)
- {
- bytesCleared += buf.remaining();
- buf.dispose();
- }
+ bytesCleared += _data.remaining();
+ _data.dispose();
_data = null;
}
return bytesCleared;
@@ -1405,20 +1389,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public synchronized void addContent(QpidByteBuffer src)
{
- src = src.slice();
- Collection<QpidByteBuffer> data = _messageDataRef.getData();
- if(data == null)
+ try(QpidByteBuffer data = _messageDataRef.getData())
{
- _messageDataRef.setData(Collections.singleton(src));
- }
- else
- {
- List<QpidByteBuffer> newCollection = new ArrayList<>(data.size()+1);
- newCollection.addAll(data);
- newCollection.add(src);
- _messageDataRef.setData(Collections.unmodifiableCollection(newCollection));
+ if(data == null)
+ {
+ _messageDataRef.setData(src.slice());
+ }
+ else
+ {
+ _messageDataRef.setData(QpidByteBuffer.concatenate(Arrays.asList(data, src)));
+ }
}
-
}
@Override
@@ -1429,11 +1410,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
/**
- * returns QBBs containing the content. The caller must not dispose of them because we keep a reference in _messageDataRef.
+ * Returns a QBB containing the content. The caller must not dispose of it because we keep a reference in _messageDataRef.
*/
- private Collection<QpidByteBuffer> getContentAsByteBuffer()
+ private QpidByteBuffer getContentAsByteBuffer()
{
- Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
+ QpidByteBuffer data = _messageDataRef == null ? QpidByteBuffer.emptyQpidByteBuffer() : _messageDataRef.getData();
if(data == null)
{
if(stored())
@@ -1445,57 +1426,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
else
{
- data = Collections.emptyList();
+ data = QpidByteBuffer.emptyQpidByteBuffer();
}
}
return data;
}
@Override
- public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
+ public synchronized QpidByteBuffer getContent(int offset, int length)
{
- Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
- Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
-
- int pos = 0;
- for (QpidByteBuffer buf : bufs)
- {
- if (length > 0)
- {
- int bufRemaining = buf.remaining();
- if (pos + bufRemaining <= offset)
- {
- pos += bufRemaining;
- }
- else if (pos >= offset)
- {
- buf = buf.duplicate();
- if (bufRemaining <= length)
- {
- length -= bufRemaining;
- }
- else
- {
- buf.limit(length);
- length = 0;
- }
- content.add(buf);
- pos += buf.remaining();
-
- }
- else
- {
- int offsetInBuf = offset - pos;
- int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
- final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
- content.add(bufView);
- length -= limit;
- pos+=limit+offsetInBuf;
- }
- }
-
- }
- return content;
+ return getContentAsByteBuffer().view(offset, length);
}
@Override
@@ -1517,7 +1457,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_messageDataRef.getData() == null
- ? Collections.<QpidByteBuffer>emptySet()
+ ? QpidByteBuffer.emptyQpidByteBuffer()
: _messageDataRef.getData());
getLogger().debug("Storing message {} to store", _messageId);
@@ -1569,14 +1509,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
metaData.dispose();
}
- Collection<QpidByteBuffer> data = _messageDataRef.getData();
- if(data != null)
+ try (QpidByteBuffer data = _messageDataRef.getData())
{
- bytesCleared += getContentSize();
- _messageDataRef.setData(null);
- for(QpidByteBuffer buf : data)
+ if (data != null)
{
- buf.dispose();
+ bytesCleared += getContentSize();
+ _messageDataRef.setData(null);
}
}
_messageDataRef = null;
@@ -1670,9 +1608,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
if (rs.next())
{
- byte[] dataAsBytes = getBlobAsBytes(rs, 2);
- StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsBytes);
- message = createStoredJDBCMessage(messageId, metaData, true);
+ try (InputStream blobAsInputStream = getBlobAsInputStream(rs, 2))
+ {
+ final StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, blobAsInputStream);
+ message = createStoredJDBCMessage(messageId, metaData, true);
+ }
}
else
{
@@ -1682,7 +1622,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
return message;
}
- catch (SQLException e)
+ catch (SQLException | IOException e)
{
throw new StoreException("Error encountered when visiting messages", e);
}
@@ -1710,18 +1650,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
while (rs.next())
{
long messageId = rs.getLong(1);
- byte[] dataAsBytes = getBlobAsBytes(rs, 2);
- StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsBytes);
- StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
- if (!handler.handle(message))
+ try (InputStream dataAsInputStream = getBlobAsInputStream(rs, 2))
{
- break;
+ StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsInputStream);
+ StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
+ if (!handler.handle(message))
+ {
+ break;
+ }
}
}
}
}
}
- catch (SQLException e)
+ catch (SQLException | IOException e)
{
throw new StoreException("Error encountered when visiting messages", e);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
index 3b77b10..efbedb3 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
@@ -22,7 +22,9 @@ package org.apache.qpid.server.store.jdbc;
import static org.apache.qpid.server.store.jdbc.AbstractJDBCConfigurationStore.State.CLOSED;
import static org.apache.qpid.server.store.jdbc.AbstractJDBCConfigurationStore.State.CONFIGURED;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.InputStream;
import java.nio.charset.Charset;
import java.sql.Blob;
import java.sql.Connection;
@@ -204,17 +206,16 @@ public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStor
}
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ protected InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
{
if(_useBytesMethodsForBlob)
{
- return rs.getBytes(col);
+ return new ByteArrayInputStream(rs.getBytes(col));
}
else
{
Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-
+ return dataAsBlob.getBinaryStream();
}
}
@@ -298,9 +299,9 @@ public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStor
}
@Override
- protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException
+ protected InputStream getBlobAsInputStream(final ResultSet rs, final int col) throws SQLException
{
- return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col);
+ return GenericJDBCConfigurationStore.this.getBlobAsInputStream(rs, col);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
index 1b3c827..551362d 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
@@ -21,7 +21,9 @@
package org.apache.qpid.server.store.jdbc;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.InputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -156,17 +158,16 @@ public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore
}
@Override
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ protected InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
{
if(_useBytesMethodsForBlob)
{
- return rs.getBytes(col);
+ return new ByteArrayInputStream(rs.getBytes(col));
}
else
{
Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-
+ return dataAsBlob.getBinaryStream();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index f81648b..5dbbe67 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -34,9 +34,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -856,69 +856,18 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
private static class QBBTrackingThreadPool extends QueuedThreadPool
{
- private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+ private final ThreadFactory _threadFactory;
public QBBTrackingThreadPool(@Name("maxThreads") final int maxThreads, @Name("minThreads") final int minThreads)
{
super(maxThreads, minThreads);
- }
-
- @Override
- protected void doStop() throws Exception
- {
- try
- {
- super.doStop();
- }
- finally
- {
- for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
- {
- qpidByteBuffer.dispose();
- }
- _cachedBufferMap.clear();
- }
+ _threadFactory = QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(r -> QBBTrackingThreadPool.super.newThread(r));
}
@Override
protected Thread newThread(final Runnable runnable)
{
- return super.newThread(() ->
- {
- try
- {
- runnable.run();
- }
- finally
- {
- QpidByteBuffer qbb = _cachedBufferMap.remove(Thread.currentThread());
- if (qbb != null)
- {
- qbb.dispose();
- }
- }
- });
- }
-
- @Override
- protected void runJob(final Runnable job)
- {
- try
- {
- super.runJob(job);
- }
- finally
- {
- final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
- if (cachedThreadLocalBuffer != null)
- {
- _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
- }
- else
- {
- _cachedBufferMap.remove(Thread.currentThread());
- }
- }
+ return _threadFactory.newThread(runnable);
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
index 3242f01..a33a3d6 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
-import org.apache.qpid.server.util.ByteBufferUtils;
public class ReportRunner<T>
{
@@ -216,11 +215,11 @@ public class ReportRunner<T>
{
final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
ServerMessage message = entry.getMessage();
- final Collection<QpidByteBuffer> contentBuffers = message.getContent(0, (int) message.getSize());
- final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
- for(QpidByteBuffer buf : contentBuffers)
+ byte[] content;
+ try (QpidByteBuffer contentBuffer = message.getContent())
{
- buf.dispose();
+ content = new byte[contentBuffer.remaining()];
+ contentBuffer.get(content);
}
return new ReportableMessage()
@@ -240,7 +239,7 @@ public class ReportRunner<T>
@Override
public ByteBuffer getContent()
{
- return content.asReadOnlyBuffer();
+ return ByteBuffer.wrap(content).asReadOnlyBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
index a1c3f34..7c85f39 100644
--- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
+++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
@@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
@@ -67,6 +68,7 @@ public class ReportRunnerTest extends QpidTestCase
final ServerMessage message = mock(ServerMessage.class);
final AMQMessageHeader header = mock(AMQMessageHeader.class);
when(message.getMessageHeader()).thenReturn(header);
+ when(message.getContent()).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
return message;
}
@@ -158,6 +160,7 @@ public class ReportRunnerTest extends QpidTestCase
}
});
when(header.getHeaderNames()).thenReturn(props.keySet());
+ when(message.getContent()).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
return message;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index 76b3fbd..8394ca8 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -30,11 +30,10 @@ import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
@@ -130,68 +129,7 @@ class WebSocketProvider implements AcceptingTransport
{
_idleTimeoutChecker.start();
- _server = new Server(new QueuedThreadPool()
- {
- private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
-
- @Override
- protected void doStop() throws Exception
- {
- try
- {
- super.doStop();
- }
- finally
- {
- for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
- {
- qpidByteBuffer.dispose();
- }
- _cachedBufferMap.clear();
- }
- }
-
- @Override
- protected Thread newThread(final Runnable runnable)
- {
- return super.newThread(() ->
- {
- try
- {
- runnable.run();
- }
- finally
- {
- QpidByteBuffer qbb = _cachedBufferMap.remove(Thread.currentThread());
- if (qbb != null)
- {
- qbb.dispose();
- }
- }
- });
- }
-
- @Override
- protected void runJob(final Runnable job)
- {
- try
- {
- super.runJob(job);
- }
- finally
- {
- final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
- if (cachedThreadLocalBuffer != null)
- {
- _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
- }
- else
- {
- _cachedBufferMap.remove(Thread.currentThread());
- }
- }
- }
- });
+ _server = new Server(new QBBTrackingThreadPool());
final ServerConnector connector;
HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory();
@@ -357,6 +295,17 @@ class WebSocketProvider implements AcceptingTransport
((ServerConnector) server.getConnectors()[0]).getLocalPort();
}
+ private static class QBBTrackingThreadPool extends QueuedThreadPool
+ {
+ private final ThreadFactory _threadFactory = QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(r -> QBBTrackingThreadPool.super.newThread(r));
+
+ @Override
+ protected Thread newThread(final Runnable runnable)
+ {
+ return _threadFactory.newThread(runnable);
+ }
+ }
+
@WebSocket
public class AmqpWebSocket
{
@@ -442,13 +391,14 @@ class WebSocketProvider implements AcceptingTransport
private void restoreApplicationBufferForWrite()
{
- QpidByteBuffer oldNetInputBuffer = _netInputBuffer;
- int unprocessedDataLength = _netInputBuffer.remaining();
+ try (QpidByteBuffer oldNetInputBuffer = _netInputBuffer)
+ {
+ int unprocessedDataLength = _netInputBuffer.remaining();
- _netInputBuffer.limit(_netInputBuffer.capacity());
- _netInputBuffer = oldNetInputBuffer.slice();
- _netInputBuffer.limit(unprocessedDataLength);
- oldNetInputBuffer.dispose();
+ _netInputBuffer.limit(_netInputBuffer.capacity());
+ _netInputBuffer = oldNetInputBuffer.slice();
+ _netInputBuffer.limit(unprocessedDataLength);
+ }
if (_netInputBuffer.limit() != _netInputBuffer.capacity())
{
_netInputBuffer.position(_netInputBuffer.limit());
@@ -456,22 +406,23 @@ class WebSocketProvider implements AcceptingTransport
}
else
{
- QpidByteBuffer currentBuffer = _netInputBuffer;
- int newBufSize;
-
- if (currentBuffer.capacity() < _broker.getNetworkBufferSize())
+ try (QpidByteBuffer currentBuffer = _netInputBuffer)
{
- newBufSize = _broker.getNetworkBufferSize();
- }
- else
- {
- newBufSize = currentBuffer.capacity() + _broker.getNetworkBufferSize();
- reportUnexpectedByteBufferSizeUsage();
- }
+ int newBufSize;
- _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize);
- _netInputBuffer.put(currentBuffer);
- currentBuffer.dispose();
+ if (currentBuffer.capacity() < _broker.getNetworkBufferSize())
+ {
+ newBufSize = _broker.getNetworkBufferSize();
+ }
+ else
+ {
+ newBufSize = currentBuffer.capacity() + _broker.getNetworkBufferSize();
+ reportUnexpectedByteBufferSizeUsage();
+ }
+
+ _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize);
+ _netInputBuffer.put(currentBuffer);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4cb0ccb..4d53751 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import java.net.InetSocketAddress;
-import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -34,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -158,11 +158,32 @@ public class FrameTransport implements AutoCloseable
{
Preconditions.checkState(_channel != null, "Not connected");
ChannelPromise promise = _channel.newPromise();
- final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null;
- TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
- _channel.write(transportFrame, promise);
- _channel.flush();
- return JdkFutureAdapters.listenInPoolThread(promise);
+ final TransportFrame transportFrame;
+ try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
+ {
+ final QpidByteBuffer duplicate;
+ if (payload == null)
+ {
+ duplicate = null;
+ }
+ else
+ {
+ duplicate = payload.duplicate();
+ }
+ transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+ _channel.write(transportFrame, promise);
+ _channel.flush();
+ final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
+ if (frameBody instanceof Transfer)
+ {
+ listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+ }
+ if (duplicate != null)
+ {
+ listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
+ }
+ return listenableFuture;
+ }
}
public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 0b91273..52518ab 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -761,7 +761,7 @@ public class Interaction
return this;
}
- public Interaction transferPayload(final List<QpidByteBuffer> payload)
+ public Interaction transferPayload(final QpidByteBuffer payload)
{
_transfer.setPayload(payload);
return this;
@@ -777,12 +777,10 @@ public class Interaction
{
AmqpValue amqpValue = new AmqpValue(payload);
final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
- final List<QpidByteBuffer> encodedForm = section.getEncodedForm();
- transfer.setPayload(encodedForm);
- section.dispose();
- for (QpidByteBuffer qbb : encodedForm)
+ try (QpidByteBuffer encodedForm = section.getEncodedForm())
{
- qbb.dispose();
+ transfer.setPayload(encodedForm);
+ section.dispose();
}
}
@@ -806,15 +804,12 @@ public class Interaction
transferCopy.setResume(transfer.getResume());
transferCopy.setAborted(transfer.getAborted());
transferCopy.setBatchable(transfer.getBatchable());
- final List<QpidByteBuffer> payload = transfer.getPayload();
- if (payload != null)
+ try (QpidByteBuffer payload = transfer.getPayload())
{
- final List<QpidByteBuffer> payloadCopy = new ArrayList<>(payload.size());
- for (QpidByteBuffer qpidByteBuffer : payload)
+ if (payload != null)
{
- payloadCopy.add(qpidByteBuffer.duplicate());
+ transferCopy.setPayload(payload);
}
- transferCopy.setPayload(payloadCopy);
}
return transferCopy;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
index 8656b9b..1a513f7 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
@@ -72,14 +71,20 @@ public class MessageDecoder
{
throw new IllegalStateException("The section fragments have already been parsed");
}
- _fragments.addAll(transfer.getPayload());
+ _fragments.add(transfer.getPayload());
}
public void parse() throws AmqpErrorException
{
if (!_parsed)
{
- List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(_fragments);
+ List<EncodingRetainingSection<?>> sections;
+ try (QpidByteBuffer combined = QpidByteBuffer.concatenate(_fragments))
+ {
+ sections = _sectionDecoder.parseAll(combined);
+ }
+ _fragments.forEach(QpidByteBuffer::dispose);
+
Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
if (s instanceof HeaderSection)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
index 0c77ab1..37e327e 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
@@ -44,12 +44,12 @@ public class MessageEncoder
_header = header;
}
- public List<QpidByteBuffer> getPayload()
+ public QpidByteBuffer getPayload()
{
List<QpidByteBuffer> payload = new ArrayList<>();
if (_header != null)
{
- payload.addAll(_header.createEncodingRetainingSection().getEncodedForm());
+ payload.add(_header.createEncodingRetainingSection().getEncodedForm());
}
if (_data.isEmpty())
@@ -70,10 +70,12 @@ public class MessageEncoder
for (EncodingRetainingSection<?> section: dataSections)
{
- payload.addAll(section.getEncodedForm());
+ payload.add(section.getEncodedForm());
section.dispose();
}
- return payload;
+ QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+ payload.forEach(QpidByteBuffer::dispose);
+ return combined;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
index dbd4cd6..68f4322 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
@@ -59,8 +59,10 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
@Override
public void send(final QpidByteBuffer msg)
{
+ byte[] data = new byte[msg.remaining()];
+ msg.get(data);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(msg.asByteBuffer());
+ buffer.writeBytes(data);
try
{
OutputHandler.super.write(ctx, buffer, promise);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org