You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/02/02 12:09:15 UTC

svn commit: r905579 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Author: gsim
Date: Tue Feb  2 11:09:11 2010
New Revision: 905579

URL: http://svn.apache.org/viewvc?rev=905579&view=rev
Log:
QPID-2380: recognise reliability option for sender (also added capacity to qpid_send test client and fixed handling of empty option string)

Modified:
    qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h Tue Feb  2 11:09:11 2010
@@ -120,7 +120,7 @@
  * exactly-once (the latter is not yet correctly supported).</td></tr>
  * 
  * <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to
- * be created for the queue that match the given criteris (or list of
+ * be created for the queue that match the given criteria (or list of
  * criteria).</td></tr>
  * 
  * <tr valign=top><td>x-properties</td><td>allows protocol or implementation specific options

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Tue Feb  2 11:09:11 2010
@@ -265,12 +265,12 @@
     return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
 }
 
-bool is_unreliable(const Address& address)
+bool AddressResolution::is_unreliable(const Address& address)
 {
     return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
 }
 
-bool is_reliable(const Address& address)
+bool AddressResolution::is_reliable(const Address& address)
 {
     return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
 }
@@ -346,7 +346,7 @@
 
 QueueSource::QueueSource(const Address& address) :
     Queue(address),
-    acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+    acceptMode(AddressResolution::is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
     acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
     exclusive(false)
 {
@@ -393,7 +393,7 @@
 Subscription::Subscription(const Address& address, const std::string& exchangeType)
     : Exchange(address),
       queue(getSubscriptionName(name, address.getOption(NAME))),
-      reliable(is_reliable(address)),
+      reliable(AddressResolution::is_reliable(address)),
       durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
 {
     if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Tue Feb  2 11:09:11 2010
@@ -56,7 +56,8 @@
 
     static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
     static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
-
+    static bool is_unreliable(const qpid::messaging::Address& address);
+    static bool is_reliable(const qpid::messaging::Address& address);
   private:
 };
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue Feb  2 11:09:11 2010
@@ -32,12 +32,17 @@
 SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, 
                        const qpid::messaging::Address& _address) : 
     parent(_parent), name(_name), address(_address), state(UNRESOLVED),
-    capacity(50), window(0), flushed(false) {}
+    capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
 
 void SenderImpl::send(const qpid::messaging::Message& message) 
 {
-    Send f(*this, &message);
-    while (f.repeat) parent.execute(f);
+    if (unreliable) {
+        UnreliableSend f(*this, &message);
+        parent.execute(f);
+    } else {
+        Send f(*this, &message);
+        while (f.repeat) parent.execute(f);
+    }
 }
 
 void SenderImpl::close()
@@ -78,7 +83,7 @@
 void SenderImpl::waitForCapacity() 
 {
     //TODO: add option to throw exception rather than blocking?
-    if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+    if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
         //Initial implementation is very basic. As outgoing is
         //currently only reduced on receiving completions and we are
         //blocking anyway we may as well sync(). If successful that
@@ -93,9 +98,8 @@
     }
 }
 
-void SenderImpl::sendImpl(const qpid::messaging::Message& m) 
+void SenderImpl::sendImpl(const qpid::messaging::Message& m)
 {
-    //TODO: make recording for replay optional (would still want to track completion however)
     std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
     msg->convert(m);
     msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
@@ -103,6 +107,14 @@
     sink->send(session, name, outgoing.back());
 }
 
+void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
+{
+    OutgoingMessage msg;
+    msg.convert(m);
+    msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+    sink->send(session, name, msg);
+}
+
 void SenderImpl::replay()
 {
     for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue Feb  2 11:09:11 2010
@@ -73,6 +73,7 @@
     uint32_t capacity;
     uint32_t window;
     bool flushed;
+    const bool unreliable;
 
     uint32_t checkPendingSends(bool flush);
     void replay();
@@ -80,6 +81,7 @@
 
     //logic for application visible methods:
     void sendImpl(const qpid::messaging::Message&);
+    void sendUnreliable(const qpid::messaging::Message&);
     void closeImpl();
 
 
@@ -108,6 +110,21 @@
         }
     };
 
+    struct UnreliableSend : Command
+    {
+        const qpid::messaging::Message* message;
+
+        UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
+        void operator()() 
+        {
+            //TODO: ideally want to put messages on the outbound
+            //queue and pull them off in io thread, but the old
+            //0-10 client doesn't support that option so for now
+            //we simply don't queue unreliable messages
+            impl.sendUnreliable(*message);                
+        }
+    };
+
     struct Close : Command
     {
         Close(SenderImpl& i) : Command(i) {}

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Tue Feb  2 11:09:11 2010
@@ -47,7 +47,7 @@
 { 
     Variant::Map options;
     AddressParser parser(o);
-    if (parser.parseMap(options)) {
+    if (o.empty() || parser.parseMap(options)) {
         PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(options));
     } else {
         throw InvalidOptionString(o);

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=905579&r1=905578&r2=905579&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Tue Feb  2 11:09:11 2010
@@ -63,6 +63,7 @@
     std::string content;
     uint tx;
     uint rollbackFrequency;
+    uint capacity;
     qpid::log::Options log;
 
     Options(const std::string& argv0=std::string())
@@ -76,6 +77,7 @@
           ttl(0),
           tx(0),
           rollbackFrequency(0),
+          capacity(0),
           log(argv0)
     {
         addOptions()
@@ -94,6 +96,7 @@
             ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
             ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
             ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+            ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
             ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
             ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
             ("help", qpid::optValue(help), "print this usage statement");
@@ -184,6 +187,7 @@
             connection.open(opts.url);
             Session session = connection.newSession(opts.tx > 0);
             Sender sender = session.createSender(opts.address);
+            if (opts.capacity) sender.setCapacity(opts.capacity);
             Message msg;
             msg.setDurable(opts.durable);
             if (opts.ttl) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org