You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [4/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
#include "qpid/amqp/MessageReader.h"
#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/typecodes.h"
#include "qpid/types/Uuid.h"
#include "qpid/types/Variant.h"
#include "qpid/log/Statement.h"
@@ -55,40 +56,6 @@ const size_t REPLY_TO_GROUP_ID(12);
}
-/*
-Reader& MessageReader::HeaderReader::getReader(size_t index)
-{
- switch (index) {
- case DURABLE: return durableReader;
- case PRIORITY: return priorityReader;
- case TTL: return ttlReader;
- case FIRST_ACQUIRER: return firstAcquirerReader;
- case DELIVERY_COUNT: return deliveryCountReader;
- default: return noSuchFieldReader;
- }
-}
-
-Reader& MessageReader::PropertiesReader::getReader(size_t index)
-{
- switch (index) {
- case MESSAGE_ID: return messageIdReader;
- case USER_ID: return userIdReader;
- case TO: return toReader;
- case SUBJECT: return subjectReader;
- case REPLY_TO: return replyToReader;
- case CORRELATION_ID: return correlationIdReader;
- case CONTENT_TYPE: return contentTypeReader;
- case CONTENT_ENCODING: return contentEncodingReader;
- case ABSOLUTE_EXPIRY_TIME: return absoluteExpiryTimeReader;
- case CREATION_TIME: return creationTimeReader;
- case GROUP_ID: return groupIdReader;
- case GROUP_SEQUENCE: return groupSequenceReader;
- case REPLY_TO_GROUP_ID: return replyToGroupIdReader;
- default: return noSuchFieldReader;
- }
-}
-*/
-
MessageReader::HeaderReader::HeaderReader(MessageReader& p) : parent(p), index(0) {}
void MessageReader::HeaderReader::onBoolean(bool v, const Descriptor*) // durable, first-acquirer
{
@@ -132,7 +99,7 @@ void MessageReader::PropertiesReader::on
if (index == MESSAGE_ID) {
parent.onMessageId(v, qpid::types::VAR_UUID);
} else if (index == CORRELATION_ID) {
- parent.onCorrelationId(v);
+ parent.onCorrelationId(v, qpid::types::VAR_UUID);
} else {
QPID_LOG(warning, "Unexpected message format, got uuid at index " << index << " of properties");
}
@@ -154,7 +121,7 @@ void MessageReader::PropertiesReader::on
if (index == MESSAGE_ID) {
parent.onMessageId(v, qpid::types::VAR_STRING);
} else if (index == CORRELATION_ID) {
- parent.onCorrelationId(v);
+ parent.onCorrelationId(v, qpid::types::VAR_STRING);
} else if (index == USER_ID) {
parent.onUserId(v);
} else {
@@ -165,9 +132,9 @@ void MessageReader::PropertiesReader::on
void MessageReader::PropertiesReader::onString(const CharSequence& v, const Descriptor*) // message-id, correlation-id, group-id, reply-to-group-id, subject, to, reply-to
{
if (index == MESSAGE_ID) {
- parent.onMessageId(v);
+ parent.onMessageId(v, qpid::types::VAR_STRING);
} else if (index == CORRELATION_ID) {
- parent.onCorrelationId(v);
+ parent.onCorrelationId(v, qpid::types::VAR_STRING);
} else if (index == GROUP_ID) {
parent.onGroupId(v);
} else if (index == REPLY_TO_GROUP_ID) {
@@ -218,129 +185,76 @@ void MessageReader::PropertiesReader::on
{
++index;
}
-
-/*
-MessageReader::DurableReader::DurableReader(MessageReader& p) : parent(p) {}
-void MessageReader::DurableReader::onBoolean(bool v, const Descriptor*)
-{
- parent.onDurable(v);
-}
-MessageReader::PriorityReader::PriorityReader(MessageReader& p) : parent(p) {}
-void MessageReader::PriorityReader::onUByte(uint8_t v, const Descriptor*)
-{
- parent.onPriority(v);
-}
-MessageReader::TtlReader::TtlReader(MessageReader& p) : parent(p) {}
-void MessageReader::TtlReader::onUInt(uint32_t v, const Descriptor*)
-{
- parent.onTtl(v);
-}
-MessageReader::FirstAcquirerReader::FirstAcquirerReader(MessageReader& p) : parent(p) {}
-void MessageReader::FirstAcquirerReader::onBoolean(bool v, const Descriptor*)
-{
- parent.onFirstAcquirer(v);
-}
-MessageReader::DeliveryCountReader::DeliveryCountReader(MessageReader& p) : parent(p) {}
-void MessageReader::DeliveryCountReader::onUInt(uint32_t v, const Descriptor*)
-{
- parent.onDeliveryCount(v);
-}
-MessageReader::MessageIdReader::MessageIdReader(MessageReader& p) : parent(p) {}
-void MessageReader::MessageIdReader::onUuid(const qpid::types::Uuid& v, const Descriptor*)
-{
- parent.onMessageId(v);
-}
-void MessageReader::MessageIdReader::onULong(uint64_t v, const Descriptor*)
-{
- parent.onMessageId(v);
-}
-void MessageReader::MessageIdReader::onString(const CharSequence& v, const Descriptor*)
+void MessageReader::PropertiesReader::onBoolean(bool, const Descriptor*)
{
- parent.onMessageId(v);
-}
-void MessageReader::MessageIdReader::onBinary(const CharSequence& v, const Descriptor*)
-{
- parent.onMessageId(v);
-}
-MessageReader::UserIdReader::UserIdReader(MessageReader& p) : parent(p) {}
-void MessageReader::UserIdReader::onBinary(const CharSequence& v, const Descriptor*)
-{
- parent.onUserId(v);
-}
-MessageReader::ToReader::ToReader(MessageReader& p) : parent(p) {}
-void MessageReader::ToReader::onString(const CharSequence& v, const Descriptor*)
-{
- parent.onTo(v);
-}
-MessageReader::SubjectReader::SubjectReader(MessageReader& p) : parent(p) {}
-void MessageReader::SubjectReader::onString(const CharSequence& v, const Descriptor*)
-{
- parent.onSubject(v);
-}
-MessageReader::ReplyToReader::ReplyToReader(MessageReader& p) : parent(p) {}
-void MessageReader::ReplyToReader::onString(const CharSequence& v, const Descriptor*)
-{
- parent.onReplyTo(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (boolean)");
+ ++index;
}
-MessageReader::CorrelationIdReader::CorrelationIdReader(MessageReader& p) : parent(p) {}
-void MessageReader::CorrelationIdReader::onUuid(const qpid::types::Uuid& v, const Descriptor*)
+void MessageReader::PropertiesReader::onUByte(uint8_t, const Descriptor*)
{
- parent.onCorrelationId(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (ubyte)");
+ ++index;
}
-void MessageReader::CorrelationIdReader::onULong(uint64_t v, const Descriptor*)
+void MessageReader::PropertiesReader::onUShort(uint16_t, const Descriptor*)
{
- parent.onCorrelationId(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (ushort)");
+ ++index;
}
-void MessageReader::CorrelationIdReader::onString(const CharSequence& v, const Descriptor*)
+void MessageReader::PropertiesReader::onByte(int8_t, const Descriptor*)
{
- parent.onCorrelationId(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (byte)");
+ ++index;
}
-void MessageReader::CorrelationIdReader::onBinary(const CharSequence& v, const Descriptor*)
+void MessageReader::PropertiesReader::onShort(int16_t, const Descriptor*)
{
- parent.onCorrelationId(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (short)");
+ ++index;
}
-MessageReader::ContentTypeReader::ContentTypeReader(MessageReader& p) : parent(p) {}
-void MessageReader::ContentTypeReader::onString(const CharSequence& v, const Descriptor*)
+void MessageReader::PropertiesReader::onInt(int32_t, const Descriptor*)
{
- parent.onContentType(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (int)");
+ ++index;
}
-MessageReader::ContentEncodingReader::ContentEncodingReader(MessageReader& p) : parent(p) {}
-void MessageReader::ContentEncodingReader::onString(const CharSequence& v, const Descriptor*)
+void MessageReader::PropertiesReader::onLong(int64_t, const Descriptor*)
{
- parent.onContentEncoding(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (long)");
+ ++index;
}
-MessageReader::AbsoluteExpiryTimeReader::AbsoluteExpiryTimeReader(MessageReader& p) : parent(p) {}
-void MessageReader::AbsoluteExpiryTimeReader::onTimestamp(int64_t v, const Descriptor*)
+void MessageReader::PropertiesReader::onFloat(float, const Descriptor*)
{
- parent.onAbsoluteExpiryTime(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (float)");
+ ++index;
}
-MessageReader::CreationTimeReader::CreationTimeReader(MessageReader& p) : parent(p) {}
-void MessageReader::CreationTimeReader::onTimestamp(int64_t v, const Descriptor*)
+void MessageReader::PropertiesReader::onDouble(double, const Descriptor*)
{
- parent.onCreationTime(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (double)");
+ ++index;
}
-MessageReader::GroupIdReader::GroupIdReader(MessageReader& p) : parent(p) {}
-void MessageReader::GroupIdReader::onString(const CharSequence& v, const Descriptor*)
+bool MessageReader::PropertiesReader::onStartList(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*)
{
- parent.onGroupId(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (list)");
+ ++index;
+ return false;
}
-MessageReader::GroupSequenceReader::GroupSequenceReader(MessageReader& p) : parent(p) {}
-void MessageReader::GroupSequenceReader::onUInt(uint32_t v, const Descriptor*)
+bool MessageReader::PropertiesReader::onStartMap(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*)
{
- parent.onGroupSequence(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (map)");
+ ++index;
+ return false;
}
-MessageReader::ReplyToGroupIdReader::ReplyToGroupIdReader(MessageReader& p) : parent(p) {}
-void MessageReader::ReplyToGroupIdReader::onString(const CharSequence& v, const Descriptor*)
+bool MessageReader::PropertiesReader::onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*)
{
- parent.onReplyToGroupId(v);
+ QPID_LOG(info, "skipping message property at index " << index << " unexpected type (array)");
+ ++index;
+ return false;
}
-*/
+
//header, properties, amqp-sequence, amqp-value
-bool MessageReader::onStartList(uint32_t count, const CharSequence& raw, const Descriptor* descriptor)
+bool MessageReader::onStartList(uint32_t count, const CharSequence& elements, const CharSequence& raw, const Descriptor* descriptor)
{
if (delegate) {
- return delegate->onStartList(count, raw, descriptor);
+ return delegate->onStartList(count, elements, raw, descriptor);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got no descriptor for list.");
@@ -351,8 +265,11 @@ bool MessageReader::onStartList(uint32_t
} else if (descriptor->match(PROPERTIES_SYMBOL, PROPERTIES_CODE)) {
delegate = &propertiesReader;
return true;
- } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(raw, *descriptor);
+ } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE)) {
+ onAmqpSequence(raw);
+ return false;
+ } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
+ onAmqpValue(elements, qpid::amqp::typecodes::LIST_NAME);
return false;
} else {
QPID_LOG(warning, "Unexpected described list: " << *descriptor);
@@ -372,28 +289,28 @@ void MessageReader::onEndList(uint32_t c
}
//delivery-annotations, message-annotations, application-properties, amqp-value
-bool MessageReader::onStartMap(uint32_t count, const CharSequence& raw, const Descriptor* descriptor)
+bool MessageReader::onStartMap(uint32_t count, const CharSequence& elements, const CharSequence& raw, const Descriptor* descriptor)
{
if (delegate) {
- return delegate->onStartMap(count, raw, descriptor);
+ return delegate->onStartMap(count, elements, raw, descriptor);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got no descriptor for map.");
return false;
} else if (descriptor->match(DELIVERY_ANNOTATIONS_SYMBOL, DELIVERY_ANNOTATIONS_CODE)) {
- onDeliveryAnnotations(raw);
+ onDeliveryAnnotations(elements, raw);
return false;
} else if (descriptor->match(MESSAGE_ANNOTATIONS_SYMBOL, MESSAGE_ANNOTATIONS_CODE)) {
- onMessageAnnotations(raw);
+ onMessageAnnotations(elements, raw);
return false;
} else if (descriptor->match(FOOTER_SYMBOL, FOOTER_CODE)) {
- onFooter(raw);
+ onFooter(elements, raw);
return false;
} else if (descriptor->match(APPLICATION_PROPERTIES_SYMBOL, APPLICATION_PROPERTIES_CODE)) {
- onApplicationProperties(raw);
+ onApplicationProperties(elements, raw);
return false;
} else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(raw, *descriptor);
+ onAmqpValue(elements, qpid::amqp::typecodes::MAP_NAME);
return false;
} else {
QPID_LOG(warning, "Unexpected described map: " << *descriptor);
@@ -417,8 +334,10 @@ void MessageReader::onBinary(const CharS
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got binary value with no descriptor.");
- } else if (descriptor->match(DATA_SYMBOL, DATA_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(bytes, *descriptor);
+ } else if (descriptor->match(DATA_SYMBOL, DATA_CODE)) {
+ onData(bytes);
+ } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
+ onAmqpValue(bytes, qpid::amqp::typecodes::BINARY_NAME);
} else {
QPID_LOG(warning, "Unexpected binary value with descriptor: " << *descriptor);
}
@@ -434,7 +353,7 @@ void MessageReader::onNull(const Descrip
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant v;
- onBody(v, *descriptor);
+ onAmqpValue(v);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got null value with no descriptor.");
@@ -450,7 +369,7 @@ void MessageReader::onString(const CharS
delegate->onString(v, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(v, *descriptor);
+ onAmqpValue(v, qpid::amqp::typecodes::STRING_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got string value with no descriptor.");
@@ -466,7 +385,7 @@ void MessageReader::onSymbol(const CharS
delegate->onSymbol(v, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(v, *descriptor);
+ onAmqpValue(v, qpid::amqp::typecodes::SYMBOL_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got symbol value with no descriptor.");
@@ -484,7 +403,7 @@ void MessageReader::onBoolean(bool v, co
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got boolean value with no descriptor.");
@@ -502,7 +421,7 @@ void MessageReader::onUByte(uint8_t v, c
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got ubyte value with no descriptor.");
@@ -520,7 +439,7 @@ void MessageReader::onUShort(uint16_t v,
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got ushort value with no descriptor.");
@@ -538,7 +457,7 @@ void MessageReader::onUInt(uint32_t v, c
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got uint value with no descriptor.");
@@ -556,7 +475,7 @@ void MessageReader::onULong(uint64_t v,
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got ulong value with no descriptor.");
@@ -574,7 +493,7 @@ void MessageReader::onByte(int8_t v, con
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got byte value with no descriptor.");
@@ -592,7 +511,7 @@ void MessageReader::onShort(int16_t v, c
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got short value with no descriptor.");
@@ -610,7 +529,7 @@ void MessageReader::onInt(int32_t v, con
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got int value with no descriptor.");
@@ -628,7 +547,7 @@ void MessageReader::onLong(int64_t v, co
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got long value with no descriptor.");
@@ -646,7 +565,7 @@ void MessageReader::onFloat(float v, con
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got float value with no descriptor.");
@@ -664,7 +583,7 @@ void MessageReader::onDouble(double v, c
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got double value with no descriptor.");
@@ -681,7 +600,7 @@ void MessageReader::onUuid(const CharSeq
delegate->onUuid(v, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(v, *descriptor);
+ onAmqpValue(v, qpid::amqp::typecodes::UUID_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got uuid value with no descriptor.");
@@ -699,7 +618,7 @@ void MessageReader::onTimestamp(int64_t
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got timestamp value with no descriptor.");
@@ -716,7 +635,8 @@ bool MessageReader::onStartArray(uint32_
return delegate->onStartArray(count, raw, constructor, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(raw, *descriptor);
+ //TODO: might be better to decode this here
+ onAmqpValue(raw, qpid::amqp::typecodes::ARRAY_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got array with no descriptor.");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/MessageReader.h Fri Sep 20 18:59:30 2013
@@ -24,7 +24,6 @@
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Reader.h"
-#include "qpid/amqp/ListReader.h"
#include "qpid/types/Variant.h"
#include "qpid/CommonImportExport.h"
@@ -40,11 +39,11 @@ class MessageReader : public Reader
QPID_COMMON_EXTERN MessageReader();
//header, properties, amqp-sequence, amqp-value
- QPID_COMMON_EXTERN bool onStartList(uint32_t, const CharSequence&, const Descriptor*);
+ QPID_COMMON_EXTERN bool onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*);
QPID_COMMON_EXTERN void onEndList(uint32_t, const Descriptor*);
//delivery-annotations, message-annotations, application-headers, amqp-value
- QPID_COMMON_EXTERN bool onStartMap(uint32_t, const CharSequence&, const Descriptor*);
+ QPID_COMMON_EXTERN bool onStartMap(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*);
QPID_COMMON_EXTERN void onEndMap(uint32_t, const Descriptor*);
//data, amqp-value
@@ -95,175 +94,24 @@ class MessageReader : public Reader
virtual void onGroupSequence(uint32_t) = 0;
virtual void onReplyToGroupId(const CharSequence&) = 0;
- virtual void onApplicationProperties(const CharSequence&) = 0;
- virtual void onDeliveryAnnotations(const CharSequence&) = 0;
- virtual void onMessageAnnotations(const CharSequence&) = 0;
- virtual void onBody(const CharSequence&, const Descriptor&) = 0;
- virtual void onBody(const qpid::types::Variant&, const Descriptor&) = 0;
- virtual void onFooter(const CharSequence&) = 0;
+ virtual void onApplicationProperties(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0;
+ virtual void onDeliveryAnnotations(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0;
+ virtual void onMessageAnnotations(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0;
+
+ virtual void onData(const CharSequence&) = 0;
+ virtual void onAmqpSequence(const CharSequence&) = 0;
+ virtual void onAmqpValue(const CharSequence&, const std::string& type) = 0;
+ virtual void onAmqpValue(const qpid::types::Variant&) = 0;
+
+ virtual void onFooter(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0;
QPID_COMMON_EXTERN CharSequence getBareMessage() const;
private:
- /*
- class DurableReader : public Reader
- {
- public:
- DurableReader(MessageReader&);
- void onBoolean(bool v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class PriorityReader : public Reader
- {
- public:
- PriorityReader(MessageReader&);
- void onUByte(uint8_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class TtlReader : public Reader
- {
- public:
- TtlReader(MessageReader&);
- void onUInt(uint32_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class FirstAcquirerReader : public Reader
- {
- public:
- FirstAcquirerReader(MessageReader&);
- void onBoolean(bool v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class DeliveryCountReader : public Reader
- {
- public:
- DeliveryCountReader(MessageReader&);
- void onUInt(uint32_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
-
- class MessageIdReader : public Reader
- {
- public:
- MessageIdReader(MessageReader&);
- void onUuid(const qpid::types::Uuid& v, const Descriptor*);
- void onULong(uint64_t v, const Descriptor*);
- void onString(const CharSequence& v, const Descriptor*);
- void onBinary(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class UserIdReader : public Reader
- {
- public:
- UserIdReader(MessageReader&);
- void onBinary(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ToReader : public Reader
- {
- public:
- ToReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class SubjectReader : public Reader
- {
- public:
- SubjectReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ReplyToReader : public Reader
- {
- public:
- ReplyToReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class CorrelationIdReader : public Reader
- {
- public:
- CorrelationIdReader(MessageReader&);
- void onUuid(const qpid::types::Uuid& v, const Descriptor*);
- void onULong(uint64_t v, const Descriptor*);
- void onString(const CharSequence& v, const Descriptor*);
- void onBinary(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ContentTypeReader : public Reader
- {
- public:
- ContentTypeReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ContentEncodingReader : public Reader
- {
- public:
- ContentEncodingReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class AbsoluteExpiryTimeReader : public Reader
- {
- public:
- AbsoluteExpiryTimeReader(MessageReader&);
- void onTimestamp(int64_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class CreationTimeReader : public Reader
- {
- public:
- CreationTimeReader(MessageReader&);
- void onTimestamp(int64_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class GroupIdReader : public Reader
- {
- public:
- GroupIdReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class GroupSequenceReader : public Reader
- {
- public:
- GroupSequenceReader(MessageReader&);
- void onUInt(uint32_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ReplyToGroupIdReader : public Reader
- {
- public:
- ReplyToGroupIdReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- */
- class HeaderReader : public Reader //public ListReader
+ class HeaderReader : public Reader
{
public:
- //Reader& getReader(size_t index);
-
HeaderReader(MessageReader&);
void onBoolean(bool v, const Descriptor*); // durable, first-acquirer
void onUByte(uint8_t v, const Descriptor*); // priority
@@ -273,11 +121,9 @@ class MessageReader : public Reader
MessageReader& parent;
size_t index;
};
- class PropertiesReader : public Reader //public ListReader
+ class PropertiesReader : public Reader
{
public:
- //Reader& getReader(size_t index);
-
PropertiesReader(MessageReader&);
void onUuid(const CharSequence& v, const Descriptor*); // message-id, correlation-id
void onULong(uint64_t v, const Descriptor*); // message-id, correlation-id
@@ -287,6 +133,20 @@ class MessageReader : public Reader
void onTimestamp(int64_t v, const Descriptor*); // absolute-expiry-time, creation-time
void onUInt(uint32_t v, const Descriptor*); // group-sequence
void onNull(const Descriptor*);
+
+ void onBoolean(bool, const Descriptor*);
+ void onUByte(uint8_t, const Descriptor*);
+ void onUShort(uint16_t, const Descriptor*);
+ void onByte(int8_t, const Descriptor*);
+ void onShort(int16_t, const Descriptor*);
+ void onInt(int32_t, const Descriptor*);
+ void onLong(int64_t, const Descriptor*);
+ void onFloat(float, const Descriptor*);
+ void onDouble(double, const Descriptor*);
+ bool onStartList(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*);
+ bool onStartMap(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*);
+ bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*);
+
private:
MessageReader& parent;
size_t index;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Reader.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Reader.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Reader.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Reader.h Fri Sep 20 18:59:30 2013
@@ -63,8 +63,8 @@ class Reader
* @return true to get elements of the compound value, false
* to skip over it
*/
- virtual bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*) { return true; }
- virtual bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*) { return true; }
+ virtual bool onStartList(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) { return true; }
+ virtual bool onStartMap(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) { return true; }
virtual bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*) { return true; }
virtual void onEndList(uint32_t /*count*/, const Descriptor*) {}
virtual void onEndMap(uint32_t /*count*/, const Descriptor*) {}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Sasl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Sasl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/Sasl.h Fri Sep 20 18:59:30 2013
@@ -35,12 +35,12 @@ namespace amqp {
class Sasl : protected Reader
{
public:
- Sasl(const std::string& id);
- virtual ~Sasl();
- std::size_t read(const char* data, size_t available);
- std::size_t write(char* data, size_t available);
- std::size_t readProtocolHeader(const char* buffer, std::size_t size);
- std::size_t writeProtocolHeader(char* buffer, std::size_t size);
+ QPID_COMMON_EXTERN Sasl(const std::string& id);
+ QPID_COMMON_EXTERN virtual ~Sasl();
+ QPID_COMMON_EXTERN std::size_t read(const char* data, size_t available);
+ QPID_COMMON_EXTERN std::size_t write(char* data, size_t available);
+ QPID_COMMON_EXTERN std::size_t readProtocolHeader(const char* buffer, std::size_t size);
+ QPID_COMMON_EXTERN std::size_t writeProtocolHeader(char* buffer, std::size_t size);
protected:
const std::string id;
std::vector<char> buffer;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.cpp Fri Sep 20 18:59:30 2013
@@ -116,7 +116,7 @@ class SaslOutcomeReader : public Reader
};
}
-bool SaslClient::onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor)
+bool SaslClient::onStartList(uint32_t count, const CharSequence& arguments, const CharSequence& /*full raw data*/, const Descriptor* descriptor)
{
if (!descriptor) {
QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslClient.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include <qpid/CommonImportExport.h>
#include "qpid/amqp/Sasl.h"
namespace qpid {
@@ -33,19 +34,19 @@ namespace amqp {
class SaslClient : public Sasl
{
public:
- SaslClient(const std::string& id);
- virtual ~SaslClient();
- virtual void mechanisms(const std::string&) = 0;
- virtual void challenge(const std::string&) = 0;
- virtual void challenge() = 0; //null != empty string
- virtual void outcome(uint8_t result, const std::string&) = 0;
- virtual void outcome(uint8_t result) = 0;
+ QPID_COMMON_EXTERN SaslClient(const std::string& id);
+ QPID_COMMON_EXTERN virtual ~SaslClient();
+ QPID_COMMON_EXTERN virtual void mechanisms(const std::string&) = 0;
+ QPID_COMMON_EXTERN virtual void challenge(const std::string&) = 0;
+ QPID_COMMON_EXTERN virtual void challenge() = 0; //null != empty string
+ QPID_COMMON_EXTERN virtual void outcome(uint8_t result, const std::string&) = 0;
+ QPID_COMMON_EXTERN virtual void outcome(uint8_t result) = 0;
- void init(const std::string& mechanism, const std::string* response, const std::string* hostname);
- void response(const std::string*);
+ QPID_COMMON_EXTERN void init(const std::string& mechanism, const std::string* response, const std::string* hostname);
+ QPID_COMMON_EXTERN void response(const std::string*);
private:
- bool onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor);
+ QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const CharSequence&, const Descriptor* descriptor);
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.cpp Fri Sep 20 18:59:30 2013
@@ -158,7 +158,7 @@ class SaslResponseReader : public Reader
};
}
-bool SaslServer::onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor)
+bool SaslServer::onStartList(uint32_t count, const CharSequence& arguments, const CharSequence& /*full raw data*/, const Descriptor* descriptor)
{
if (!descriptor) {
QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/SaslServer.h Fri Sep 20 18:59:30 2013
@@ -33,17 +33,17 @@ namespace amqp {
class SaslServer : public Sasl
{
public:
- SaslServer(const std::string& id);
- virtual ~SaslServer();
+ QPID_COMMON_EXTERN SaslServer(const std::string& id);
+ QPID_COMMON_EXTERN virtual ~SaslServer();
virtual void init(const std::string& mechanism, const std::string* response, const std::string* hostname) = 0;
virtual void response(const std::string*) = 0;
- void mechanisms(const std::string& mechanisms);
- void challenge(const std::string*);
- void completed(bool succeeded);
+ QPID_COMMON_EXTERN void mechanisms(const std::string& mechanisms);
+ QPID_COMMON_EXTERN void challenge(const std::string*);
+ QPID_COMMON_EXTERN void completed(bool succeeded);
private:
- bool onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor);
+ QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const CharSequence&, const Descriptor* descriptor);
};
}} // namespace qpid::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h Fri Sep 20 18:59:30 2013
@@ -52,6 +52,7 @@ const Descriptor DELIVERY_ANNOTATIONS(DE
const Descriptor MESSAGE_ANNOTATIONS(MESSAGE_ANNOTATIONS_CODE);
const Descriptor PROPERTIES(PROPERTIES_CODE);
const Descriptor APPLICATION_PROPERTIES(APPLICATION_PROPERTIES_CODE);
+const Descriptor AMQP_VALUE(AMQP_VALUE_CODE);
const Descriptor DATA(DATA_CODE);
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/typecodes.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/typecodes.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/typecodes.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/typecodes.h Fri Sep 20 18:59:30 2013
@@ -83,7 +83,7 @@ const uint8_t ARRAY32(0xf0);
const std::string NULL_NAME("null");
-const std::string BOOLEAN_NAME("name");
+const std::string BOOLEAN_NAME("bool");
const std::string UBYTE_NAME("ubyte");
const std::string USHORT_NAME("ushort");
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1501885-1525056
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp Fri Sep 20 18:59:30 2013
@@ -38,6 +38,7 @@
#include "qpid/broker/PersistableObject.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/TransactionObserver.h"
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
@@ -139,7 +140,7 @@ Broker::Options::Options(const std::stri
requireEncrypted(false),
knownHosts(knownHostsNone),
qmf2Support(true),
- qmf1Support(true),
+ qmf1Support(false),
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
@@ -399,7 +400,7 @@ boost::intrusive_ptr<Broker> Broker::cre
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
-void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+void Broker::setStore (const boost::shared_ptr<MessageStore>& _store)
{
store.reset(new MessageStoreModule (_store));
setStore();
@@ -442,6 +443,8 @@ void Broker::shutdown() {
}
Broker::~Broker() {
+ if (mgmtObject != 0)
+ mgmtObject->debugStats("destroying");
shutdown();
finalize(); // Finalize any plugins.
if (config.auth)
@@ -1446,7 +1449,7 @@ void Broker::bind(const std::string& que
throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
} else {
if (queue->bind(exchange, key, arguments)) {
- getConfigurationObservers().bind(exchange, queue, key, arguments);
+ getBrokerObservers().bind(exchange, queue, key, arguments);
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
queueName, key, ManagementAgent::toMap(arguments)));
@@ -1488,7 +1491,7 @@ void Broker::unbind(const std::string& q
if (exchange->isDurable() && queue->isDurable()) {
store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
}
- getConfigurationObservers().unbind(
+ getBrokerObservers().unbind(
exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.h Fri Sep 20 18:59:30 2013
@@ -38,7 +38,7 @@
#include "qpid/broker/System.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
-#include "qpid/broker/ConfigurationObservers.h"
+#include "qpid/broker/BrokerObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/Mutex.h"
@@ -65,6 +65,7 @@ class AclModule;
class ExpiryPolicy;
class Message;
struct QueueSettings;
+
static const uint16_t DEFAULT_PORT=5672;
struct NoSuchTransportException : qpid::Exception
@@ -175,7 +176,7 @@ class Broker : public sys::Runnable, pub
AclModule* acl;
DataDir dataDir;
ConnectionObservers connectionObservers;
- ConfigurationObservers configurationObservers;
+ BrokerObservers brokerObservers;
QueueRegistry queues;
ExchangeRegistry exchanges;
@@ -225,7 +226,8 @@ class Broker : public sys::Runnable, pub
/** Shut down the broker */
QPID_BROKER_EXTERN virtual void shutdown();
- QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
+ QPID_BROKER_EXTERN void setStore (const boost::shared_ptr<MessageStore>& store);
+ bool hasStore() const { return store.get(); }
MessageStore& getStore() { return *store; }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
@@ -352,7 +354,7 @@ class Broker : public sys::Runnable, pub
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
- ConfigurationObservers& getConfigurationObservers() { return configurationObservers; }
+ BrokerObservers& getBrokerObservers() { return brokerObservers; }
/** Properties to be set on outgoing link connections */
QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Sep 20 18:59:30 2013
@@ -106,7 +106,9 @@ void DeliveryRecord::dequeue(Transaction
void DeliveryRecord::committed() const
{
- queue->dequeueCommitted(msg);
+ if (acquired && !ended) {
+ queue->dequeueCommitted(msg);
+ }
}
void DeliveryRecord::reject()
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.cpp Fri Sep 20 18:59:30 2013
@@ -196,6 +196,9 @@ bool DirectExchange::isBound(Queue::shar
return false;
}
-DirectExchange::~DirectExchange() {}
+DirectExchange::~DirectExchange() {
+ if (mgmtExchange != 0)
+ mgmtExchange->debugStats("destroying");
+}
const std::string DirectExchange::typeName("direct");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DirectExchange.h Fri Sep 20 18:59:30 2013
@@ -42,7 +42,7 @@ class DirectExchange : public virtual Ex
qpid::sys::Mutex lock;
public:
- static const std::string typeName;
+ QPID_BROKER_EXTERN static const std::string typeName;
QPID_BROKER_EXTERN DirectExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DtxAck.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/DtxAck.h Fri Sep 20 18:59:30 2013
@@ -39,6 +39,9 @@ class DtxAck : public TxOp{
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
+ // TODO aconway 2013-07-08:
+ virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) {}
+
virtual ~DtxAck(){}
const DeliveryRecords& getPending() const { return pending; }
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Sep 20 18:59:30 2013
@@ -353,6 +353,7 @@ Exchange::Binding::Binding(const string&
Exchange::Binding::~Binding ()
{
if (mgmtBinding != 0) {
+ mgmtBinding->debugStats("destroying");
_qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
if (mo != 0)
mo->dec_bindingCount();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Sep 20 18:59:30 2013
@@ -81,7 +81,7 @@ pair<Exchange::shared_ptr, bool> Exchang
result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
if (alternate) exchange->setAlternate(alternate);
// Call exchangeCreate inside the lock to ensure correct ordering.
- if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
+ if (broker) broker->getBrokerObservers().exchangeCreate(exchange);
} else {
result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
@@ -118,7 +118,7 @@ void ExchangeRegistry::destroy(
if (broker) {
// Call exchangeDestroy and raiseEvent inside the lock to ensure
// correct ordering.
- broker->getConfigurationObservers().exchangeDestroy(i->second);
+ broker->getBrokerObservers().exchangeDestroy(i->second);
if (broker->getManagementAgent())
broker->getManagementAgent()->raiseEvent(
_qmf::EventExchangeDelete(connectionId, userId, name));
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Fri Sep 20 18:59:30 2013
@@ -117,6 +117,9 @@ bool FanOutExchange::isBound(Queue::shar
}
-FanOutExchange::~FanOutExchange() {}
+FanOutExchange::~FanOutExchange() {
+ if (mgmtExchange != 0)
+ mgmtExchange->debugStats("destroying");
+}
const std::string FanOutExchange::typeName("fanout");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Fri Sep 20 18:59:30 2013
@@ -339,7 +339,10 @@ void HeadersExchange::getNonFedArgs(cons
}
}
-HeadersExchange::~HeadersExchange() {}
+HeadersExchange::~HeadersExchange() {
+ if (mgmtExchange != 0)
+ mgmtExchange->debugStats("destroying");
+}
const std::string HeadersExchange::typeName("headers");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/HeadersExchange.h Fri Sep 20 18:59:30 2013
@@ -79,7 +79,7 @@ class HeadersExchange : public virtual E
framing::FieldTable& nonFedArgs);
public:
- static const std::string typeName;
+ QPID_BROKER_EXTERN static const std::string typeName;
QPID_BROKER_EXTERN HeadersExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/IngressCompletion.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/IngressCompletion.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/IngressCompletion.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/IngressCompletion.h Fri Sep 20 18:59:30 2013
@@ -38,7 +38,7 @@ class Queue;
class IngressCompletion : public AsyncCompletion
{
public:
- virtual ~IngressCompletion();
+ QPID_BROKER_EXTERN virtual ~IngressCompletion();
void enqueueAsync(boost::shared_ptr<Queue>);
void flush();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.cpp Fri Sep 20 18:59:30 2013
@@ -536,7 +536,7 @@ bool Link::tryFailoverLH() {
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
- QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next);
+ QPID_LOG(info, "Inter-broker link '" << name << "' failing over to " << next);
reconnectLH(next);
return true;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Link.h Fri Sep 20 18:59:30 2013
@@ -151,8 +151,8 @@ class Link : public PersistableConfig, p
bool isDurable() { return durable; }
void maintenanceVisit ();
- framing::ChannelId nextChannel(); // allocate channel from link free pool
- void returnChannel(framing::ChannelId); // return channel to link free pool
+ QPID_BROKER_EXTERN framing::ChannelId nextChannel(); // allocate channel from link free pool
+ QPID_BROKER_EXTERN void returnChannel(framing::ChannelId); // return channel to link free pool
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/LossyQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/LossyQueue.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/LossyQueue.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/LossyQueue.cpp Fri Sep 20 18:59:30 2013
@@ -52,7 +52,7 @@ bool LossyQueue::checkDepth(const QueueD
QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize());
qpid::sys::Mutex::ScopedUnlock u(messageLock);
//TODO: arguably we should try and purge expired messages first but that is potentially expensive
- if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE, false)) {
+ if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) {
if (mgmtObject) {
mgmtObject->inc_discardsRing(1);
if (brokerMgmtObject)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Lvq.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Lvq.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Lvq.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Lvq.cpp Fri Sep 20 18:59:30 2013
@@ -57,7 +57,8 @@ void Lvq::push(Message& message, bool is
copy.notify();
if (removed) {
if (isRecovery) pendingDequeues.push_back(old);
- else dequeueFromStore(old.getPersistentContext());//do outside of lock
+ else if (old.isPersistent())
+ dequeueFromStore(old.getPersistentContext());//do outside of lock
}
}
}} // namespace qpid::broker
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp Fri Sep 20 18:59:30 2013
@@ -145,7 +145,7 @@ uint64_t Message::getTtl() const
}
}
-bool Message::getTtl(uint64_t ttl) const
+bool Message::getTtl(uint64_t& ttl) const
{
if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
sys::Duration remaining(sys::AbsTime::now(), getExpiration());
@@ -182,8 +182,10 @@ void Message::addAnnotation(const std::s
void Message::annotationsChanged()
{
if (persistentContext) {
+ uint64_t id = persistentContext->getPersistenceId();
persistentContext = persistentContext->merge(annotations);
persistentContext->setIngressCompletion(encoding);
+ persistentContext->setPersistenceId(id);
}
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h Fri Sep 20 18:59:30 2013
@@ -39,7 +39,7 @@
namespace qpid {
namespace amqp {
class MapHandler;
-class MessageId;
+struct MessageId;
}
namespace management {
@@ -104,7 +104,7 @@ public:
sys::AbsTime getExpiration() const { return expiration; }
void setExpiration(sys::AbsTime exp) { expiration = exp; }
uint64_t getTtl() const;
- bool getTtl(uint64_t) const;
+ QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
/** set the timestamp delivery property to the current time-of-day */
QPID_BROKER_EXTERN void setTimestamp();
@@ -136,7 +136,7 @@ public:
QPID_BROKER_EXTERN qpid::types::Variant getAnnotation(const std::string& key) const;
QPID_BROKER_EXTERN const qpid::types::Variant::Map& getAnnotations() const;
- std::string getUserId() const;
+ QPID_BROKER_EXTERN std::string getUserId() const;
QPID_BROKER_EXTERN std::string getContent() const;//Used for ha, management, when content needs to be decoded
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Fri Sep 20 18:59:30 2013
@@ -33,7 +33,7 @@ using std::string;
namespace qpid {
namespace broker {
-MessageStoreModule::MessageStoreModule(boost::shared_ptr<MessageStore>& _store)
+MessageStoreModule::MessageStoreModule(const boost::shared_ptr<MessageStore>& _store)
: store(_store) {}
MessageStoreModule::~MessageStoreModule()
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/MessageStoreModule.h Fri Sep 20 18:59:30 2013
@@ -38,7 +38,7 @@ class MessageStoreModule : public Messag
{
boost::shared_ptr<MessageStore> store;
public:
- MessageStoreModule(boost::shared_ptr<MessageStore>& store);
+ MessageStoreModule(const boost::shared_ptr<MessageStore>& store);
bool init(const Options* options);
std::auto_ptr<TransactionContext> begin();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ObjectFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ObjectFactory.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ObjectFactory.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ObjectFactory.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/types/Variant.h"
#include <vector>
@@ -56,7 +57,7 @@ class ObjectFactoryRegistry : public Obj
bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, uint64_t persistenceId);
~ObjectFactoryRegistry();
- void add(ObjectFactory*);
+ QPID_BROKER_EXTERN void add(ObjectFactory*);
private:
typedef std::vector<ObjectFactory*> Factories;
Factories factories;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PagedQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PagedQueue.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PagedQueue.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PagedQueue.cpp Fri Sep 20 18:59:30 2013
@@ -64,7 +64,7 @@ size_t decode(ProtocolRegistry& protocol
}
PagedQueue::PagedQueue(const std::string& name, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
- : pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0)
+ : pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
{
path = file.open(name, directory);
QPID_LOG(debug, "PagedQueue[" << path << "]");
@@ -295,7 +295,8 @@ void PagedQueue::Page::load(MemoryMapped
bool haveData = used > 0;
used = 4;//first 4 bytes are the count
if (haveData) {
- uint32_t count = *(reinterpret_cast<uint32_t*>(region));
+ qpid::framing::Buffer buffer(region, sizeof(uint32_t));
+ uint32_t count = buffer.getLong();
//decode messages into Page::messages
for (size_t i = 0; i < count; ++i) {
Message message;
@@ -331,7 +332,8 @@ void PagedQueue::Page::unload(MemoryMapp
if (i->getState() == ACQUIRED) acquired.add(i->getSequence());
}
uint32_t count = messages.size();
- ::memcpy(region, &count, sizeof(count));
+ qpid::framing::Buffer buffer(region, sizeof(uint32_t));
+ buffer.putLong(count);
file.flush(region, size);
file.unmap(region, size);
//remove messages from memory
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Sep 20 18:59:30 2013
@@ -62,8 +62,8 @@ class PersistableMessage : public Persis
mutable uint64_t persistenceId;
public:
- virtual ~PersistableMessage();
- PersistableMessage();
+ QPID_BROKER_EXTERN virtual ~PersistableMessage();
+ QPID_BROKER_EXTERN PersistableMessage();
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableObject.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableObject.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableObject.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PersistableObject.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/broker/BrokerImportExport.h"
#include "PersistableConfig.h"
#include "qpid/types/Variant.h"
#include <vector>
@@ -37,13 +38,13 @@ class RecoverableConfig;
class PersistableObject : public PersistableConfig
{
public:
- PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties);
- virtual ~PersistableObject();
- const std::string& getName() const;
- void setPersistenceId(uint64_t id) const;
- uint64_t getPersistenceId() const;
- void encode(framing::Buffer& buffer) const;
- uint32_t encodedSize() const;
+ QPID_BROKER_EXTERN PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties);
+ QPID_BROKER_EXTERN virtual ~PersistableObject();
+ QPID_BROKER_EXTERN const std::string& getName() const;
+ QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
+ QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
+ QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const;
+ QPID_BROKER_EXTERN uint32_t encodedSize() const;
friend class RecoveredObjects;
private:
std::string name;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Fri Sep 20 18:59:30 2013
@@ -135,7 +135,12 @@ void PriorityQueue::publish(const Messag
Message* PriorityQueue::release(const QueueCursor& cursor)
{
MessagePointer* ptr = fifo.release(cursor);
- return ptr ? &(ptr->holder->message) : 0;
+ if (ptr) {
+ messages[ptr->holder->priority].resetCursors();
+ return &(ptr->holder->message);
+ } else {
+ return 0;
+ }
}
void PriorityQueue::foreach(Functor f)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Protocol.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Protocol.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Protocol.h Fri Sep 20 18:59:30 2013
@@ -72,7 +72,7 @@ class ProtocolRegistry : public Protocol
QPID_BROKER_EXTERN Message decode(qpid::framing::Buffer&);
QPID_BROKER_EXTERN ~ProtocolRegistry();
- void add(const std::string&, Protocol*);
+ QPID_BROKER_EXTERN void add(const std::string&, Protocol*);
private:
//name may be useful for descriptive purposes or even for some
//limited manipulation of ordering
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp Fri Sep 20 18:59:30 2013
@@ -34,6 +34,7 @@
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Selector.h"
+#include "qpid/broker/TransactionObserver.h"
//TODO: get rid of this
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -167,6 +168,12 @@ void Queue::TxPublish::rollback() throw(
}
}
+void Queue::TxPublish::callObserver(
+ const boost::shared_ptr<TransactionObserver>& observer)
+{
+ observer->enqueue(queue, message);
+}
+
Queue::Queue(const string& _name, const QueueSettings& _settings,
MessageStore* const _store,
Manageable* parent,
@@ -216,6 +223,8 @@ Queue::Queue(const string& _name, const
Queue::~Queue()
{
+ if (mgmtObject != 0)
+ mgmtObject->debugStats("destroying");
}
bool Queue::isLocal(const Message& msg)
@@ -270,6 +279,7 @@ void Queue::deliver(Message msg, TxBuffe
void Queue::deliverTo(Message msg, TxBuffer* txn)
{
if (accept(msg)) {
+ interceptors.record(msg);
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
@@ -385,6 +395,7 @@ bool Queue::getNextMessage(Message& m, C
if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
ScopedAutoDelete autodelete(*this);
+ bool messageFound(false);
while (true) {
//TODO: reduce lock scope
Mutex::ScopedLock locker(messageLock);
@@ -426,7 +437,8 @@ bool Queue::getNextMessage(Message& m, C
QPID_LOG(debug, "Message " << msg->getSequence() << " retrieved from '"
<< name << "'");
m = *msg;
- return true;
+ messageFound = true;
+ break;
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
@@ -448,11 +460,12 @@ bool Queue::getNextMessage(Message& m, C
} else {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
- return false;
+ break;
}
+
}
set.notify();
- return false;
+ return messageFound;
}
void Queue::removeListener(Consumer::shared_ptr c)
@@ -677,17 +690,6 @@ namespace {
return new MessageFilter();
}
- bool reroute(boost::shared_ptr<Exchange> e, const Message& m)
- {
- if (e) {
- DeliverableMessage d(m, 0);
- d.getMessage().clearTrace();
- e->routeWithAlternate(d);
- return true;
- } else {
- return false;
- }
- }
void moveTo(boost::shared_ptr<Queue> q, Message& m)
{
if (q) {
@@ -846,7 +848,6 @@ bool Queue::isEmpty(const Mutex::ScopedL
*/
bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
- interceptors.record(msg);
ScopedUse u(barrier);
if (!u.acquired) return false;
@@ -1684,6 +1685,19 @@ void Queue::setMgmtRedirectState( std::s
mgmtObject->set_redirectSource(isSrc);
}
}
+
+bool Queue::reroute(boost::shared_ptr<Exchange> e, const Message& m)
+{
+ if (e) {
+ DeliverableMessage d(m, 0);
+ d.getMessage().clearTrace();
+ e->routeWithAlternate(d);
+ return true;
+ } else {
+ return false;
+ }
+}
+
Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {}
void Queue::QueueUsers::addConsumer() { ++consumers; }
void Queue::QueueUsers::addBrowser() { ++browsers; }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h Fri Sep 20 18:59:30 2013
@@ -118,6 +118,7 @@ class Queue : public boost::enable_share
bool prepare(TransactionContext* ctxt) throw();
void commit() throw();
void rollback() throw();
+ void callObserver(const boost::shared_ptr<TransactionObserver>&);
};
/**
@@ -501,6 +502,9 @@ class Queue : public boost::enable_share
QPID_BROKER_EXTERN bool isRedirectSource() const { return redirectSource; }
QPID_BROKER_EXTERN void setMgmtRedirectState( std::string peer, bool enabled, bool isSrc );
+ //utility function
+ static bool reroute(boost::shared_ptr<Exchange> e, const Message& m);
+
friend class QueueFactory;
};
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFactory.cpp Fri Sep 20 18:59:30 2013
@@ -33,6 +33,7 @@
#include "qpid/broker/PagedQueue.h"
#include "qpid/broker/PriorityQueue.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/SelfDestructQueue.h"
#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/broker/FifoDistributor.h"
#include "qpid/log/Statement.h"
@@ -53,6 +54,8 @@ boost::shared_ptr<Queue> QueueFactory::c
boost::shared_ptr<Queue> queue;
if (settings.dropMessagesAtLimit) {
queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
+ } else if (settings.selfDestructAtLimit) {
+ queue = boost::shared_ptr<Queue>(new SelfDestructQueue(name, settings, settings.durable ? store : 0, parent, broker));
} else if (settings.lvqKey.size()) {
std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey));
queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker));
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Sep 20 18:59:30 2013
@@ -59,8 +59,8 @@ QueueRegistry::declare(const string& nam
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
Queue::shared_ptr queue = create(name, settings);
- // Allow ConfigurationObserver to modify settings before storing the message.
- if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
+ // Allow BrokerObserver to modify settings before storing the message.
+ if (getBroker()) getBroker()->getBrokerObservers().queueCreate(queue);
//Move this to factory also?
if (alternate)
queue->setAlternateExchange(alternate);//need to do this *before* create
@@ -100,7 +100,7 @@ void QueueRegistry::destroy(
// NOTE: queueDestroy and raiseEvent must be called with the
// lock held in order to ensure events are generated
// in the correct order.
- getBroker()->getConfigurationObservers().queueDestroy(q);
+ getBroker()->getBrokerObservers().queueDestroy(q);
if (getBroker()->getManagementAgent())
getBroker()->getManagementAgent()->raiseEvent(
_qmf::EventQueueDelete(connectionId, userId, name));
@@ -121,7 +121,9 @@ Queue::shared_ptr QueueRegistry::find(co
Queue::shared_ptr QueueRegistry::get(const string& name) {
Queue::shared_ptr q = find(name);
- if (!q) throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ if (!q) {
+ throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
return q;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp Fri Sep 20 18:59:30 2013
@@ -38,6 +38,7 @@ const std::string MAX_FILE_SIZE("qpid.fi
const std::string POLICY_TYPE("qpid.policy_type");
const std::string POLICY_TYPE_REJECT("reject");
const std::string POLICY_TYPE_RING("ring");
+const std::string POLICY_TYPE_SELF_DESTRUCT("self-destruct");
const std::string NO_LOCAL("no-local");
const std::string BROWSE_ONLY("qpid.browse-only");
const std::string TRACE_ID("qpid.trace.id");
@@ -96,6 +97,7 @@ QueueSettings::QueueSettings(bool d, boo
shareGroups(false),
addTimestamp(false),
dropMessagesAtLimit(false),
+ selfDestructAtLimit(false),
paging(false),
maxPages(0),
pageFactor(0),
@@ -120,6 +122,9 @@ bool QueueSettings::handle(const std::st
if (value.getString() == POLICY_TYPE_RING) {
dropMessagesAtLimit = true;
return true;
+ } else if (value.getString() == POLICY_TYPE_SELF_DESTRUCT) {
+ selfDestructAtLimit = true;
+ return true;
} else if (value.getString() == POLICY_TYPE_REJECT) {
//do nothing, thats the default
return true;
@@ -168,6 +173,7 @@ bool QueueSettings::handle(const std::st
return true;
} else if (key == AUTO_DELETE_TIMEOUT) {
autoDeleteDelay = value;
+ if (autoDeleteDelay) autodelete = true;
return true;
} else if (key == QueueFlowLimit::flowStopCountKey) {
flowStop.setCount(value);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h Fri Sep 20 18:59:30 2013
@@ -70,6 +70,7 @@ struct QueueSettings
QueueDepth maxDepth;
bool dropMessagesAtLimit;//aka ring queue policy
+ bool selfDestructAtLimit;
//PagedQueue:
bool paging;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h Fri Sep 20 18:59:30 2013
@@ -33,7 +33,7 @@ class RecoverableMessageImpl : public Re
{
Message msg;
public:
- RecoverableMessageImpl(const Message& _msg);
+ QPID_BROKER_EXTERN RecoverableMessageImpl(const Message& _msg);
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredDequeue.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredDequeue.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredDequeue.h Fri Sep 20 18:59:30 2013
@@ -41,6 +41,8 @@ namespace qpid {
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
+ // TODO aconway 2013-07-08: revisit
+ virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) {}
virtual ~RecoveredDequeue(){}
boost::shared_ptr<Queue> getQueue() const { return queue; }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h Fri Sep 20 18:59:30 2013
@@ -41,6 +41,8 @@ class RecoveredEnqueue : public TxOp{
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
+ // TODO aconway 2013-07-08: revisit
+ virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) {}
virtual ~RecoveredEnqueue(){}
boost::shared_ptr<Queue> getQueue() const { return queue; }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Selector.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Selector.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Selector.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Selector.h Fri Sep 20 18:59:30 2013
@@ -64,7 +64,7 @@ public:
* @param msg message to filter against selector
* @return true if msg meets the selector specification
*/
- bool filter(const Message& msg);
+ QPID_BROKER_EXTERN bool filter(const Message& msg);
};
/**
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Sep 20 18:59:30 2013
@@ -31,6 +31,7 @@
#include "qpid/broker/Selector.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
+#include "qpid/broker/TransactionObserver.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/reply_exceptions.h"
@@ -65,6 +66,7 @@ namespace broker {
using namespace std;
using boost::intrusive_ptr;
+using boost::shared_ptr;
using boost::bind;
using namespace qpid::broker;
using namespace qpid::framing;
@@ -165,13 +167,15 @@ bool SemanticState::cancel(const string&
void SemanticState::startTx()
{
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
+ session.getBroker().getBrokerObservers().startTx(txBuffer);
+ session.startTx(); //just to update statistics
}
void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
-
+ session.commitTx(); //just to update statistics
TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
@@ -185,7 +189,7 @@ void SemanticState::rollback()
{
if (!txBuffer)
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
-
+ session.rollbackTx(); //just to update statistics
txBuffer->rollback();
accumulatedAck.clear();
}
@@ -202,6 +206,7 @@ void SemanticState::startDtx(const std::
}
dtxBuffer.reset(new DtxBuffer(xid));
txBuffer = dtxBuffer;
+ session.getBroker().getBrokerObservers().startDtx(dtxBuffer);
if (join) {
mgr.join(xid, dtxBuffer);
} else {
@@ -432,8 +437,10 @@ bool SemanticStateConsumerImpl::checkCre
SemanticStateConsumerImpl::~SemanticStateConsumerImpl()
{
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
+ mgmtObject->debugStats("destroying");
mgmtObject->resourceDestroy ();
+ }
}
void SemanticState::disable(ConsumerImpl::shared_ptr c)
@@ -658,9 +665,7 @@ Queue::shared_ptr SemanticState::getQueu
if (name.empty()) {
throw NotAllowedException(QPID_MSG("No queue name specified."));
} else {
- queue = session.getBroker().getQueues().find(name);
- if (!queue)
- throw NotFoundException(QPID_MSG("Queue not found: "<<name));
+ queue = session.getBroker().getQueues().get(name);
}
return queue;
}
@@ -767,7 +772,6 @@ void SemanticState::accepted(const Seque
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
-
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
//removed from the record
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Sep 20 18:59:30 2013
@@ -685,9 +685,7 @@ Queue::shared_ptr SessionAdapter::Handle
if (name.empty()) {
throw framing::IllegalArgumentException(QPID_MSG("No queue name specified."));
} else {
- queue = session.getBroker().getQueues().find(name);
- if (!queue)
- throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ queue = session.getBroker().getQueues().get(name);
}
return queue;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org