You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/01 15:30:58 UTC
svn commit: r1528082 - in /qpid/trunk/qpid/cpp/src/qpid/messaging:
ProtocolRegistry.cpp amqp/ConnectionContext.cpp amqp/EncodedMessage.cpp
amqp/SenderContext.cpp
Author: gsim
Date: Tue Oct 1 13:30:58 2013
New Revision: 1528082
URL: http://svn.apache.org/r1528082
Log:
QPID-5198: ensure qpid::Exception does not leak out from qpid::messaging
Modified:
qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp?rev=1528082&r1=1528081&r2=1528082&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp Tue Oct 1 13:30:58 2013
@@ -19,7 +19,7 @@
*
*/
#include "ProtocolRegistry.h"
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/client/LoadPlugins.h"
#include <map>
@@ -61,7 +61,7 @@ ConnectionImpl* ProtocolRegistry::create
Registry::const_iterator i = theRegistry().find(name.asString());
if (i != theRegistry().end()) return (i->second)(url, stripped);
else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
- else throw qpid::Exception("Unsupported protocol: " + name.asString());
+ else throw MessagingException("Unsupported protocol: " + name.asString());
}
return 0;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1528082&r1=1528081&r2=1528082&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Tue Oct 1 13:30:58 2013
@@ -584,7 +584,7 @@ std::size_t ConnectionContext::decodePla
lock.notifyAll();
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+ throw MessagingException(QPID_MSG("Error on input: " << getError()));
} else {
return 0;
}
@@ -608,7 +608,7 @@ std::size_t ConnectionContext::encodePla
haveOutput = true;
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+ throw MessagingException(QPID_MSG("Error on output: " << getError()));
} else if (n == PN_EOS) {
haveOutput = false;
return 0;//Is this right?
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp?rev=1528082&r1=1528081&r2=1528082&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp Tue Oct 1 13:30:58 2013
@@ -20,7 +20,9 @@
*/
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/MessageImpl.h"
+#include "qpid/Exception.h"
#include "qpid/amqp/Decoder.h"
#include "qpid/amqp/DataBuilder.h"
#include "qpid/amqp/ListBuilder.h"
@@ -100,66 +102,73 @@ const char* EncodedMessage::getData() co
void EncodedMessage::init(qpid::messaging::MessageImpl& impl)
{
- //initial scan of raw data
- qpid::amqp::Decoder decoder(data, size);
- InitialScan reader(*this, impl);
- decoder.read(reader);
- bareMessage = reader.getBareMessage();
- if (bareMessage.data && !bareMessage.size) {
- bareMessage.size = (data + size) - bareMessage.data;
+ try {
+ //initial scan of raw data
+ qpid::amqp::Decoder decoder(data, size);
+ InitialScan reader(*this, impl);
+ decoder.read(reader);
+ bareMessage = reader.getBareMessage();
+ if (bareMessage.data && !bareMessage.size) {
+ bareMessage.size = (data + size) - bareMessage.data;
+ }
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
-
}
void EncodedMessage::setNestAnnotationsOption(bool b) { nestAnnotations = b; }
void EncodedMessage::populate(qpid::types::Variant::Map& map) const
{
- //decode application properties
- if (applicationProperties) {
- qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
- decoder.readMap(map);
- }
- //add in 'x-amqp-' prefixed values
- if (!!firstAcquirer) {
- map["x-amqp-first-acquirer"] = firstAcquirer.get();
- }
- if (!!deliveryCount) {
- map["x-amqp-delivery-count"] = deliveryCount.get();
- }
- if (to) {
- map["x-amqp-to"] = to.str();
- }
- if (!!absoluteExpiryTime) {
- map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
- }
- if (!!creationTime) {
- map["x-amqp-creation-time"] = creationTime.get();
- }
- if (groupId) {
- map["x-amqp-group-id"] = groupId.str();
- }
- if (!!groupSequence) {
- map["x-amqp-qroup-sequence"] = groupSequence.get();
- }
- if (replyToGroupId) {
- map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
- }
- //add in any annotations
- if (deliveryAnnotations) {
- qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
- if (nestAnnotations) {
- map["x-amqp-delivery-annotations"] = decoder.readMap();
- } else {
+ try {
+ //decode application properties
+ if (applicationProperties) {
+ qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
decoder.readMap(map);
}
- }
- if (messageAnnotations) {
- qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
- if (nestAnnotations) {
- map["x-amqp-message-annotations"] = decoder.readMap();
- } else {
- decoder.readMap(map);
+ //add in 'x-amqp-' prefixed values
+ if (!!firstAcquirer) {
+ map["x-amqp-first-acquirer"] = firstAcquirer.get();
+ }
+ if (!!deliveryCount) {
+ map["x-amqp-delivery-count"] = deliveryCount.get();
+ }
+ if (to) {
+ map["x-amqp-to"] = to.str();
+ }
+ if (!!absoluteExpiryTime) {
+ map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
+ }
+ if (!!creationTime) {
+ map["x-amqp-creation-time"] = creationTime.get();
+ }
+ if (groupId) {
+ map["x-amqp-group-id"] = groupId.str();
}
+ if (!!groupSequence) {
+ map["x-amqp-qroup-sequence"] = groupSequence.get();
+ }
+ if (replyToGroupId) {
+ map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
+ }
+ //add in any annotations
+ if (deliveryAnnotations) {
+ qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
+ if (nestAnnotations) {
+ map["x-amqp-delivery-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
+ }
+ if (messageAnnotations) {
+ qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
+ if (nestAnnotations) {
+ map["x-amqp-message-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
+ }
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
}
qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
@@ -201,35 +210,39 @@ void EncodedMessage::getCorrelationId(st
}
void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const
{
- if (!content.isVoid()) {
- c = content;//integer types, floats, bool etc
- //TODO: populate raw data?
- } else {
- if (bodyType.empty()
- || bodyType == qpid::amqp::typecodes::BINARY_NAME
- || bodyType == qpid::types::encodings::UTF8
- || bodyType == qpid::types::encodings::ASCII)
- {
- c = std::string(body.data, body.size);
- c.setEncoding(bodyType);
- } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
- qpid::amqp::ListBuilder builder;
- qpid::amqp::Decoder decoder(body.data, body.size);
- decoder.read(builder);
- c = builder.getList();
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
- qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
- qpid::amqp::Decoder decoder(body.data, body.size);
- decoder.read(builder);
- c = builder.getValue().asMap();
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
- if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
- raw.assign(body.data, body.size);
+ try {
+ if (!content.isVoid()) {
+ c = content;//integer types, floats, bool etc
+ //TODO: populate raw data?
+ } else {
+ if (bodyType.empty()
+ || bodyType == qpid::amqp::typecodes::BINARY_NAME
+ || bodyType == qpid::types::encodings::UTF8
+ || bodyType == qpid::types::encodings::ASCII)
+ {
+ c = std::string(body.data, body.size);
+ c.setEncoding(bodyType);
+ } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+ qpid::amqp::ListBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getList();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+ qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getValue().asMap();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
+ if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
+ raw.assign(body.data, body.size);
+ }
}
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1528082&r1=1528081&r2=1528082&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Tue Oct 1 13:30:58 2013
@@ -21,6 +21,8 @@
#include "qpid/messaging/amqp/SenderContext.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/AddressImpl.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/Exception.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MapHandler.h"
#include "qpid/amqp/MessageEncoder.h"
@@ -435,59 +437,63 @@ void SenderContext::Delivery::reset()
void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
- boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
+ try {
+ boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
- if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
- //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
- if (original->hasHeaderChanged(msg)) {
- //since as yet have no annotations, just write the revised header then the rest of the message as received
- encoded.resize(16/*max header size*/ + original->getBareMessage().size);
- qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
+ //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
+ if (original->hasHeaderChanged(msg)) {
+ //since as yet have no annotations, just write the revised header then the rest of the message as received
+ encoded.resize(16/*max header size*/ + original->getBareMessage().size);
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ HeaderAdapter header(msg);
+ encoder.writeHeader(header);
+ ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
+ } else {
+ //since as yet have no annotations, if the header hasn't
+ //changed and we still have the original bare message, can
+ //send the entire content as is
+ encoded.resize(original->getSize());
+ ::memcpy(encoded.getData(), original->getData(), original->getSize());
+ }
+ } else {
HeaderAdapter header(msg);
+ PropertiesAdapter properties(msg, address.getSubject());
+ ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
+ //compute size:
+ size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
+ + qpid::amqp::MessageEncoder::getEncodedSize(properties)
+ + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
+ if (msg.getContent().isVoid()) {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
+ } else {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
+ }
+ encoded.resize(contentSize);
+ QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ //write header:
encoder.writeHeader(header);
- ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
- } else {
- //since as yet have no annotations, if the header hasn't
- //changed and we still have the original bare message, can
- //send the entire content as is
- encoded.resize(original->getSize());
- ::memcpy(encoded.getData(), original->getData(), original->getSize());
- }
- } else {
- HeaderAdapter header(msg);
- PropertiesAdapter properties(msg, address.getSubject());
- ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
- //compute size:
- size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
- + qpid::amqp::MessageEncoder::getEncodedSize(properties)
- + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
- if (msg.getContent().isVoid()) {
- contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
- } else {
- contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
- }
- encoded.resize(contentSize);
- QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
- qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
- //write header:
- encoder.writeHeader(header);
- //write delivery-annotations, write message-annotations (none yet supported)
- //write properties
- encoder.writeProperties(properties);
- //write application-properties
- encoder.writeApplicationProperties(applicationProperties);
- //write body
- if (!msg.getContent().isVoid()) {
- //write as AmqpValue
- encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
- } else if (msg.getBytes().size()) {
- encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
- }
- if (encoder.getPosition() < encoded.getSize()) {
- QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
- encoded.trim(encoder.getPosition());
+ //write delivery-annotations, write message-annotations (none yet supported)
+ //write properties
+ encoder.writeProperties(properties);
+ //write application-properties
+ encoder.writeApplicationProperties(applicationProperties);
+ //write body
+ if (!msg.getContent().isVoid()) {
+ //write as AmqpValue
+ encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
+ } else if (msg.getBytes().size()) {
+ encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ }
+ if (encoder.getPosition() < encoded.getSize()) {
+ QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
+ encoded.trim(encoder.getPosition());
+ }
+ //write footer (no annotations yet supported)
}
- //write footer (no annotations yet supported)
+ } catch (const qpid::Exception& e) {
+ throw SendError(e.what());
}
}
void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org