You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/11/08 12:50:19 UTC

[2/2] qpid-cpp git commit: QPID-7501: free sessions and links under connection lock

QPID-7501: free sessions and links under connection lock


Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/5f5790e7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/5f5790e7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/5f5790e7

Branch: refs/heads/master
Commit: 5f5790e7cd1c80a3ae28cb291679f6faaaa3ffc1
Parents: db6ba28
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Nov 8 10:07:09 2016 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Tue Nov 8 12:49:44 2016 +0000

----------------------------------------------------------------------
 src/qpid/messaging/amqp/ConnectionContext.cpp |  5 +++-
 src/qpid/messaging/amqp/ReceiverContext.cpp   | 20 +++++++++++++-
 src/qpid/messaging/amqp/ReceiverContext.h     |  3 +++
 src/qpid/messaging/amqp/SenderContext.cpp     | 28 +++++++++++++++++--
 src/qpid/messaging/amqp/SenderContext.h       |  3 +++
 src/qpid/messaging/amqp/SessionContext.cpp    | 31 ++++++++++++++++++++--
 src/qpid/messaging/amqp/SessionContext.h      |  1 +
 7 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ConnectionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp
index 5646c6c..85408de 100644
--- a/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -193,7 +193,7 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
     while (!(pn_session_state(ssn->session) & PN_REMOTE_CLOSED)) {
         wait();
     }
-
+    ssn->cleanup();
     sessions.erase(ssn->getName());
 }
 
@@ -222,6 +222,9 @@ void ConnectionContext::close()
             }
             lock.wait();
         }
+        for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+            i->second->cleanup();
+        }
         sessions.clear();
     }
     if (state != DISCONNECTED) {

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ReceiverContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ReceiverContext.cpp b/src/qpid/messaging/amqp/ReceiverContext.cpp
index 427a633..f585f8b 100644
--- a/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -40,11 +40,12 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co
 
 ReceiverContext::~ReceiverContext()
 {
-    if (receiver) pn_link_free(receiver);
+    if (!error && receiver) pn_link_free(receiver);
 }
 
 void ReceiverContext::setCapacity(uint32_t c)
 {
+    error.raise();
     if (c != capacity) {
         //stop
         capacity = c;
@@ -59,17 +60,20 @@ uint32_t ReceiverContext::getCapacity()
 
 uint32_t ReceiverContext::getAvailable()
 {
+    error.raise();
     return pn_link_queued(receiver);
 }
 
 uint32_t ReceiverContext::getUnsettled()
 {
+    error.raise();
     assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver));
     return pn_link_unsettled(receiver) - pn_link_queued(receiver);
 }
 
 void ReceiverContext::close()
 {
+    error.raise();
     if (receiver) pn_link_close(receiver);
 }
 
@@ -84,6 +88,7 @@ const std::string& ReceiverContext::getSource() const
 }
 void ReceiverContext::verify()
 {
+    error.raise();
     pn_terminus_t* source = pn_link_remote_source(receiver);
     if (!helper.isNameNull() && !pn_terminus_get_address(source)) {
         std::string msg("No such source : ");
@@ -98,10 +103,12 @@ void ReceiverContext::verify()
 }
 void ReceiverContext::configure()
 {
+    error.raise();
     if (receiver) configure(pn_link_source(receiver));
 }
 void ReceiverContext::configure(pn_terminus_t* source)
 {
+    error.raise();
     helper.configure(receiver, source, AddressHelper::FOR_RECEIVER);
     std::string option;
     if (helper.getLinkTarget(option)) {
@@ -124,6 +131,7 @@ void ReceiverContext::reset(pn_session_t* session)
 
 bool ReceiverContext::hasCurrent()
 {
+    error.raise();
     return receiver &&  pn_link_current(receiver);
 }
 
@@ -137,4 +145,14 @@ bool ReceiverContext::wakeupToIssueCredit()
     }
 }
 
+void ReceiverContext::cleanup()
+{
+    if (!error && receiver) {
+        error = new LinkError("receiver no longer valid");
+        pn_link_free(receiver);
+        receiver = 0;
+
+    }
+}
+
 }}} // namespace qpid::messaging::amqp

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ReceiverContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ReceiverContext.h b/src/qpid/messaging/amqp/ReceiverContext.h
index dd1352a..93041f1 100644
--- a/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/src/qpid/messaging/amqp/ReceiverContext.h
@@ -25,6 +25,7 @@
 #include "qpid/messaging/amqp/AddressHelper.h"
 #include <string>
 #include "qpid/sys/AtomicCount.h"
+#include "qpid/sys/ExceptionHolder.h"
 #include "qpid/sys/IntegerTypes.h"
 
 struct pn_link_t;
@@ -60,6 +61,7 @@ class ReceiverContext
     void verify();
     Address getAddress() const;
     bool hasCurrent();
+    void cleanup();
   private:
     friend class ConnectionContext;
     const std::string name;
@@ -71,6 +73,7 @@ class ReceiverContext
     qpid::sys::AtomicCount fetching;
     void configure(pn_terminus_t*);
     bool wakeupToIssueCredit();
+    sys::ExceptionHolder error;
 };
 }}} // namespace qpid::messaging::amqp
 

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SenderContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.cpp b/src/qpid/messaging/amqp/SenderContext.cpp
index a3ffb15..7ae5e2a 100644
--- a/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/src/qpid/messaging/amqp/SenderContext.cpp
@@ -67,16 +67,18 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n,
 
 SenderContext::~SenderContext()
 {
-    if (sender) pn_link_free(sender);
+    if (!error && sender) pn_link_free(sender);
 }
 
 void SenderContext::close()
 {
+    error.raise();
     if (sender) pn_link_close(sender);
 }
 
 void SenderContext::setCapacity(uint32_t c)
 {
+    error.raise();
     if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!");
     capacity = c;
 }
@@ -88,6 +90,7 @@ uint32_t SenderContext::getCapacity()
 
 uint32_t SenderContext::getUnsettled()
 {
+    error.raise();
     return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/);
 }
 
@@ -103,6 +106,7 @@ const std::string& SenderContext::getTarget() const
 
 bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
 {
+    error.raise();
     resend();//if there are any messages needing to be resent at the front of the queue, send them first
     if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
         types::Variant state;
@@ -135,6 +139,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
 
 void SenderContext::check()
 {
+    error.raise();
     if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) {
         std::string text = get_error_string(pn_link_remote_condition(sender), "Link detached by peer");
         pn_link_close(sender);
@@ -144,6 +149,7 @@ void SenderContext::check()
 
 uint32_t SenderContext::processUnsettled(bool silent)
 {
+    error.raise();
     if (!silent) {
         check();
     }
@@ -693,6 +699,7 @@ void SenderContext::Delivery::settleAndReset()
 }
 void SenderContext::verify()
 {
+    error.raise();
     pn_terminus_t* target = pn_link_remote_target(sender);
     if (!helper.isNameNull() && !pn_terminus_get_address(target)) {
         std::string msg("No such target : ");
@@ -709,11 +716,13 @@ void SenderContext::verify()
 
 void SenderContext::configure()
 {
+    error.raise();
     if (sender) configure(pn_link_target(sender));
 }
 
 void SenderContext::configure(pn_terminus_t* target)
 {
+    error.raise();
     helper.configure(sender, target, AddressHelper::FOR_SENDER);
     std::string option;
     if (helper.getLinkSource(option)) {
@@ -725,12 +734,17 @@ void SenderContext::configure(pn_terminus_t* target)
 
 bool SenderContext::settled()
 {
+    error.raise();
     return processUnsettled(false) == 0;
 }
 
 bool SenderContext::closed()
 {
-    return pn_link_state(sender) & PN_LOCAL_CLOSED;
+    if (sender) {
+        return pn_link_state(sender) & PN_LOCAL_CLOSED;
+    } else {
+        return true;
+    }
 }
 
 Address SenderContext::getAddress() const
@@ -754,4 +768,14 @@ void SenderContext::resend()
     }
 }
 
+void SenderContext::cleanup()
+{
+    if (!error && sender) {
+        error = new LinkError("sender no longer valid");
+        pn_link_free(sender);
+        sender = 0;
+
+    }
+}
+
 }}} // namespace qpid::messaging::amqp

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SenderContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.h b/src/qpid/messaging/amqp/SenderContext.h
index 8bed808..94d987f 100644
--- a/src/qpid/messaging/amqp/SenderContext.h
+++ b/src/qpid/messaging/amqp/SenderContext.h
@@ -26,6 +26,7 @@
 #include <vector>
 #include <boost/shared_ptr.hpp>
 #include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/ExceptionHolder.h"
 #include "qpid/sys/Time.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/amqp/AddressHelper.h"
@@ -117,6 +118,7 @@ class SenderContext
     virtual bool settled();
     virtual bool closed();
     virtual Address getAddress() const;
+    void cleanup();
 
   protected:
     pn_link_t* sender;
@@ -134,6 +136,7 @@ class SenderContext
     bool unreliable;
     const SenderOptions options;
     boost::shared_ptr<Transaction> transaction;
+    sys::ExceptionHolder error;
 
     uint32_t processUnsettled(bool silent);
     void configure(pn_terminus_t*);

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SessionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.cpp b/src/qpid/messaging/amqp/SessionContext.cpp
index 3420a64..b57ae27 100644
--- a/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/src/qpid/messaging/amqp/SessionContext.cpp
@@ -97,13 +97,21 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string
 void SessionContext::removeReceiver(const std::string& n)
 {
     error.raise();
-    receivers.erase(n);
+    SessionContext::ReceiverMap::iterator i = receivers.find(n);
+    if (i != receivers.end()) {
+        i->second->cleanup();
+        receivers.erase(i);
+    }
 }
 
 void SessionContext::removeSender(const std::string& n)
 {
     error.raise();
-    senders.erase(n);
+    SessionContext::SenderMap::iterator i = senders.find(n);
+    if (i != senders.end()) {
+        i->second->cleanup();
+        senders.erase(i);
+    }
 }
 
 boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
@@ -258,5 +266,24 @@ void SessionContext::resetSession(pn_session_t* session_) {
     }
 }
 
+void SessionContext::cleanup() {
+    if (transaction) {
+        transaction->cleanup();
+        transaction = boost::shared_ptr<Transaction>();
+    }
+    for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+        i->second->cleanup();
+    }
+    senders.clear();
+    for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+        i->second->cleanup();
+    }
+    receivers.clear();
+    if (!error && session) {
+        error = new SessionClosed();
+        pn_session_free(session);
+        session = 0;
+    }
+}
 
 }}} // namespace qpid::messaging::amqp

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SessionContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.h b/src/qpid/messaging/amqp/SessionContext.h
index 8c28208..dbefd61 100644
--- a/src/qpid/messaging/amqp/SessionContext.h
+++ b/src/qpid/messaging/amqp/SessionContext.h
@@ -90,6 +90,7 @@ class SessionContext
     void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
     void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
     void resetSession(pn_session_t*);
+    void cleanup();
 };
 }}} // namespace qpid::messaging::amqp
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org