You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/03/29 21:21:26 UTC
svn commit: r928878 - in /qpid/trunk/qpid/cpp: examples/messaging/
include/qpid/messaging/ src/qpid/client/amqp0_10/ src/qpid/messaging/
src/tests/
Author: gsim
Date: Mon Mar 29 19:21:26 2010
New Revision: 928878
URL: http://svn.apache.org/viewvc?rev=928878&view=rev
Log:
QPID-664: renamed headers as properties (to match Python); added priority; renamed isRedelivered() to getRedelivered() and added setRedelivered()
Modified:
qpid/trunk/qpid/cpp/examples/messaging/drain.cpp
qpid/trunk/qpid/cpp/examples/messaging/spout.cpp
qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
qpid/trunk/qpid/cpp/src/tests/ClientMessage.cpp
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp
Modified: qpid/trunk/qpid/cpp/examples/messaging/drain.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/drain.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/drain.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/drain.cpp Mon Mar 29 19:21:26 2010
@@ -102,7 +102,7 @@ int main(int argc, char** argv)
Duration timeout = options.getTimeout();
Message message;
while (receiver.fetch(message, timeout)) {
- std::cout << "Message(properties=" << message.getHeaders() << ", content='" ;
+ std::cout << "Message(properties=" << message.getProperties() << ", content='" ;
if (message.getContentType() == "amqp/map") {
std::cout << MapView(message);
} else {
Modified: qpid/trunk/qpid/cpp/examples/messaging/spout.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/spout.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/spout.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/spout.cpp Mon Mar 29 19:21:26 2010
@@ -125,9 +125,9 @@ struct Options : public qpid::Options
std::string name;
std::string value;
if (nameval(property, name, value)) {
- message.getHeaders()[name] = value;
+ message.getProperties()[name] = value;
} else {
- message.getHeaders()[name] = Variant();
+ message.getProperties()[name] = Variant();
}
}
@@ -177,7 +177,7 @@ int main(int argc, char** argv)
for (uint count = 0; (count < options.count || options.count == 0) && end > now(); count++) {
if (!options.replyto.empty()) message.setReplyTo(Address(options.replyto));
std::string id = options.id.empty() ? Uuid(true).str() : options.id;
- message.getHeaders()["spout-id"] = (boost::format("%1%:%2%") % id % count).str();
+ message.getProperties()["spout-id"] = (boost::format("%1%:%2%") % id % count).str();
sender.send(message);
}
connection.close();
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h Mon Mar 29 19:21:26 2010
@@ -65,6 +65,9 @@ class Message
QPID_CLIENT_EXTERN void setCorrelationId(const std::string&);
QPID_CLIENT_EXTERN const std::string& getCorrelationId() const;
+ QPID_CLIENT_EXTERN void setPriority(uint8_t);
+ QPID_CLIENT_EXTERN uint8_t getPriority() const;
+
/**
* Set the time to live for this message in milliseconds.
*/
@@ -77,10 +80,11 @@ class Message
QPID_CLIENT_EXTERN void setDurable(bool durable);
QPID_CLIENT_EXTERN bool getDurable() const;
- QPID_CLIENT_EXTERN bool isRedelivered() const;
+ QPID_CLIENT_EXTERN bool getRedelivered() const;
+ QPID_CLIENT_EXTERN void setRedelivered(bool);
- QPID_CLIENT_EXTERN const qpid::types::Variant::Map& getHeaders() const;
- QPID_CLIENT_EXTERN qpid::types::Variant::Map& getHeaders();
+ QPID_CLIENT_EXTERN const qpid::types::Variant::Map& getProperties() const;
+ QPID_CLIENT_EXTERN qpid::types::Variant::Map& getProperties();
QPID_CLIENT_EXTERN const std::string& getContent() const;
QPID_CLIENT_EXTERN std::string& getContent();
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp Mon Mar 29 19:21:26 2010
@@ -56,7 +56,7 @@ struct FailoverUpdatesImpl : qpid::sys::
try {
Message message;
while (!quit && receiver.fetch(message)) {
- connection.setOption("urls", message.getHeaders()["amq.failover"]);
+ connection.setOption("urls", message.getProperties()["amq.failover"]);
session.acknowledge();
}
} catch (const qpid::TransportFailure& e) {
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Mar 29 19:21:26 2010
@@ -279,7 +279,8 @@ void populateHeaders(qpid::messaging::Me
if (deliveryProperties) {
message.setTtl(qpid::messaging::Duration(deliveryProperties->getTtl()));
message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT);
- MessageImplAccess::get(message).redelivered = deliveryProperties->getRedelivered();
+ message.setPriority(deliveryProperties->getPriority());
+ message.setRedelivered(deliveryProperties->getRedelivered());
}
if (messageProperties) {
message.setContentType(messageProperties->getContentType());
@@ -287,8 +288,8 @@ void populateHeaders(qpid::messaging::Me
message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
}
message.setSubject(messageProperties->getApplicationHeaders().getAsString(SUBJECT));
- message.getHeaders().clear();
- translate(messageProperties->getApplicationHeaders(), message.getHeaders());
+ message.getProperties().clear();
+ translate(messageProperties->getApplicationHeaders(), message.getProperties());
message.setCorrelationId(messageProperties->getCorrelationId());
message.setUserId(messageProperties->getUserId());
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Mar 29 19:21:26 2010
@@ -46,12 +46,15 @@ void OutgoingMessage::convert(const qpid
if (address) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
- translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
+ translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
if (from.getDurable()) {
message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
}
-
+ if (from.getRedelivered()) {
+ message.getDeliveryProperties().setRedelivered(true);
+ }
+ if (from.getPriority()) message.getDeliveryProperties().setPriority(from.getPriority());
}
namespace {
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp Mon Mar 29 19:21:26 2010
@@ -52,16 +52,20 @@ const std::string& Message::getUserId()
void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
const std::string& Message::getCorrelationId() const { return impl->correlationId; }
+uint8_t Message::getPriority() const { return impl->priority; }
+void Message::setPriority(uint8_t priority) { impl->priority = priority; }
+
void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); }
Duration Message::getTtl() const { return Duration(impl->ttl); }
void Message::setDurable(bool durable) { impl->durable = durable; }
bool Message::getDurable() const { return impl->durable; }
-bool Message::isRedelivered() const { return impl->redelivered; }
+bool Message::getRedelivered() const { return impl->redelivered; }
+void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; }
-const VariantMap& Message::getHeaders() const { return impl->getHeaders(); }
-VariantMap& Message::getHeaders() { return impl->getHeaders(); }
+const VariantMap& Message::getProperties() const { return impl->getHeaders(); }
+VariantMap& Message::getProperties() { return impl->getHeaders(); }
void Message::setContent(const std::string& c) { impl->setBytes(c); }
void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); }
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp Mon Mar 29 19:21:26 2010
@@ -31,12 +31,14 @@ const std::string EMPTY_STRING = "";
using namespace qpid::types;
MessageImpl::MessageImpl(const std::string& c) :
+ priority(0),
ttl(0),
durable(false),
redelivered(false),
bytes(c),
internalId(0) {}
MessageImpl::MessageImpl(const char* chars, size_t count) :
+ priority(0),
ttl(0),
durable (false),
redelivered(false),
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h Mon Mar 29 19:21:26 2010
@@ -36,6 +36,7 @@ struct MessageImpl
std::string messageId;
std::string userId;
std::string correlationId;
+ uint8_t priority;
uint64_t ttl;
bool durable;
bool redelivered;
Modified: qpid/trunk/qpid/cpp/src/tests/ClientMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientMessage.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClientMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClientMessage.cpp Mon Mar 29 19:21:26 2010
@@ -34,11 +34,11 @@ QPID_AUTO_TEST_CASE(testCopyConstructor)
{
Message m("my-data");
m.setSubject("my-subject");
- m.getHeaders()["a"] = "ABC";
+ m.getProperties()["a"] = "ABC";
Message c(m);
BOOST_CHECK_EQUAL(m.getContent(), c.getContent());
BOOST_CHECK_EQUAL(m.getSubject(), c.getSubject());
- BOOST_CHECK_EQUAL(m.getHeaders()["a"], c.getHeaders()["a"]);
+ BOOST_CHECK_EQUAL(m.getProperties()["a"], c.getProperties()["a"]);
}
QPID_AUTO_TEST_SUITE_END()
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Mar 29 19:21:26 2010
@@ -241,7 +241,7 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeade
Sender sender = fix.session.createSender(fix.queue);
Message out("test-message");
for (uint i = 0; i < 10; ++i) {
- out.getHeaders()["a"] = i;
+ out.getProperties()["a"] = i;
sender.send(out);
}
Receiver receiver = fix.session.createReceiver(fix.queue);
@@ -249,7 +249,7 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeade
for (uint i = 0; i < 10; ++i) {
BOOST_CHECK(receiver.fetch(in, Duration::SECOND * 5));
BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
- BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
+ BOOST_CHECK_EQUAL(in.getProperties()["a"].asUint32(), i);
fix.session.acknowledge();
}
}
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp Mon Mar 29 19:21:26 2010
@@ -131,7 +131,7 @@ class SequenceTracker
bool isDuplicate(Message& message)
{
- uint sn = message.getHeaders()["sn"];
+ uint sn = message.getProperties()["sn"];
if (lastSn < sn) {
lastSn = sn;
return false;
@@ -175,8 +175,8 @@ int main(int argc, char ** argv)
if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
- if (msg.isRedelivered()) std::cout << "Redelivered: true" << std::endl;
- std::cout << "Headers: " << msg.getHeaders() << std::endl;
+ if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl;
+ std::cout << "Properties: " << msg.getProperties() << std::endl;
std::cout << std::endl;
}
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Mon Mar 29 19:21:26 2010
@@ -145,9 +145,9 @@ struct Options : public qpid::Options
std::string name;
std::string value;
if (nameval(property, name, value)) {
- message.getHeaders()[name] = value;
+ message.getProperties()[name] = value;
} else {
- message.getHeaders()[name] = Variant();
+ message.getProperties()[name] = Variant();
}
}
@@ -203,7 +203,7 @@ int main(int argc, char ** argv)
uint txCount = 0;
while (getline(std::cin, content)) {
msg.setContent(content);
- msg.getHeaders()["sn"] = ++sent;
+ msg.getProperties()["sn"] = ++sent;
sender.send(msg);
if (opts.tx && (sent % opts.tx == 0)) {
if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
@@ -214,7 +214,7 @@ int main(int argc, char ** argv)
}
}
for (uint i = opts.sendEos; i > 0; --i) {
- msg.getHeaders()["sn"] = ++sent;
+ msg.getProperties()["sn"] = ++sent;
msg.setContent(EOS);//TODO: add in ability to send digest or similar
sender.send(msg);
}
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp?rev=928878&r1=928877&r2=928878&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp Mon Mar 29 19:21:26 2010
@@ -119,7 +119,7 @@ struct Publish : Client
qpid::sys::AbsTime start = qpid::sys::now();
while (true) {
qpid::sys::AbsTime sentAt = qpid::sys::now();
- msg.getHeaders()[TIMESTAMP] = timestamp(sentAt);
+ msg.getProperties()[TIMESTAMP] = timestamp(sentAt);
sender.send(msg);
++sent;
qpid::sys::AbsTime waitTill(start, sent*interval);
@@ -151,7 +151,7 @@ struct Consume : Client
}
//calculate latency
uint64_t receivedAt = timestamp(qpid::sys::now());
- uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();
+ uint64_t sentAt = msg.getProperties()[TIMESTAMP].asUint64();
double latency = ((double) (receivedAt - sentAt)) / qpid::sys::TIME_MSEC;
//update avg, min & max
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org