You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/01/22 16:57:30 UTC

svn commit: r1560394 - in /qpid/trunk/qpid/cpp/src/qpid/messaging/amqp: ConnectionContext.cpp ConnectionContext.h ReceiverContext.cpp ReceiverContext.h SessionContext.cpp SessionContext.h SessionHandle.cpp

Author: gsim
Date: Wed Jan 22 15:57:29 2014
New Revision: 1560394

URL: http://svn.apache.org/r1560394
Log:
QPID-5503 implement nextReceiver()

Modified:
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Jan 22 15:57:29 2014
@@ -280,6 +280,23 @@ bool ConnectionContext::get(boost::share
     return false;
 }
 
+boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout)
+{ // Polls ssn->nextReceiver() until it yields a receiver or the deadline derived from 'timeout' has passed.
+    qpid::sys::AbsTime until(convert(timeout)); // deadline is computed once, before the loop, so retries do not extend the wait
+    while (true) {
+        qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); // taken at the top of every iteration; released when the iteration's scope ends
+        checkClosed(ssn); // NOTE(review): presumably throws if the connection/session is closed - confirm in checkClosed()
+        boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); // an empty pointer means the session reported no receiver
+        if (r) {
+            return r;
+        } else if (until > qpid::sys::now()) { // nothing yet, but the deadline is still in the future
+            waitUntil(ssn, until); // NOTE(review): presumably blocks until signalled or 'until' is reached - confirm; the loop then re-checks
+        } else {
+            return boost::shared_ptr<ReceiverContext>(); // deadline reached: report "no receiver" with an empty pointer
+        }
+    }
+}
+
 void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Wed Jan 22 15:57:29 2014
@@ -85,6 +85,7 @@ class ConnectionContext : public qpid::s
     void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
     void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
     void sync(boost::shared_ptr<SessionContext> ssn);
+    boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout);
 
     void setOption(const std::string& name, const qpid::types::Variant& value);
     std::string getAuthenticatedUsername();

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Wed Jan 22 15:57:29 2014
@@ -129,4 +129,9 @@ void ReceiverContext::reset(pn_session_t
     configure();
 }
 
+bool ReceiverContext::hasCurrent()
+{ // Reports whether pn_link_current() yields a non-null/non-zero value for 'receiver'.
+    return pn_link_current(receiver); // the call's result is implicitly converted to bool
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Wed Jan 22 15:57:29 2014
@@ -59,6 +59,7 @@ class ReceiverContext
     void configure();
     void verify();
     Address getAddress() const;
+    bool hasCurrent();
   private:
     friend class ConnectionContext;
     const std::string name;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Wed Jan 22 15:57:29 2014
@@ -89,8 +89,14 @@ void SessionContext::removeSender(const 
     senders.erase(n);
 }
 
-boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
+boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() // Returns the first entry of 'receivers' whose hasCurrent() is true, or an empty pointer if none is.
 {
+    for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { // scan always restarts at begin(), so entries earlier in the map's iteration order are preferred
+        if (i->second->hasCurrent()) {
+            return i->second; // stop at the first match; later entries are not examined
+        }
+    }
+
     return boost::shared_ptr<ReceiverContext>();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Wed Jan 22 15:57:29 2014
@@ -57,7 +57,7 @@ class SessionContext
     boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
     void removeReceiver(const std::string&);
     void removeSender(const std::string&);
-    boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
+    boost::shared_ptr<ReceiverContext> nextReceiver();
     uint32_t getReceivable();
     uint32_t getUnsettledAcks();
     bool settled();

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Wed Jan 22 15:57:29 2014
@@ -108,7 +108,7 @@ qpid::messaging::Receiver SessionHandle:
 
 bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
 {
-    boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout);
+    boost::shared_ptr<ReceiverContext> r = connection->nextReceiver(session, timeout);
     if (r) {
         //TODO: cache handles in this case to avoid frequent allocation
         receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org