You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/02/02 12:09:15 UTC
svn commit: r905579 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/
src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/
Author: gsim
Date: Tue Feb 2 11:09:11 2010
New Revision: 905579
URL: http://svn.apache.org/viewvc?rev=905579&view=rev
Log:
QPID-2380: recognise the reliability option for senders (also added a capacity option to the qpid_send test client and fixed handling of an empty option string)
Modified:
qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h Tue Feb 2 11:09:11 2010
@@ -120,7 +120,7 @@
* exactly-once (the latter is not yet correctly supported).</td></tr>
*
* <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to
- * be created for the queue that match the given criteris (or list of
+ * be created for the queue that match the given criteria (or list of
* criteria).</td></tr>
*
* <tr valign=top><td>x-properties</td><td>allows protocol or implementation specific options
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Tue Feb 2 11:09:11 2010
@@ -265,12 +265,12 @@
return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
}
-bool is_unreliable(const Address& address)
+bool AddressResolution::is_unreliable(const Address& address)
{
return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
-bool is_reliable(const Address& address)
+bool AddressResolution::is_reliable(const Address& address)
{
return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
@@ -346,7 +346,7 @@
QueueSource::QueueSource(const Address& address) :
Queue(address),
- acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+ acceptMode(AddressResolution::is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
exclusive(false)
{
@@ -393,7 +393,7 @@
Subscription::Subscription(const Address& address, const std::string& exchangeType)
: Exchange(address),
queue(getSubscriptionName(name, address.getOption(NAME))),
- reliable(is_reliable(address)),
+ reliable(AddressResolution::is_reliable(address)),
durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
{
if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Tue Feb 2 11:09:11 2010
@@ -56,7 +56,8 @@
static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
-
+ static bool is_unreliable(const qpid::messaging::Address& address);
+ static bool is_reliable(const qpid::messaging::Address& address);
private:
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue Feb 2 11:09:11 2010
@@ -32,12 +32,17 @@
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address) :
parent(_parent), name(_name), address(_address), state(UNRESOLVED),
- capacity(50), window(0), flushed(false) {}
+ capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
void SenderImpl::send(const qpid::messaging::Message& message)
{
- Send f(*this, &message);
- while (f.repeat) parent.execute(f);
+ if (unreliable) {
+ UnreliableSend f(*this, &message);
+ parent.execute(f);
+ } else {
+ Send f(*this, &message);
+ while (f.repeat) parent.execute(f);
+ }
}
void SenderImpl::close()
@@ -78,7 +83,7 @@
void SenderImpl::waitForCapacity()
{
//TODO: add option to throw exception rather than blocking?
- if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+ if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
//Initial implementation is very basic. As outgoing is
//currently only reduced on receiving completions and we are
//blocking anyway we may as well sync(). If successful that
@@ -93,9 +98,8 @@
}
}
-void SenderImpl::sendImpl(const qpid::messaging::Message& m)
+void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
- //TODO: make recording for replay optional (would still want to track completion however)
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
msg->convert(m);
msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
@@ -103,6 +107,14 @@
sink->send(session, name, outgoing.back());
}
+void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
+{
+ OutgoingMessage msg;
+ msg.convert(m);
+ msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ sink->send(session, name, msg);
+}
+
void SenderImpl::replay()
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue Feb 2 11:09:11 2010
@@ -73,6 +73,7 @@
uint32_t capacity;
uint32_t window;
bool flushed;
+ const bool unreliable;
uint32_t checkPendingSends(bool flush);
void replay();
@@ -80,6 +81,7 @@
//logic for application visible methods:
void sendImpl(const qpid::messaging::Message&);
+ void sendUnreliable(const qpid::messaging::Message&);
void closeImpl();
@@ -108,6 +110,21 @@
}
};
+ struct UnreliableSend : Command
+ {
+ const qpid::messaging::Message* message;
+
+ UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
+ void operator()()
+ {
+ //TODO: ideally want to put messages on the outbound
+ //queue and pull them off in io thread, but the old
+ //0-10 client doesn't support that option so for now
+ //we simply don't queue unreliable messages
+ impl.sendUnreliable(*message);
+ }
+ };
+
struct Close : Command
{
Close(SenderImpl& i) : Command(i) {}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Tue Feb 2 11:09:11 2010
@@ -47,7 +47,7 @@
{
Variant::Map options;
AddressParser parser(o);
- if (parser.parseMap(options)) {
+ if (o.empty() || parser.parseMap(options)) {
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(options));
} else {
throw InvalidOptionString(o);
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Tue Feb 2 11:09:11 2010
@@ -63,6 +63,7 @@
std::string content;
uint tx;
uint rollbackFrequency;
+ uint capacity;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
@@ -76,6 +77,7 @@
ttl(0),
tx(0),
rollbackFrequency(0),
+ capacity(0),
log(argv0)
{
addOptions()
@@ -94,6 +96,7 @@
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+ ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("help", qpid::optValue(help), "print this usage statement");
@@ -184,6 +187,7 @@
connection.open(opts.url);
Session session = connection.newSession(opts.tx > 0);
Sender sender = session.createSender(opts.address);
+ if (opts.capacity) sender.setCapacity(opts.capacity);
Message msg;
msg.setDurable(opts.durable);
if (opts.ttl) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org