You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/20 13:20:50 UTC
[1/2] qpid-broker-j git commit: QPID-7791: Recover metadata into
direct memory
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 973c1f8db -> 85abb468e
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UIntTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UIntTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UIntTypeConstructor.java
index 53ca58b..700817d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UIntTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UIntTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ULongTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ULongTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ULongTypeConstructor.java
index 18796ca..dbd606f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ULongTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ULongTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UShortTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UShortTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UShortTypeConstructor.java
index ddc6e0b..b22790b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UShortTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UShortTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UUIDTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UUIDTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UUIDTypeConstructor.java
index c71ebce..4bff5c0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UUIDTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UUIDTypeConstructor.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
index 9cec678..7a47703 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
index 8ba45c8..f55c49f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
index 1fd6600..423d7d5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpValueSectionConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpValueSectionConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpValueSectionConstructor.java
index a29144c..da51ae3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpValueSectionConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpValueSectionConstructor.java
@@ -29,7 +29,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.TypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DataSectionConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DataSectionConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DataSectionConstructor.java
index 23ba6a9..9f40612 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DataSectionConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DataSectionConstructor.java
@@ -29,7 +29,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.TypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedListSectionConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedListSectionConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedListSectionConstructor.java
index 605c6d9..5e11400 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedListSectionConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedListSectionConstructor.java
@@ -28,7 +28,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructor;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.TypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedMapSectionConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedMapSectionConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedMapSectionConstructor.java
index 6ca90c3..3a9a622 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedMapSectionConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedMapSectionConstructor.java
@@ -29,7 +29,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.TypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index f831399..0b1a2cc 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.jdbc;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -946,16 +947,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if (rs.next())
{
- byte[] dataAsBytes = getBlobAsBytes(rs, 1);
- QpidByteBuffer buf = QpidByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- int typeOrdinal = dataAsBytes[0] & 0xff;
-
- MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(typeOrdinal);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- buf.dispose();
- return metaData;
+ return getStorableMessageMetaData(messageId, getBlobAsBytes(rs, 1));
}
else
{
@@ -966,6 +958,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
+    /**
+     * Decodes a stored metadata blob. The first byte is the ordinal of the metadata type;
+     * the remaining bytes are passed to that type as a list of QpidByteBuffers.
+     *
+     * @param messageId   id of the message the blob belongs to (used in error messages)
+     * @param blobAsBytes the blob as read from the database
+     * @return the metadata created by the type registered for the leading ordinal
+     * @throws StoreException if the blob is empty or streaming it fails
+     */
+    private StorableMessageMetaData getStorableMessageMetaData(final long messageId, final byte[] blobAsBytes)
+            throws SQLException
+    {
+        try (InputStream stream = new ByteArrayInputStream(blobAsBytes))
+        {
+            // read() already yields 0-255, or -1 at end of stream; masking with 0xff
+            // would silently turn the end-of-stream marker into ordinal 255.
+            final int typeOrdinal = stream.read();
+            if (typeOrdinal == -1)
+            {
+                throw new StoreException("Metadata for message with id " + messageId + " is empty");
+            }
+            MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(typeOrdinal);
+            List<QpidByteBuffer> bufs = QpidByteBuffer.asQpidByteBuffers(stream);
+            try
+            {
+                return type.createMetaData(bufs);
+            }
+            finally
+            {
+                // Dispose the buffers even when createMetaData throws, so they are not leaked.
+                for (final QpidByteBuffer buf : bufs)
+                {
+                    buf.dispose();
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new StoreException("Failed to stream metadata for message with id " + messageId, e);
+        }
+    }
+
protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
private void addContent(final Connection conn, long messageId,
@@ -1648,14 +1661,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if (rs.next())
{
byte[] dataAsBytes = getBlobAsBytes(rs, 2);
- QpidByteBuffer buf = QpidByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- buf.dispose();
+ StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsBytes);
message = createStoredJDBCMessage(messageId, metaData, true);
-
}
else
{
@@ -1694,13 +1701,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
long messageId = rs.getLong(1);
byte[] dataAsBytes = getBlobAsBytes(rs, 2);
- QpidByteBuffer buf = QpidByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType<?> type =
- MessageMetaDataTypeRegistry.fromOrdinal(((int) dataAsBytes[0]) & 0xff);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- buf.dispose();
+ StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsBytes);
StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
if (!handler.handle(message))
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index a3f9271..3220cef 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
@@ -41,6 +42,18 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
{
public static final String TEST_TABLE_PREFIX = "TEST_TABLE_PREFIX_";
private String _connectionURL;
+ private static final int BUFFER_SIZE = 10;
+ private static final int POOL_SIZE = 20;
+ private static final double SPARSITY_FRACTION = 1.0;
+
+
+ // Replaces the QpidByteBuffer pool after the base-class set-up, using the
+ // BUFFER_SIZE, POOL_SIZE and SPARSITY_FRACTION constants declared above.
+ // NOTE(review): the 10-byte buffer size presumably forces stored metadata to span
+ // several buffers during recovery - confirm against MessageStoreTestCase.
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ // Drop any existing pool first so the initialisation below takes effect for this test.
+ QpidByteBuffer.deinitialisePool();
+ QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION);
+ }
@Override
public void tearDown() throws Exception
@@ -54,6 +67,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
}
finally
{
+ QpidByteBuffer.deinitialisePool();
super.tearDown();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-7791: Recover metadata into
direct memory
Posted by kw...@apache.org.
QPID-7791: Recover metadata into direct memory
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/85abb468
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/85abb468
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/85abb468
Branch: refs/heads/master
Commit: 85abb468e02de9a0e38d23e056997da4730ed271
Parents: 973c1f8
Author: Keith Wall <kw...@apache.org>
Authored: Tue Jun 20 10:02:08 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jun 20 13:38:25 2017 +0100
----------------------------------------------------------------------
.../tuple/MessageMetaDataBinding.java | 41 ++-
.../store/berkeleydb/BDBMessageStoreTest.java | 2 +-
.../qpid/server/bytebuffer/QpidByteBuffer.java | 16 ++
.../server/bytebuffer/QpidByteBufferUtils.java | 271 ++++++++++++++++++
.../internal/InternalMessageMetaDataType.java | 8 +-
.../qpid/server/plugin/MessageMetaDataType.java | 6 +-
.../v1/MessageStoreSerializer_v1.java | 3 +-
.../server/bytebuffer/QpidByteBufferTest.java | 16 ++
.../qpid/server/store/MessageStoreTestCase.java | 18 +-
.../store/TestMessageMetaDataFactory.java | 9 +-
.../server/store/TestMessageMetaDataType.java | 7 +-
.../v0_10/MessageMetaDataType_0_10.java | 8 +-
.../protocol/v0_10/MessageMetaData_0_10.java | 9 +-
.../server/protocol/v0_8/MessageMetaData.java | 38 ++-
.../protocol/v0_8/MessageMetaDataType_0_8.java | 8 +-
.../v0_8/MessageMetaDataFactoryTest.java | 135 +++++++++
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 2 +-
.../protocol/v1_0/MessageMetaDataType_1_0.java | 8 +-
.../protocol/v1_0/MessageMetaData_1_0.java | 27 +-
.../qpid/server/protocol/v1_0/Message_1_0.java | 2 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 2 +-
.../v1_0/codec/ArrayTypeConstructor.java | 1 +
.../v1_0/codec/BinaryTypeConstructor.java | 1 +
.../protocol/v1_0/codec/BooleanConstructor.java | 1 +
.../v1_0/codec/ByteTypeConstructor.java | 1 +
.../v1_0/codec/CharTypeConstructor.java | 3 +-
.../v1_0/codec/CompoundTypeConstructor.java | 1 +
.../protocol/v1_0/codec/DecimalConstructor.java | 1 +
.../v1_0/codec/DoubleTypeConstructor.java | 1 +
.../v1_0/codec/FloatTypeConstructor.java | 1 +
.../server/protocol/v1_0/codec/FrameWriter.java | 1 +
.../protocol/v1_0/codec/IntTypeConstructor.java | 1 +
.../v1_0/codec/LongTypeConstructor.java | 1 +
.../v1_0/codec/QpidByteBufferUtils.java | 274 -------------------
.../v1_0/codec/ShortTypeConstructor.java | 1 +
.../v1_0/codec/SmallIntConstructor.java | 1 +
.../v1_0/codec/SmallLongConstructor.java | 1 +
.../v1_0/codec/SmallUIntConstructor.java | 1 +
.../v1_0/codec/SmallULongConstructor.java | 1 +
.../v1_0/codec/StringTypeConstructor.java | 1 +
.../v1_0/codec/SymbolTypeConstructor.java | 1 +
.../v1_0/codec/TimestampTypeConstructor.java | 1 +
.../v1_0/codec/UByteTypeConstructor.java | 1 +
.../v1_0/codec/UIntTypeConstructor.java | 1 +
.../v1_0/codec/ULongTypeConstructor.java | 1 +
.../v1_0/codec/UShortTypeConstructor.java | 1 +
.../v1_0/codec/UUIDTypeConstructor.java | 1 +
.../protocol/v1_0/codec/ValueHandler.java | 1 +
.../v1_0/messaging/SectionDecoderImpl.java | 2 +-
.../v1_0/type/messaging/AbstractSection.java | 2 +-
.../codec/AmqpValueSectionConstructor.java | 2 +-
.../messaging/codec/DataSectionConstructor.java | 2 +-
.../codec/DescribedListSectionConstructor.java | 2 +-
.../codec/DescribedMapSectionConstructor.java | 2 +-
.../store/jdbc/AbstractJDBCMessageStore.java | 49 ++--
.../server/store/jdbc/JDBCMessageStoreTest.java | 14 +
56 files changed, 658 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
index 81806d5..798b097 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
@@ -20,23 +20,25 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.je.DatabaseEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.MessageMetaDataTypeRegistry;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
/**
* Handles the mapping to and from message meta data
*/
public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaData>
{
- private static final Logger LOGGER = LoggerFactory.getLogger(MessageMetaDataBinding.class);
private static final MessageMetaDataBinding INSTANCE = new MessageMetaDataBinding();
@@ -51,15 +53,28 @@ public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaD
@Override
public StorableMessageMetaData entryToObject(DatabaseEntry entry)
{
- QpidByteBuffer buf = QpidByteBuffer.wrap(entry.getData(), entry.getOffset(), entry.getSize());
- final int bodySize = buf.getInt() ^ 0x80000000;
- final int metaDataType = buf.get() & 0xff;
- buf = buf.slice();
- buf.limit(bodySize-1);
- MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(metaDataType);
- final StorableMessageMetaData metaData = type.createMetaData(buf);
- buf.dispose();
- return metaData;
+        // Entry layout as read here: a 4-byte int (stored XOR 0x80000000), one byte holding
+        // the metadata type ordinal, then the bytes handed to the metadata type.
+        try (DataInputStream stream = new DataInputStream(new ByteArrayInputStream(entry.getData(),
+                                                                                    entry.getOffset(),
+                                                                                    entry.getSize())))
+        {
+            // The int must be consumed to reach the type byte; its value is not used below.
+            // NOTE(review): the removed code limited the buffer to bodySize - 1 bytes; this
+            // version passes on everything left in the entry - confirm the two always agree.
+            final int bodySize = stream.readInt() ^ 0x80000000;
+            final int metaDataType = stream.readByte() & 0xff;
+            MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(metaDataType);
+
+            List<QpidByteBuffer> bufs = QpidByteBuffer.asQpidByteBuffers(stream);
+            try
+            {
+                return type.createMetaData(bufs);
+            }
+            finally
+            {
+                // Dispose the buffers even when createMetaData throws, so they are not leaked.
+                for (final QpidByteBuffer buf : bufs)
+                {
+                    buf.dispose();
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            // Pass the IOException as the cause so the original failure is not lost.
+            throw new StoreException(String.format("Unable to convert entry %s to metadata", entry), e);
+        }
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index aa1c45d..da45856 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -66,7 +66,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
private String _storeLocation;
@Override
- protected void tearDown() throws Exception
+ public void tearDown() throws Exception
{
try
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index b335adc..346e203 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -676,6 +676,22 @@ public class QpidByteBuffer implements AutoCloseable
}
}
+    /**
+     * Copies everything that can still be read from a stream into a list of buffers
+     * obtained from allocateDirect. Each buffer holds the bytes returned by one read,
+     * so none is larger than the pooled buffer size, and each is flipped ready for reading.
+     *
+     * @param stream source of the bytes; it is read until it reports no more data and is not closed here
+     * @return the buffers in stream order (empty if the stream had no data); the caller must dispose them
+     * @throws IOException if reading the stream fails; buffers allocated so far are disposed first
+     */
+    public static List<QpidByteBuffer> asQpidByteBuffers(final InputStream stream) throws IOException
+    {
+        final List<QpidByteBuffer> bufs = new ArrayList<>();
+        final byte[] transferBuf = new byte[QpidByteBuffer.getPooledBufferSize()];
+        try
+        {
+            int read = stream.read(transferBuf);
+            while (read > 0)
+            {
+                final QpidByteBuffer chunk = QpidByteBuffer.allocateDirect(read);
+                // Track the chunk before filling it so it is released if put() fails.
+                bufs.add(chunk);
+                chunk.put(transferBuf, 0, read);
+                chunk.flip();
+                read = stream.read(transferBuf);
+            }
+            return bufs;
+        }
+        catch (IOException | RuntimeException e)
+        {
+            // The caller never sees the partial list, so release it here before rethrowing.
+            for (final QpidByteBuffer buf : bufs)
+            {
+                buf.dispose();
+            }
+            throw e;
+        }
+    }
+
public static SSLEngineResult encryptSSL(SSLEngine engine,
final Collection<QpidByteBuffer> buffers,
QpidByteBuffer dest) throws SSLException
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
new file mode 100644
index 0000000..57552a8
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.bytebuffer;
+
+import java.nio.BufferUnderflowException;
+import java.util.List;
+
+public class QpidByteBufferUtils
+{
+ public static boolean hasRemaining(List<QpidByteBuffer> in)
+ {
+ if (in.isEmpty())
+ {
+ return false;
+ }
+ for (int i = 0; i < in.size(); i++)
+ {
+ if (in.get(i).hasRemaining())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static long remaining(List<QpidByteBuffer> in)
+ {
+ long remaining = 0L;
+ for (int i = 0; i < in.size(); i++)
+ {
+ remaining += in.get(i).remaining();
+ }
+ return remaining;
+ }
+
+ public static byte get(List<QpidByteBuffer> in)
+ {
+ for (int i = 0; i < in.size(); i++)
+ {
+ final QpidByteBuffer buffer = in.get(i);
+ if (buffer.hasRemaining())
+ {
+ return buffer.get();
+ }
+ }
+ throw new BufferUnderflowException();
+ }
+
+ public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
+ {
+ for (int i = 0; i < in.size(); i++)
+ {
+ final QpidByteBuffer buffer = in.get(i);
+ int remaining = buffer.remaining();
+ if (remaining >= len)
+ {
+ return true;
+ }
+ len -= remaining;
+ }
+
+ return false;
+ }
+
+ public static long getLong(final List<QpidByteBuffer> in) // reads 8 bytes as a long, even when they straddle several buffers
+ {
+ boolean bytewise = false; // becomes true once the value has to be stitched together byte by byte
+ int consumed = 0; // number of bytes folded into result so far
+ long result = 0L;
+ for (int i = 0; i < in.size(); i++)
+ {
+ final QpidByteBuffer buffer = in.get(i);
+ int remaining = buffer.remaining();
+ if (bytewise)
+ {
+ while (buffer.hasRemaining() && consumed < 8)
+ {
+ result <<= 8; // shift a whole byte (was 1 bit, which corrupted values split across buffers)
+ result |= (0xFF & buffer.get()); // mask to avoid sign extension of negative bytes
+ consumed++;
+ }
+ if (consumed == 8)
+ {
+ return result;
+ }
+ }
+ else
+ {
+ if (remaining >= 8)
+ {
+ return buffer.getLong(); // fast path: the whole value lives in one buffer
+ }
+ else if (remaining != 0)
+ {
+ bytewise = true;
+ while (buffer.hasRemaining()) // fewer than 8 bytes here, so drain this buffer completely
+ {
+ result <<= 8; // most significant byte first; assumes the buffers use the default big-endian order
+ result |= (0xFF & buffer.get());
+ consumed++;
+ }
+ }
+ }
+ }
+ throw new BufferUnderflowException(); // fewer than 8 bytes were available across all buffers
+ }
+
+ public static int getInt(final List<QpidByteBuffer> in) // reads 4 bytes as an int, even when they straddle several buffers
+ {
+ boolean bytewise = false; // becomes true once the value has to be stitched together byte by byte
+ int consumed = 0; // number of bytes folded into result so far
+ int result = 0;
+ for (int i = 0; i < in.size(); i++)
+ {
+ final QpidByteBuffer buffer = in.get(i);
+ int remaining = buffer.remaining();
+ if (bytewise)
+ {
+ while (buffer.hasRemaining() && consumed < 4)
+ {
+ result <<= 8; // shift a whole byte (was 1 bit, which corrupted values split across buffers)
+ result |= (0xFF & buffer.get()); // mask to avoid sign extension of negative bytes
+ consumed++;
+ }
+ if (consumed == 4)
+ {
+ return result;
+ }
+ }
+ else
+ {
+ if (remaining >= 4)
+ {
+ return buffer.getInt(); // fast path: the whole value lives in one buffer
+ }
+ else if (remaining != 0)
+ {
+ bytewise = true;
+ while (buffer.hasRemaining()) // fewer than 4 bytes here, so drain this buffer completely
+ {
+ result <<= 8; // most significant byte first; assumes the buffers use the default big-endian order
+ result |= (0xFF & buffer.get());
+ consumed++;
+ }
+ }
+ }
+ }
+ throw new BufferUnderflowException(); // fewer than 4 bytes were available across all buffers
+ }
+
+ public static float getFloat(final List<QpidByteBuffer> in)
+ {
+ return Float.intBitsToFloat(getInt(in));
+ }
+
+ public static double getDouble(final List<QpidByteBuffer> in)
+ {
+ return Double.longBitsToDouble(getLong(in));
+ }
+
+ public static Short getShort(final List<QpidByteBuffer> in) // reads 2 bytes as a short, even when they straddle two buffers
+ {
+ boolean bytewise = false; // becomes true once the value has to be stitched together byte by byte
+ int consumed = 0; // number of bytes folded into result so far
+ short result = 0;
+ for (int i = 0; i < in.size(); i++)
+ {
+ final QpidByteBuffer buffer = in.get(i);
+ int remaining = buffer.remaining();
+ if (bytewise)
+ {
+ while (buffer.hasRemaining() && consumed < 2)
+ {
+ result <<= 8; // shift a whole byte (was 1 bit, which corrupted values split across buffers)
+ result |= (0xFF & buffer.get()); // mask to avoid sign extension of negative bytes
+ consumed++;
+ }
+ if (consumed == 2)
+ {
+ return result;
+ }
+ }
+ else
+ {
+ if (remaining >= 2)
+ {
+ return buffer.getShort(); // fast path: the whole value lives in one buffer
+ }
+ else if (remaining != 0)
+ {
+ bytewise = true;
+ while (buffer.hasRemaining()) // exactly one byte here, so drain this buffer completely
+ {
+ result <<= 8; // most significant byte first; assumes the buffers use the default big-endian order
+ result |= (0xFF & buffer.get());
+ consumed++;
+ }
+ }
+ }
+ }
+ throw new BufferUnderflowException(); // fewer than 2 bytes were available across all buffers
+ }
+
+ public static int get(final List<QpidByteBuffer> in, final byte[] data)
+ {
+ int copied = 0;
+ int i = 0;
+ while (copied < data.length && i < in.size())
+ {
+ QpidByteBuffer buf = in.get(i);
+ if (buf.hasRemaining())
+ {
+ int remaining = buf.remaining();
+ if (remaining >= data.length - copied)
+ {
+ buf.get(data, copied, data.length - copied);
+ return data.length;
+ }
+ else
+ {
+ buf.get(data, copied, remaining);
+ copied += remaining;
+ }
+ }
+ i++;
+ }
+ return copied;
+ }
+
+ public static void skip(final List<QpidByteBuffer> in, int length)
+ {
+ int skipped = 0;
+ int i = 0;
+ while (skipped < length && i < in.size())
+ {
+ QpidByteBuffer buf = in.get(i);
+ if (buf.hasRemaining())
+ {
+ int remaining = buf.remaining();
+ if (remaining >= length - skipped)
+ {
+ buf.position(buf.position() + length - skipped);
+ return;
+ }
+ else
+ {
+ buf.position(buf.position() + remaining);
+ skipped += remaining;
+ }
+ }
+ i++;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
index 92eaaaa..507837c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
@@ -22,8 +22,10 @@ package org.apache.qpid.server.message.internal;
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferInputStream;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.plugin.PluggableService;
@@ -43,11 +45,9 @@ public class InternalMessageMetaDataType implements MessageMetaDataType<Internal
}
@Override
- public InternalMessageMetaData createMetaData(final QpidByteBuffer buf)
+ public InternalMessageMetaData createMetaData(final List<QpidByteBuffer> bufs)
{
-
-
- try(ObjectInputStream is = new ObjectInputStream(buf.asInputStream()))
+ try(ObjectInputStream is = new ObjectInputStream(new QpidByteBufferInputStream(bufs)))
{
int contentSize = is.readInt();
InternalMessageHeader header = (InternalMessageHeader) is.readObject();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
index b65e4b9..4821914 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.plugin;
+import java.util.List;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -30,12 +32,12 @@ public interface MessageMetaDataType<M extends StorableMessageMetaData> extends
interface Factory<M extends StorableMessageMetaData>
{
- M createMetaData(QpidByteBuffer buf);
+ M createMetaData(List<QpidByteBuffer> buf);
}
int ordinal();
- M createMetaData(QpidByteBuffer buf);
+ M createMetaData(List<QpidByteBuffer> bufs);
ServerMessage<M> createMessage(StoredMessage<M> msg);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index b064247..9f0098a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.store.serializer.v1;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -307,7 +308,7 @@ public class MessageStoreSerializer_v1 implements MessageStoreSerializer
MessageMetaDataTypeRegistry.fromOrdinal(metaData[0] & 0xff);
QpidByteBuffer buf = QpidByteBuffer.wrap(metaData, 1, metaData.length - 1);
final StorableMessageMetaData storableMessageMetaData =
- metaDataType.createMetaData(buf);
+ metaDataType.createMetaData(Collections.singletonList(buf));
buf.dispose();
final MessageHandle<StorableMessageMetaData> handle =
store.addMessage(storableMessageMetaData);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
index 139a4e9..d98c4f1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
@@ -20,6 +20,7 @@
package org.apache.qpid.server.bytebuffer;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -33,6 +34,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import com.google.common.io.ByteStreams;
import org.junit.Assert;
@@ -899,6 +902,19 @@ public class QpidByteBufferTest extends QpidTestCase
assertTrue("Buffer should be sparse", grandChild.isSparse());
}
+ public void testAsQpidByteBuffers() throws IOException
+ {
+ byte[] dataForTwoBufs = "01234567890".getBytes(StandardCharsets.US_ASCII);
+ Collection<QpidByteBuffer> qpidByteBuffers = QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(dataForTwoBufs));
+ assertEquals("Unexpected number of bufs", 2, qpidByteBuffers.size());
+ Iterator<QpidByteBuffer> itr = qpidByteBuffers.iterator();
+ assertEquals("Unexpected remaining in first buf", 10, itr.next().remaining());
+ assertEquals("Unexpected remaining in second buf", 1, itr.next().remaining());
+
+ Collection<QpidByteBuffer> bufsForEmptyBytes = QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(new byte[]{}));
+ assertEquals("Unexpected number of bufs for empty buffer", 0, bufsForEmptyBytes.size());
+ }
+
private void doDeflateInflate(byte[] input,
Collection<QpidByteBuffer> inputBufs,
boolean direct) throws IOException
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 82c03c7..ee8def1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.Description;
import org.mockito.ArgumentMatcher;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -52,7 +53,12 @@ public abstract class MessageStoreTestCase extends QpidTestCase
private MessageStore _store;
private ConfiguredObject<?> _parent;
private MessageStore.MessageStoreReader _storeReader;
+ private static final int BUFFER_SIZE = 10;
+ private static final int POOL_SIZE = 20;
+ private static final double SPARSITY_FRACTION = 1.0;
+
+ @Override
public void setUp() throws Exception
{
super.setUp();
@@ -63,6 +69,16 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_store.openMessageStore(_parent);
_storeReader = _store.newMessageStoreReader();
+
+ QpidByteBuffer.deinitialisePool();
+ QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION);
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ QpidByteBuffer.deinitialisePool();
+ super.tearDown();
}
protected abstract VirtualHost createVirtualHost();
@@ -383,7 +399,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
StoredMessage<?> retrievedMessage = retrievedMessageRef.get();
assertNotNull("Message was not found", retrievedMessageRef);
- assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
+ assertEquals("Unexpected retrieved message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
retrievedMessage.remove();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
index 89beed5..381718d 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
@@ -20,15 +20,18 @@
*/
package org.apache.qpid.server.store;
+import java.util.List;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.plugin.MessageMetaDataType;
public class TestMessageMetaDataFactory implements MessageMetaDataType.Factory<TestMessageMetaData>
{
- public TestMessageMetaData createMetaData(QpidByteBuffer buf)
+ public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs)
{
- long id = buf.getLong();
- int size = buf.getInt();
+ long id = QpidByteBufferUtils.getLong(bufs);
+ int size = QpidByteBufferUtils.getInt(bufs);
return new TestMessageMetaData(id, size);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 24fa263..52d6e0f 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.util.Collection;
+import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -41,9 +42,9 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
}
@Override
- public TestMessageMetaData createMetaData(QpidByteBuffer buf)
+ public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs)
{
- return TestMessageMetaData.FACTORY.createMetaData(buf);
+ return TestMessageMetaData.FACTORY.createMetaData(bufs);
}
@Override
@@ -52,11 +53,13 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
return new TestServerMessage(msg);
}
+ @Override
public int hashCode()
{
return ordinal();
}
+ @Override
public boolean equals(Object o)
{
return o != null && o.getClass() == getClass();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
index 1bfbb2c..2dcb5f5 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.List;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -40,9 +42,9 @@ public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMeta
}
@Override
- public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf)
+ public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> bufs)
{
- return MessageMetaData_0_10.FACTORY.createMetaData(buf);
+ return MessageMetaData_0_10.FACTORY.createMetaData(bufs);
}
@Override
@@ -51,11 +53,13 @@ public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMeta
return new MessageTransferMessage(msg, null);
}
+ @Override
public int hashCode()
{
return ordinal();
}
+ @Override
public boolean equals(Object o)
{
return o != null && o.getClass() == getClass();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 065cc0d..70bcdeb 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -21,20 +21,19 @@
package org.apache.qpid.server.protocol.v0_10;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
-import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+import org.apache.qpid.server.store.StorableMessageMetaData;
public class MessageMetaData_0_10 implements StorableMessageMetaData
{
@@ -246,9 +245,9 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
{
- public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf)
+ public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> buf)
{
- ServerDecoder decoder = new ServerDecoder(Collections.singletonList(buf));
+ ServerDecoder decoder = new ServerDecoder(buf);
long arrivalTime = decoder.readInt64();
int bodySize = decoder.readInt32();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 108f52c..1c14a70 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -21,9 +21,11 @@
package org.apache.qpid.server.protocol.v0_8;
import java.util.Collection;
+import java.util.List;
import java.util.Set;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v0_8.transport.AMQProtocolVersionException;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
@@ -149,12 +151,37 @@ public class MessageMetaData implements StorableMessageMetaData
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
{
-
- public MessageMetaData createMetaData(QpidByteBuffer buf)
+ @Override
+ public MessageMetaData createMetaData(List<QpidByteBuffer> bufs)
{
try
{
- int size = buf.getInt();
+ final int size = QpidByteBufferUtils.getInt(bufs);
+
+ final QpidByteBuffer buf;
+ final boolean disposalRequired;
+ if (bufs.size() == 1)
+ {
+ buf = bufs.get(0);
+ disposalRequired = false;
+ }
+ else
+ {
+ // This should seldom happen. For AMQP 0-8..0-91 the content header body must
+ // fit within one frame. If we get here we are either recovering after a reduction
+ // in framesize or it so happens that the content header and the size/exchange/routingkey
+ // just overfills one frame.
+ int totalRemaining = (int) QpidByteBufferUtils.remaining(bufs);
+
+ buf = QpidByteBuffer.allocateDirect(totalRemaining);
+ disposalRequired = true;
+ for (final QpidByteBuffer qpidByteBuffer : bufs)
+ {
+ buf.put(qpidByteBuffer);
+ }
+ buf.flip();
+ }
+
ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, size);
final AMQShortString exchange = AMQShortString.readAMQShortString(buf);
final AMQShortString routingKey = AMQShortString.readAMQShortString(buf);
@@ -168,6 +195,11 @@ public class MessageMetaData implements StorableMessageMetaData
(flags & MANDATORY_FLAG) != 0,
routingKey);
+ if (disposalRequired)
+ {
+ buf.dispose();
+ }
+
return new MessageMetaData(publishBody, chb, arrivalTime);
}
catch (AMQFrameDecodingException | AMQProtocolVersionException e)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
index 76f4263..4218317 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.List;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -40,9 +42,9 @@ public class MessageMetaDataType_0_8 implements MessageMetaDataType<MessageMetaD
}
@Override
- public MessageMetaData createMetaData(QpidByteBuffer buf)
+ public MessageMetaData createMetaData(List<QpidByteBuffer> bufs)
{
- return MessageMetaData.FACTORY.createMetaData(buf);
+ return MessageMetaData.FACTORY.createMetaData(bufs);
}
@Override
@@ -51,11 +53,13 @@ public class MessageMetaDataType_0_8 implements MessageMetaDataType<MessageMetaD
return new AMQMessage(msg);
}
+ @Override
public int hashCode()
{
return ordinal();
}
+ @Override
public boolean equals(Object o)
{
return o != null && o.getClass() == getClass();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java
new file mode 100644
index 0000000..5d36778
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferInputStream;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageMetaDataFactoryTest extends QpidTestCase
+{
+ private static final String CONTENT_TYPE = "content/type";
+ private final long _arrivalTime = System.currentTimeMillis();
+ private final AMQShortString _routingKey = AMQShortString.valueOf("routingkey");
+ private final AMQShortString _exchange = AMQShortString.valueOf("exch");
+ private MessageMetaData _mmd;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _mmd = createTestMessageMetaData();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ if (_mmd != null)
+ {
+ _mmd.dispose();
+ }
+ }
+
+ public void testUnmarshalFromSingleBuffer() throws Exception
+ {
+ try(QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocateDirect(_mmd.getStorableSize()))
+ {
+ _mmd.writeToBuffer(qpidByteBuffer);
+ qpidByteBuffer.flip();
+
+ MessageMetaData recreated = MessageMetaData.FACTORY.createMetaData(Collections.singletonList(qpidByteBuffer));
+
+ assertEquals("Unexpected arrival time", _arrivalTime, recreated.getArrivalTime());
+ assertEquals("Unexpected routing key", _routingKey, recreated.getMessagePublishInfo().getRoutingKey());
+ assertEquals("Unexpected content type", CONTENT_TYPE, recreated.getContentHeaderBody().getProperties()
+ .getContentTypeAsString());
+ recreated.dispose();
+ }
+ }
+
+ public void testUnmarshalFromMultipleBuffers() throws Exception
+ {
+ List<QpidByteBuffer> bufs = Collections.emptyList();
+ try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocateDirect(_mmd.getStorableSize()))
+ {
+ _mmd.writeToBuffer(qpidByteBuffer);
+ qpidByteBuffer.flip();
+
+ bufs = splitIntoSmallerBuffers(qpidByteBuffer);
+
+ final MessageMetaData recreated = MessageMetaData.FACTORY.createMetaData(bufs);
+ assertEquals("Unexpected arrival time", _arrivalTime, recreated.getArrivalTime());
+ assertEquals("Unexpected routing key", _routingKey, recreated.getMessagePublishInfo().getRoutingKey());
+ assertEquals("Unexpected content type", CONTENT_TYPE, recreated.getContentHeaderBody().getProperties().getContentTypeAsString());
+ recreated.dispose();
+ }
+ finally
+ {
+ for (final QpidByteBuffer buf : bufs)
+ {
+ buf.dispose();
+ }
+ }
+ }
+
+ private MessageMetaData createTestMessageMetaData()
+ {
+ final MessagePublishInfo publishBody = new MessagePublishInfo(_exchange,
+ false,
+ false,
+ _routingKey);
+ final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setContentType(CONTENT_TYPE);
+ final ContentHeaderBody contentHeaderBody = new ContentHeaderBody(props);
+
+ return new MessageMetaData(publishBody, contentHeaderBody, _arrivalTime);
+ }
+
+ private List<QpidByteBuffer> splitIntoSmallerBuffers(final QpidByteBuffer qpidByteBuffer) throws IOException
+ {
+ List<QpidByteBuffer> bufs = new ArrayList<>();
+ try (InputStream stream = new QpidByteBufferInputStream(Collections.singletonList(qpidByteBuffer)))
+ {
+ byte[] transferBuf = new byte[3];
+ int read = stream.read(transferBuf);
+ while (read != -1)
+ {
+ QpidByteBuffer buf = QpidByteBuffer.allocateDirect(read);
+ buf.put(transferBuf, 0, read);
+ buf.flip();
+ bufs.add(buf);
+ read = stream.read(transferBuf);
+ }
+ }
+ return bufs;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 870a961..2479e85 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -71,7 +71,7 @@ import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
index 6173543..7db8bcd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.util.List;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -40,9 +42,9 @@ public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaD
}
@Override
- public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf)
+ public MessageMetaData_1_0 createMetaData(List<QpidByteBuffer> bufs)
{
- return MessageMetaData_1_0.FACTORY.createMetaData(buf);
+ return MessageMetaData_1_0.FACTORY.createMetaData(bufs);
}
@Override
@@ -51,11 +53,13 @@ public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaD
return new Message_1_0(msg);
}
+ @Override
public int hashCode()
{
return ordinal();
}
+ @Override
public boolean equals(Object o)
{
return o != null && o.getClass() == getClass();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index a599b2a..b3e544d 100755
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -410,19 +411,33 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
_typeRegistry.registerSecurityLayer();
}
- public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf)
+ public MessageMetaData_1_0 createMetaData(List<QpidByteBuffer> bufs)
{
try
{
- byte versionByte = buf.get(buf.position());
+ if (!QpidByteBufferUtils.hasRemaining(bufs))
+ {
+ throw new ConnectionScopedRuntimeException("No metadata found");
+ }
+
+ byte versionByte = 0;
+ for (final QpidByteBuffer buf : bufs)
+ {
+ if (buf.hasRemaining())
+ {
+ versionByte = buf.get(buf.position());
+ break;
+ }
+ }
+
long arrivalTime;
long contentSize = 0;
if (versionByte == 1)
{
// we can discard the first byte
- buf.get();
- arrivalTime = buf.getLong();
- contentSize = buf.getLong();
+ QpidByteBufferUtils.get(bufs);
+ arrivalTime = QpidByteBufferUtils.getLong(bufs);
+ contentSize = QpidByteBufferUtils.getLong(bufs);
}
else if (versionByte == 0)
{
@@ -437,7 +452,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
- List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(Collections.singletonList(buf));
+ List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(bufs);
if (versionByte == 0)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index c29fa09..1e61657 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -29,7 +29,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index e2044e6..15788ef 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -73,7 +73,7 @@ import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
index 4683709..79f0d39 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
index b66843a..486bea0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
index e204adc..1d7310d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
index 4232fa6..2fa9e41 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
index e838d9f..6cce923 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.*;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -43,7 +44,7 @@ public class CharTypeConstructor implements TypeConstructor<String>
@Override
public String construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(QpidByteBufferUtils.hasRemaining(in,4))
+ if(QpidByteBufferUtils.hasRemaining(in, 4))
{
int codePoint = QpidByteBufferUtils.getInt(in);
char[] chars = Character.toChars(codePoint);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
index cfb952b..1d4f3f8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
index fe7ef10..d46f4aa 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
@@ -23,6 +23,7 @@ import java.math.BigDecimal;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
index 1eb7dcc..aba2d8c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
index f1e6ce5..66be0d3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
index fa30d85..58350e6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.transport.ByteBufferSender;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
index b5439e5..69f3a25 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
index aacb469..2b44607 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
deleted file mode 100644
index 0744a1b..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0.codec;
-
-import java.nio.BufferUnderflowException;
-import java.util.List;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-
-public class QpidByteBufferUtils
-{
- public static boolean hasRemaining(List<QpidByteBuffer> in)
- {
- if (in.isEmpty())
- {
- return false;
- }
- for (int i = 0; i < in.size(); i++)
- {
- if (in.get(i).hasRemaining())
- {
- return true;
- }
- }
- return false;
- }
-
- public static long remaining(List<QpidByteBuffer> in)
- {
- long remaining = 0L;
- for (int i = 0; i < in.size(); i++)
- {
- remaining += in.get(i).remaining();
- }
- return remaining;
- }
-
- public static byte get(List<QpidByteBuffer> in)
- {
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- if (buffer.hasRemaining())
- {
- return buffer.get();
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
- {
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (remaining >= len)
- {
- return true;
- }
- len -= remaining;
- }
-
- return false;
- }
-
- public static long getLong(final List<QpidByteBuffer> in)
- {
- boolean bytewise = false;
- int consumed = 0;
- long result = 0L;
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (bytewise)
- {
- while (buffer.hasRemaining() && consumed < 8)
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- if (consumed == 8)
- {
- return result;
- }
- }
- else
- {
- if (remaining >= 8)
- {
- return buffer.getLong();
- }
- else if (remaining != 0)
- {
- bytewise = true;
- while (buffer.hasRemaining())
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- }
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static int getInt(final List<QpidByteBuffer> in)
- {
- boolean bytewise = false;
- int consumed = 0;
- int result = 0;
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (bytewise)
- {
- while (buffer.hasRemaining() && consumed < 4)
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- if (consumed == 4)
- {
- return result;
- }
- }
- else
- {
- if (remaining >= 4)
- {
- return buffer.getInt();
- }
- else if (remaining != 0)
- {
- bytewise = true;
- while (buffer.hasRemaining())
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- }
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static float getFloat(final List<QpidByteBuffer> in)
- {
- return Float.intBitsToFloat(getInt(in));
- }
-
- public static double getDouble(final List<QpidByteBuffer> in)
- {
- return Double.longBitsToDouble(getLong(in));
- }
-
- public static Short getShort(final List<QpidByteBuffer> in)
- {
- boolean bytewise = false;
- int consumed = 0;
- short result = 0;
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (bytewise)
- {
- while (buffer.hasRemaining() && consumed < 2)
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- if (consumed == 2)
- {
- return result;
- }
- }
- else
- {
- if (remaining >= 2)
- {
- return buffer.getShort();
- }
- else if (remaining != 0)
- {
- bytewise = true;
- while (buffer.hasRemaining())
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- }
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static int get(final List<QpidByteBuffer> in, final byte[] data)
- {
- int copied = 0;
- int i = 0;
- while (copied < data.length && i < in.size())
- {
- QpidByteBuffer buf = in.get(i);
- if (buf.hasRemaining())
- {
- int remaining = buf.remaining();
- if (remaining >= data.length - copied)
- {
- buf.get(data, copied, data.length - copied);
- return data.length;
- }
- else
- {
- buf.get(data, copied, remaining);
- copied += remaining;
- }
- }
- i++;
- }
- return copied;
- }
-
- public static void skip(final List<QpidByteBuffer> in, int length)
- {
- int skipped = 0;
- int i = 0;
- while (skipped < length && i < in.size())
- {
- QpidByteBuffer buf = in.get(i);
- if (buf.hasRemaining())
- {
- int remaining = buf.remaining();
- if (remaining >= length - skipped)
- {
- buf.position(buf.position() + length - skipped);
- return;
- }
- else
- {
- buf.position(buf.position() + remaining);
- skipped += remaining;
- }
- }
- i++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
index 528979e..396911a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
index d6b3ca4..5098b34 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
index 1188521..0bf79d7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
index d3f6ae0..32df052 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
index 2a3790a..845f745 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
index 324d06b..3a93f29 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
@@ -26,6 +26,7 @@ import java.nio.CharBuffer;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
index 0ad2319..07a7aac 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
index 49a5db3..a76ae6f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
@@ -24,6 +24,7 @@ import java.util.Date;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
index 792c412..d03668c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org