You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/18 15:21:44 UTC
[10/10] qpid-broker-j git commit: QPID-7832: [Java Broker] Refactor
store/protocol API using Collection<QpidByteBuffer>
QPID-7832: [Java Broker] Refactor store/protocol API using Collection<QpidByteBuffer>
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/660c206d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/660c206d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/660c206d
Branch: refs/heads/master
Commit: 660c206deb352aca3694a6b31f5f7cf6fca70533
Parents: 955a79b
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Oct 13 16:10:50 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed Oct 18 16:21:21 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 199 +--
.../berkeleydb/tuple/ByteBufferBinding.java | 15 +-
.../tuple/MessageMetaDataBinding.java | 25 +-
.../store/berkeleydb/BDBMessageStoreTest.java | 247 +--
.../bytebuffer/NonPooledByteBufferRef.java | 4 +
.../server/bytebuffer/PooledByteBufferRef.java | 6 +
.../qpid/server/bytebuffer/QpidByteBuffer.java | 1656 +++++++++++++-----
.../bytebuffer/QpidByteBufferInputStream.java | 109 --
.../bytebuffer/QpidByteBufferOutputStream.java | 62 +-
.../server/bytebuffer/QpidByteBufferUtils.java | 285 ---
.../configuration/updater/TaskExecutorImpl.java | 33 +-
.../message/AbstractServerMessageImpl.java | 8 +-
.../server/message/MessageContentSource.java | 5 +-
.../message/internal/InternalMessage.java | 21 +-
.../internal/InternalMessageMetaDataType.java | 6 +-
.../qpid/server/plugin/MessageFormat.java | 6 +-
.../qpid/server/plugin/MessageMetaDataType.java | 6 +-
...idByteBufferDisposingThreadPoolExecutor.java | 81 -
.../server/protocol/v0_8/AMQShortString.java | 2 +-
.../qpid/server/protocol/v0_8/FieldArray.java | 11 +-
.../apache/qpid/server/queue/AbstractQueue.java | 53 +-
.../qpid/server/store/MemoryMessageStore.java | 9 +-
.../qpid/server/store/StoredMemoryMessage.java | 77 +-
.../apache/qpid/server/store/StoredMessage.java | 4 +-
.../store/serializer/v1/Deserializer.java | 27 +-
.../store/serializer/v1/MessageRecord.java | 19 +-
.../v1/MessageStoreSerializer_v1.java | 35 +-
.../transport/MultiVersionProtocolEngine.java | 41 +-
.../transport/NetworkConnectionScheduler.java | 17 +-
.../server/transport/NonBlockingConnection.java | 52 +-
.../NonBlockingConnectionDelegate.java | 3 +-
.../NonBlockingConnectionPlainDelegate.java | 50 +-
.../NonBlockingConnectionTLSDelegate.java | 61 +-
.../NonBlockingConnectionUndecidedDelegate.java | 38 +-
.../transport/network/security/ssl/SSLUtil.java | 11 +-
.../qpid/server/transport/util/Functions.java | 13 -
.../qpid/server/util/ByteBufferUtils.java | 71 -
.../qpid/server/util/HousekeepingExecutor.java | 26 +-
.../AsynchronousMessageStoreRecoverer.java | 13 +-
.../QpidByteBufferOutputStreamTest.java | 119 +-
.../server/bytebuffer/QpidByteBufferTest.java | 491 +++---
.../server/protocol/v0_8/FieldTableTest.java | 10 +-
.../security/TrustStoreMessageSourceTest.java | 22 +-
.../qpid/server/store/MessageStoreTestCase.java | 12 -
.../store/TestMessageMetaDataFactory.java | 9 +-
.../server/store/TestMessageMetaDataType.java | 15 +-
.../qpid/server/txn/MockServerMessage.java | 10 +-
.../VirtualHostPropertiesNodeTest.java | 4 +
.../protocol/v0_10/ConsumerTarget_0_10.java | 63 +-
.../MessageConverter_Internal_to_v0_10.java | 6 +-
.../protocol/v0_10/MessageConverter_v0_10.java | 4 +-
.../MessageConverter_v0_10_to_Internal.java | 8 +-
.../protocol/v0_10/MessageFormat_0_10.java | 42 +-
.../v0_10/MessageMetaDataType_0_10.java | 6 +-
.../protocol/v0_10/MessageMetaData_0_10.java | 2 +-
.../protocol/v0_10/MessageTransferMessage.java | 8 +-
.../server/protocol/v0_10/ServerAssembler.java | 208 ++-
.../server/protocol/v0_10/ServerConnection.java | 11 +-
.../server/protocol/v0_10/ServerDecoder.java | 117 +-
.../protocol/v0_10/ServerDisassembler.java | 109 +-
.../protocol/v0_10/ServerSessionDelegate.java | 7 +-
.../v0_10/transport/MessageTransfer.java | 79 +-
.../server/protocol/v0_10/transport/Method.java | 10 +-
.../MessageConverter_0_10_to_InternalTest.java | 13 +-
.../MessageConverter_Internal_to_0_10Test.java | 30 +-
.../PropertyConverter_0_10_to_InternalTest.java | 8 +-
...PropertyConverter_Internal_to_v0_10Test.java | 4 +-
.../protocol/v0_10/ServerSessionTest.java | 11 +-
.../qpid/server/protocol/v0_8/AMQChannel.java | 22 +
.../protocol/v0_8/AMQPConnection_0_8Impl.java | 19 +-
.../qpid/server/protocol/v0_8/CachedFrame.java | 14 +-
.../v0_8/MessageConverter_Internal_to_v0_8.java | 6 +-
.../v0_8/MessageConverter_v0_8_to_Internal.java | 8 +-
.../protocol/v0_8/MessageFormat_0_9_1.java | 153 +-
.../server/protocol/v0_8/MessageMetaData.java | 39 +-
.../protocol/v0_8/MessageMetaDataType_0_8.java | 6 +-
.../v0_8/ProtocolOutputConverterImpl.java | 106 +-
.../protocol/v0_8/transport/AMQFrame.java | 23 +-
.../v0_8/transport/AMQMethodBodyImpl.java | 15 +-
.../transport/BasicContentHeaderProperties.java | 33 +-
.../protocol/v0_8/transport/ContentBody.java | 19 +-
.../v0_8/transport/ContentHeaderBody.java | 22 +-
.../server/protocol/v0_8/AMQDecoderTest.java | 65 +-
.../MessageConverter_0_8_to_InternalTest.java | 13 +-
.../MessageConverter_Internal_to_0_8Test.java | 31 +-
.../v0_8/MessageMetaDataFactoryTest.java | 54 +-
.../PropertyConverter_v0_8_to_InternalTest.java | 9 +-
.../protocol/v1_0/AMQPConnection_1_0.java | 3 +-
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 115 +-
.../v1_0/AbstractReceivingLinkEndpoint.java | 12 +
.../protocol/v1_0/ConsumerTarget_1_0.java | 26 +-
.../qpid/server/protocol/v1_0/Delivery.java | 21 +-
.../v1_0/MessageConverter_from_1_0.java | 49 +-
.../protocol/v1_0/MessageConverter_to_1_0.java | 42 +-
.../server/protocol/v1_0/MessageFormat_1_0.java | 11 +-
.../protocol/v1_0/MessageMetaDataType_1_0.java | 6 +-
.../protocol/v1_0/MessageMetaData_1_0.java | 25 +-
.../qpid/server/protocol/v1_0/Message_1_0.java | 60 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 18 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 13 +-
.../TxnCoordinatorReceivingLinkEndpoint.java | 106 +-
.../codec/AbstractCompositeTypeConstructor.java | 22 +-
.../codec/AbstractDescribedTypeConstructor.java | 8 +-
.../v1_0/codec/ArrayTypeConstructor.java | 27 +-
.../v1_0/codec/BinaryTypeConstructor.java | 15 +-
.../protocol/v1_0/codec/BooleanConstructor.java | 15 +-
.../v1_0/codec/ByteTypeConstructor.java | 9 +-
.../v1_0/codec/CharTypeConstructor.java | 13 +-
.../protocol/v1_0/codec/DecimalConstructor.java | 22 +-
.../v1_0/codec/DescribedTypeConstructor.java | 6 +-
.../v1_0/codec/DoubleTypeConstructor.java | 9 +-
.../v1_0/codec/FloatTypeConstructor.java | 9 +-
.../server/protocol/v1_0/codec/FrameWriter.java | 38 +-
.../protocol/v1_0/codec/IntTypeConstructor.java | 9 +-
.../protocol/v1_0/codec/ListConstructor.java | 15 +-
.../v1_0/codec/LongTypeConstructor.java | 9 +-
.../protocol/v1_0/codec/MapConstructor.java | 18 +-
.../v1_0/codec/NullTypeConstructor.java | 4 +-
.../v1_0/codec/ShortTypeConstructor.java | 9 +-
.../v1_0/codec/SmallIntConstructor.java | 9 +-
.../v1_0/codec/SmallLongConstructor.java | 9 +-
.../v1_0/codec/SmallUIntConstructor.java | 9 +-
.../v1_0/codec/SmallULongConstructor.java | 9 +-
.../v1_0/codec/StringTypeConstructor.java | 61 +-
.../v1_0/codec/SymbolTypeConstructor.java | 67 +-
.../v1_0/codec/TimestampTypeConstructor.java | 8 +-
.../protocol/v1_0/codec/TypeConstructor.java | 4 +-
.../v1_0/codec/UByteTypeConstructor.java | 9 +-
.../v1_0/codec/UIntTypeConstructor.java | 9 +-
.../v1_0/codec/ULongTypeConstructor.java | 9 +-
.../v1_0/codec/UShortTypeConstructor.java | 9 +-
.../v1_0/codec/UUIDTypeConstructor.java | 10 +-
.../protocol/v1_0/codec/ValueHandler.java | 42 +-
.../v1_0/codec/ZeroListConstructor.java | 2 +-
.../v1_0/codec/ZeroUIntConstructor.java | 6 +-
.../v1_0/codec/ZeroULongConstructor.java | 6 +-
.../server/protocol/v1_0/framing/AMQFrame.java | 12 +-
.../protocol/v1_0/framing/FrameHandler.java | 21 +-
.../protocol/v1_0/framing/TransportFrame.java | 4 +-
.../protocol/v1_0/messaging/SectionDecoder.java | 2 +-
.../v1_0/messaging/SectionDecoderImpl.java | 36 +-
.../protocol/v1_0/store/LinkStoreUtils.java | 22 +-
.../v1_0/type/messaging/AbstractSection.java | 101 +-
.../type/messaging/AmqpSequenceSection.java | 2 +-
.../v1_0/type/messaging/AmqpValueSection.java | 4 +-
.../messaging/ApplicationPropertiesSection.java | 3 +-
.../v1_0/type/messaging/DataSection.java | 4 +-
.../messaging/DeliveryAnnotationsSection.java | 3 +-
.../v1_0/type/messaging/FooterSection.java | 3 +-
.../v1_0/type/messaging/HeaderSection.java | 4 +-
.../messaging/MessageAnnotationsSection.java | 3 +-
.../v1_0/type/messaging/PropertiesSection.java | 4 +-
.../codec/AbstractLazyConstructor.java | 44 +-
.../codec/AmqpSequenceSectionConstructor.java | 4 +-
.../codec/AmqpValueSectionConstructor.java | 33 +-
...ApplicationPropertiesSectionConstructor.java | 4 +-
.../messaging/codec/DataSectionConstructor.java | 31 +-
.../DeliveryAnnotationsSectionConstructor.java | 4 +-
.../codec/DescribedListSectionConstructor.java | 35 +-
.../codec/DescribedMapSectionConstructor.java | 35 +-
.../type/messaging/codec/EncodingRetaining.java | 4 +-
.../codec/FooterSectionConstructor.java | 4 +-
.../codec/HeaderSectionConstructor.java | 4 +-
.../MessageAnnotationsSectionConstructor.java | 4 +-
.../codec/PropertiesSectionConstructor.java | 4 +-
.../protocol/v1_0/type/transport/Transfer.java | 31 +-
.../protocol/v1_0/ConsumerTarget_1_0Test.java | 36 +-
.../MessageConverter_Internal_to_1_0Test.java | 20 +-
.../MessageConverter_v1_0_to_InternalTest.java | 33 +-
.../PropertyConverter_v1_0_to_InternalTest.java | 13 +-
.../MessageConverter_1_0_to_v0_10.java | 6 +-
.../MessageConverter_0_10_to_1_0Test.java | 21 +-
.../MessageConverter_1_0_to_v0_10Test.java | 88 +-
.../PropertyConverter_0_10_to_1_0Test.java | 8 +-
.../PropertyConverter_1_0_to_0_10Test.java | 16 +-
.../MessageConverter_0_10_to_0_8.java | 23 +-
.../MessageConverter_0_8_to_0_10.java | 16 +-
.../PropertyConverter_0_10_to_0_8Test.java | 8 +-
.../PropertyConverter_0_8_to_0_10Test.java | 9 +-
.../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 6 +-
.../MessageConverter_0_8_to_1_0Test.java | 21 +-
.../MessageConverter_1_0_to_v0_8Test.java | 87 +-
.../PropertyConverter_0_8_to_1_0Test.java | 8 +-
.../PropertyConverter_1_0_to_0_8Test.java | 5 +-
.../store/derby/AbstractDerbyMessageStore.java | 5 +-
.../qpid/server/store/derby/DerbyUtils.java | 5 +-
.../store/jdbc/AbstractJDBCMessageStore.java | 204 +--
.../jdbc/GenericJDBCConfigurationStore.java | 13 +-
.../store/jdbc/GenericJDBCMessageStore.java | 9 +-
.../management/plugin/HttpManagement.java | 59 +-
.../management/plugin/report/ReportRunner.java | 11 +-
.../plugin/report/ReportRunnerTest.java | 3 +
.../transport/websocket/WebSocketProvider.java | 119 +-
.../tests/protocol/v1_0/FrameTransport.java | 33 +-
.../qpid/tests/protocol/v1_0/Interaction.java | 19 +-
.../tests/protocol/v1_0/MessageDecoder.java | 11 +-
.../tests/protocol/v1_0/MessageEncoder.java | 10 +-
.../qpid/tests/protocol/v1_0/OutputHandler.java | 4 +-
.../apache/qpid/tests/protocol/v1_0/Utils.java | 44 +-
.../tests/protocol/v1_0/DecodeErrorTest.java | 39 +-
.../protocol/v1_0/messaging/MessageFormat.java | 9 +-
.../v1_0/messaging/MultiTransferTest.java | 60 +-
.../protocol/v1_0/messaging/OutcomeTest.java | 1 -
.../protocol/v1_0/messaging/TransferTest.java | 8 +-
.../transport/link/ResumeDeliveriesTest.java | 13 +-
.../qpid/transport/ProtocolNegotiationTest.java | 1 +
206 files changed, 3492 insertions(+), 4625 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index f84b02c..c871508 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -21,11 +21,9 @@ package org.apache.qpid.server.store.berkeleydb;
import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -62,7 +60,6 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
@@ -88,12 +85,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private static final String BRIDGEDB_NAME = "BRIDGES";
private static final String LINKDB_NAME = "LINKS";
private static final String XID_DB_NAME = "XIDS";
- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0);
private final EventManager _eventManager = new EventManager();
private final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(
- Charset.forName("UTF-8")));
+ StandardCharsets.UTF_8));
private final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT.
setAllowCreate(true).
@@ -351,66 +347,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
-
- /**
- * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
- * from the specified offset in the message.
- *
- * @param messageId The message to get the data for.
- * @param offset The offset of the data within the message.
- * @param dst The destination of the content read back
- *
- * @return The number of bytes inserted into the destination
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
- {
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- DatabaseEntry value = new DatabaseEntry();
- ByteBufferBinding contentTupleBinding = ByteBufferBinding.getInstance();
-
-
- getLogger().debug("Message Id: {} Getting content body from offset: {}", messageId, offset);
-
-
- try
- {
-
- int written = 0;
- OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
- if (status == OperationStatus.SUCCESS)
- {
- QpidByteBuffer buffer = contentTupleBinding.entryToObject(value);
- int size = buffer.remaining();
- if (offset > size)
- {
- throw new RuntimeException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
-
- }
-
- written = size - offset;
- if(written > dst.remaining())
- {
- written = dst.remaining();
- }
- buffer = buffer.view(offset, written);
- buffer.get(dst);
- }
- return written;
- }
- catch (RuntimeException e)
- {
- throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id "
- + messageId
- + " to database: "
- + e.getMessage(), e);
- }
- }
-
- Collection<QpidByteBuffer> getAllContent(long messageId) throws StoreException
+ QpidByteBuffer getAllContent(long messageId) throws StoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
LongBinding.longToEntry(messageId, contentKeyEntry);
@@ -427,15 +364,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
byte[] data = value.getData();
int offset = value.getOffset();
int length = value.getSize();
- Collection<QpidByteBuffer> buffers = QpidByteBuffer.allocateDirectCollection(length);
- for(QpidByteBuffer buf : buffers)
- {
- int bufSize = buf.remaining();
- buf.put(data, offset, bufSize);
- buf.flip();
- offset+=bufSize;
- }
- return buffers;
+ QpidByteBuffer buf = QpidByteBuffer.allocateDirect(length);
+ buf.put(data, offset, length);
+ buf.flip();
+ return buf;
}
else
{
@@ -534,25 +466,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void addContent(final Transaction tx, long messageId,
- Collection<QpidByteBuffer> contentBody) throws StoreException
+ private void addContent(final Transaction tx, long messageId, QpidByteBuffer contentBody) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- int size = 0;
-
- for(QpidByteBuffer buf : contentBody)
- {
- size += buf.remaining();
- }
- byte[] data = new byte[size];
- ByteBuffer dst = ByteBuffer.wrap(data);
- for(QpidByteBuffer buf : contentBody)
- {
- buf.copyTo(dst);
- }
+ byte[] data = new byte[contentBody.remaining()];
+ contentBody.copyTo(data);
value.setData(data);
try
{
@@ -937,7 +858,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private static class MessageDataRef<T extends StorableMessageMetaData>
{
private volatile T _metaData;
- private volatile Collection<QpidByteBuffer> _data;
+ private volatile QpidByteBuffer _data;
private volatile boolean _isHardRef;
private MessageDataRef(final T metaData, boolean isHardRef)
@@ -945,7 +866,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
this(metaData, null, isHardRef);
}
- private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef)
+ private MessageDataRef(final T metaData, QpidByteBuffer data, boolean isHardRef)
{
_metaData = metaData;
_data = data;
@@ -957,12 +878,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return _metaData;
}
- public Collection<QpidByteBuffer> getData()
+ public QpidByteBuffer getData()
{
return _data;
}
- public void setData(final Collection<QpidByteBuffer> data)
+ public void setData(final QpidByteBuffer data)
{
_data = data;
}
@@ -997,11 +918,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
if(_data != null)
{
- for(QpidByteBuffer buf : _data)
- {
- bytesCleared += buf.remaining();
- buf.dispose();
- }
+ bytesCleared += _data.remaining();
+ _data.dispose();
_data = null;
}
return bytesCleared;
@@ -1058,20 +976,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore
@Override
public synchronized void addContent(QpidByteBuffer src)
{
- src = src.slice();
- Collection<QpidByteBuffer> data = _messageDataRef.getData();
- if(data == null)
+ try(QpidByteBuffer data = _messageDataRef.getData())
{
- _messageDataRef.setData(Collections.singleton(src));
- }
- else
- {
- List<QpidByteBuffer> newCollection = new ArrayList<>(data.size()+1);
- newCollection.addAll(data);
- newCollection.add(src);
- _messageDataRef.setData(Collections.unmodifiableCollection(newCollection));
+ if(data == null)
+ {
+ _messageDataRef.setData(src.slice());
+ }
+ else
+ {
+ _messageDataRef.setData(QpidByteBuffer.concatenate(Arrays.asList(data, src)));
+ }
}
-
}
@Override
@@ -1082,11 +997,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
/**
- * returns QBBs containing the content. The caller must not dispose of them because we keep a reference in _messageDataRef.
+ * Returns the QBB containing the content. The caller must not dispose of it because we keep a reference in _messageDataRef.
*/
- private Collection<QpidByteBuffer> getContentAsByteBuffer()
+ private QpidByteBuffer getContentAsByteBuffer()
{
- Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
+ QpidByteBuffer data = _messageDataRef == null ? QpidByteBuffer.emptyQpidByteBuffer() : _messageDataRef.getData();
if(data == null)
{
if(stored())
@@ -1098,7 +1013,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
else
{
- data = Collections.emptyList();
+ data = QpidByteBuffer.emptyQpidByteBuffer();
}
}
return data;
@@ -1106,49 +1021,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
@Override
- public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
+ public synchronized QpidByteBuffer getContent(int offset, int length)
{
- Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
- Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
- int pos = 0;
- for (QpidByteBuffer buf : bufs)
- {
- if(length > 0)
- {
- int bufRemaining = buf.remaining();
- if (pos + bufRemaining <= offset)
- {
- pos += bufRemaining;
- }
- else if (pos >= offset)
- {
- buf = buf.duplicate();
- if (bufRemaining <= length)
- {
- length -= bufRemaining;
- }
- else
- {
- buf.limit(length);
- length = 0;
- }
- content.add(buf);
- pos += buf.remaining();
-
- }
- else
- {
- int offsetInBuf = offset - pos;
- int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
- final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
- content.add(bufView);
- length -= limit;
- pos+=limit+offsetInBuf;
- }
- }
-
- }
- return content;
+ return getContentAsByteBuffer().view(offset, length);
}
@Override
@@ -1170,7 +1045,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData());
AbstractBDBMessageStore.this.addContent(txn, _messageId,
_messageDataRef.getData() == null
- ? Collections.<QpidByteBuffer>emptySet()
+ ? QpidByteBuffer.emptyQpidByteBuffer()
: _messageDataRef.getData());
_messageDataRef.setSoft();
}
@@ -1219,14 +1094,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
metaData.dispose();
}
- Collection<QpidByteBuffer> data = _messageDataRef.getData();
- if(data != null)
+ try (QpidByteBuffer data = _messageDataRef.getData())
{
- bytesCleared += getContentSize();
- _messageDataRef.setData(null);
- for(QpidByteBuffer buf : data)
+ if (data != null)
{
- buf.dispose();
+ bytesCleared += getContentSize();
+ _messageDataRef.setData(null);
}
}
_messageDataRef = null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
index d6d09dc..3b10bfc 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
@@ -71,15 +71,16 @@ public class ByteBufferBinding extends TupleBinding<QpidByteBuffer>
@Override
public void objectToEntry(QpidByteBuffer data, final TupleOutput output)
{
- QpidByteBuffer dup = data.duplicate();
- byte[] copyBuf = COPY_BUFFER.get();
- while(dup.hasRemaining())
+ try (QpidByteBuffer dup = data.duplicate())
{
- int length = Math.min(COPY_BUFFER_SIZE, dup.remaining());
- dup.get(copyBuf,0,length);
- output.write(copyBuf,0,length);
+ byte[] copyBuf = COPY_BUFFER.get();
+ while (dup.hasRemaining())
+ {
+ int length = Math.min(COPY_BUFFER_SIZE, dup.remaining());
+ dup.get(copyBuf, 0, length);
+ output.write(copyBuf, 0, length);
+ }
}
- dup.dispose();
}
public ByteBuffer readByteBuffer(final TupleInput input, int length)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
index 0fd3e44..0b9fa17 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.store.berkeleydb.tuple;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import java.util.List;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.je.DatabaseEntry;
@@ -61,15 +60,10 @@ public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaD
final int metaDataType = stream.readByte() & 0xff;
MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(metaDataType);
- List<QpidByteBuffer> bufs = QpidByteBuffer.asQpidByteBuffers(stream);
-
- final StorableMessageMetaData metaData = type.createMetaData(bufs);
-
- for (final QpidByteBuffer buf : bufs)
+ try (QpidByteBuffer buf = QpidByteBuffer.asQpidByteBuffer(stream))
{
- buf.dispose();
+ return type.createMetaData(buf);
}
- return metaData;
}
catch (IOException | RuntimeException e)
{
@@ -83,12 +77,15 @@ public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaD
final int bodySize = 1 + metaData.getStorableSize();
byte[] underlying = new byte[4+bodySize];
underlying[4] = (byte) metaData.getType().ordinal();
- QpidByteBuffer buf = QpidByteBuffer.wrap(underlying);
- buf.putInt(bodySize ^ 0x80000000);
- buf.position(5);
- buf = buf.slice();
-
- metaData.writeToBuffer(buf);
+ try (QpidByteBuffer buf = QpidByteBuffer.wrap(underlying))
+ {
+ buf.putInt(bodySize ^ 0x80000000);
+ buf.position(5);
+ try (QpidByteBuffer bufSlice = buf.slice())
+ {
+ metaData.writeToBuffer(bufSlice);
+ }
+ }
entry.setData(underlying);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index da45856..91f698a 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -24,36 +24,21 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
-import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
-import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
-import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
-import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
-import org.apache.qpid.server.protocol.v0_10.transport.Header;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
import org.apache.qpid.server.util.FileUtils;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
@@ -78,164 +63,6 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
}
}
- /**
- * Tests that message metadata and content are successfully read back from a
- * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
- * verify their ability to co-exist within the store and be successfully retrieved.
- */
- public void testBDBMessagePersistence() throws Exception
- {
- MessageStore bdbStore = getStore();
-
- // Create content ByteBuffers.
- // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
- // Use a single chunk for the 0-10 message as per broker behaviour.
- String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
-
- QpidByteBuffer firstContentBytes_0_8 = QpidByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
- QpidByteBuffer secondContentBytes_0_8 = QpidByteBuffer.wrap(bodyText.substring(10).getBytes());
-
- QpidByteBuffer completeContentBody_0_10 = QpidByteBuffer.wrap(bodyText.getBytes());
- int bodySize = completeContentBody_0_10.limit();
-
- /*
- * Create and insert a 0-8 message (metadata and multi-chunk content)
- */
- MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
- BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
-
- ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
-
- MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
- MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8);
-
- long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
-
- messageHandle_0_8.addContent(firstContentBytes_0_8);
- messageHandle_0_8.addContent(secondContentBytes_0_8);
- final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded();
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
- ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore();
-
- /*
- * Create and insert a 0-10 message (metadata and content)
- */
- MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
- DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
- Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
-
- MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED, header_0_10,
- Collections.singletonList(completeContentBody_0_10));
-
- MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
- MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10);
-
- long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
-
- messageHandle_0_10.addContent(completeContentBody_0_10);
- final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded();
- long messageid_0_10 = storedMessage_0_10.getMessageNumber();
- ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore();
-
- /*
- * reload the store only (read-only)
- */
- reopenStore();
-
- /*
- * Read back and validate the 0-8 message metadata and content
- */
- BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore();
- StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8);
-
- assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal());
- assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
- MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
-
- assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
-
- MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
- assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
- assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
- assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
- assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
-
- ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
- assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId());
- assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight());
- assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize());
-
- BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties();
- assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
- assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
-
- ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ;
- long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
- assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8);
- String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
- assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
-
- /*
- * Read back and validate the 0-10 message metadata and content
- */
- StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10);
-
- assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal());
- assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
- MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
-
- assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
-
- DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties();
- assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
- assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
- assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
- assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
- assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
- assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
-
- MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties();
- assertNotNull("MessageProperties were not returned", returnedMsgProps);
- assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
- assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
- assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
-
- ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
- long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent);
- assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
-
- String returnedPayloadString_0_10 = new String(recoveredContent.array());
- assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
-
- reopenedBdbStore.closeMessageStore();
- }
-
- private DeliveryProperties createDeliveryProperties_0_10()
- {
- DeliveryProperties delProps_0_10 = new DeliveryProperties();
-
- delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- delProps_0_10.setImmediate(true);
- delProps_0_10.setExchange("exchange12345");
- delProps_0_10.setRoutingKey("routingKey12345");
- delProps_0_10.setExpiration(5);
- delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
-
- return delProps_0_10;
- }
-
- private MessageProperties createMessageProperties_0_10(int bodySize)
- {
- MessageProperties msgProps_0_10 = new MessageProperties();
- msgProps_0_10.setContentLength(bodySize);
- msgProps_0_10.setCorrelationId("qwerty".getBytes());
- msgProps_0_10.setContentType("text/html");
-
- return msgProps_0_10;
- }
-
-
private MessagePublishInfo createPublishInfoBody_0_8()
{
return new MessagePublishInfo(new AMQShortString("exchange12345"), false, true,
@@ -257,60 +84,6 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
return props;
}
- public void testGetContentWithOffset() throws Exception
- {
- BDBMessageStore bdbStore = (BDBMessageStore) getStore();
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-
- // normal case: offset is 0
- ByteBuffer dst = ByteBuffer.allocate(10);
- int length = bdbStore.getContent(messageid_0_8, 0, dst);
- assertEquals("Unexpected length", CONTENT_BYTES.length, length);
- byte[] array = dst.array();
- assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array));
-
- // offset is in the middle
- dst = ByteBuffer.allocate(10);
- length = bdbStore.getContent(messageid_0_8, 5, dst);
- assertEquals("Unexpected length", 5, length);
- array = dst.array();
- byte[] expected = new byte[10];
- System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5);
- assertTrue("Unexpected content", Arrays.equals(expected, array));
-
- // offset beyond the content length
- dst = ByteBuffer.allocate(10);
- try
- {
- bdbStore.getContent(messageid_0_8, 15, dst);
- fail("Should fail for the offset greater than message size");
- }
- catch (RuntimeException e)
- {
- assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id "
- + messageid_0_8 + "!", e.getCause().getMessage());
- }
-
- // buffer is smaller than message size
- dst = ByteBuffer.allocate(5);
- length = bdbStore.getContent(messageid_0_8, 0, dst);
- assertEquals("Unexpected length", 5, length);
- array = dst.array();
- expected = new byte[5];
- System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5);
- assertTrue("Unexpected content", Arrays.equals(expected, array));
-
- // buffer is smaller than message size, offset is not 0
- dst = ByteBuffer.allocate(5);
- length = bdbStore.getContent(messageid_0_8, 2, dst);
- assertEquals("Unexpected length", 5, length);
- array = dst.array();
- expected = new byte[5];
- System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
- assertTrue("Unexpected content", Arrays.equals(expected, array));
- }
-
/**
* Tests that messages which are added to the store and then removed using the
* public MessageStore interfaces are actually removed from the store by then
@@ -338,11 +111,15 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
// pass since exception expected
}
- //expecting no content, allocate a 1 byte
- ByteBuffer dst = ByteBuffer.allocate(1);
-
- assertEquals("Retrieved content when none was expected",
- 0, bdbStore.getContent(messageid_0_8, 0, dst));
+ try
+ {
+ bdbStore.getAllContent(messageid_0_8);
+ fail("Expected exception not thrown");
+ }
+ catch (StoreException se)
+ {
+ // PASS
+ }
}
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
index 2d0d668..27ad899 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
@@ -28,6 +28,10 @@ class NonPooledByteBufferRef implements ByteBufferRef
NonPooledByteBufferRef(final ByteBuffer buffer)
{
+ if (buffer == null)
+ {
+ throw new NullPointerException();
+ }
_buffer = buffer;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
index 01ad373..0126e20 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.bytebuffer;
import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,6 +45,10 @@ class PooledByteBufferRef implements ByteBufferRef
PooledByteBufferRef(final ByteBuffer buffer)
{
+ if (buffer == null)
+ {
+ throw new NullPointerException();
+ }
_buffer = buffer;
ACTIVE_BUFFERS.incrementAndGet();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org