You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/01/22 16:57:30 UTC
svn commit: r1560394 - in /qpid/trunk/qpid/cpp/src/qpid/messaging/amqp:
ConnectionContext.cpp ConnectionContext.h ReceiverContext.cpp
ReceiverContext.h SessionContext.cpp SessionContext.h SessionHandle.cpp
Author: gsim
Date: Wed Jan 22 15:57:29 2014
New Revision: 1560394
URL: http://svn.apache.org/r1560394
Log:
QPID-5503 implement nextReceiver()
Modified:
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Jan 22 15:57:29 2014
@@ -280,6 +280,23 @@ bool ConnectionContext::get(boost::share
return false;
}
+boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout) // Repeatedly asks ssn for a ready receiver; returns it, or an empty pointer once the deadline has passed.
+{
+ qpid::sys::AbsTime until(convert(timeout)); // deadline for the whole call; convert() is defined elsewhere in this file
+ while (true) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); // declared inside the loop, so its scope is a single iteration
+ checkClosed(ssn); // NOTE(review): presumably throws if the session/connection is closed - confirm
+ boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); // non-blocking query of the session
+ if (r) {
+ return r; // a receiver was reported: hand it straight back
+ } else if (until > qpid::sys::now()) { // nothing yet and the deadline is still in the future
+ waitUntil(ssn, until); // NOTE(review): presumably blocks until signalled or 'until' - confirm; the loop then re-checks
+ } else {
+ return boost::shared_ptr<ReceiverContext>(); // deadline reached with no receiver: empty pointer
+ }
+ }
+}
+
void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Wed Jan 22 15:57:29 2014
@@ -85,6 +85,7 @@ class ConnectionContext : public qpid::s
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void sync(boost::shared_ptr<SessionContext> ssn);
+ boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout);
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Wed Jan 22 15:57:29 2014
@@ -129,4 +129,9 @@ void ReceiverContext::reset(pn_session_t
configure();
}
+bool ReceiverContext::hasCurrent() // True when pn_link_current(receiver) yields a non-null/non-zero result.
+{
+ return pn_link_current(receiver); // result is implicitly converted to bool; NOTE(review): per the Proton API this should be the link's current delivery - confirm
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Wed Jan 22 15:57:29 2014
@@ -59,6 +59,7 @@ class ReceiverContext
void configure();
void verify();
Address getAddress() const;
+ bool hasCurrent();
private:
friend class ConnectionContext;
const std::string name;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Wed Jan 22 15:57:29 2014
@@ -89,8 +89,14 @@ void SessionContext::removeSender(const
senders.erase(n);
}
-boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
+boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() // Returns the first entry in 'receivers' whose hasCurrent() is true, else an empty pointer; never waits.
{
+ for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { // scan always starts from receivers.begin()
+ if (i->second->hasCurrent()) {
+ return i->second; // first match wins; later receivers are not examined
+ }
+ }
+
return boost::shared_ptr<ReceiverContext>();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Wed Jan 22 15:57:29 2014
@@ -57,7 +57,7 @@ class SessionContext
boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
void removeReceiver(const std::string&);
void removeSender(const std::string&);
- boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
+ boost::shared_ptr<ReceiverContext> nextReceiver();
uint32_t getReceivable();
uint32_t getUnsettledAcks();
bool settled();
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1560394&r1=1560393&r2=1560394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Wed Jan 22 15:57:29 2014
@@ -108,7 +108,7 @@ qpid::messaging::Receiver SessionHandle:
bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
{
- boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout);
+ boost::shared_ptr<ReceiverContext> r = connection->nextReceiver(session, timeout);
if (r) {
//TODO: cache handles in this case to avoid frequent allocation
receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org