You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/11/08 12:50:19 UTC
[2/2] qpid-cpp git commit: QPID-7501: free sessions and links under connection lock
QPID-7501: free sessions and links under connection lock
Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/5f5790e7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/5f5790e7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/5f5790e7
Branch: refs/heads/master
Commit: 5f5790e7cd1c80a3ae28cb291679f6faaaa3ffc1
Parents: db6ba28
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Nov 8 10:07:09 2016 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Tue Nov 8 12:49:44 2016 +0000
----------------------------------------------------------------------
src/qpid/messaging/amqp/ConnectionContext.cpp | 5 +++-
src/qpid/messaging/amqp/ReceiverContext.cpp | 20 +++++++++++++-
src/qpid/messaging/amqp/ReceiverContext.h | 3 +++
src/qpid/messaging/amqp/SenderContext.cpp | 28 +++++++++++++++++--
src/qpid/messaging/amqp/SenderContext.h | 3 +++
src/qpid/messaging/amqp/SessionContext.cpp | 31 ++++++++++++++++++++--
src/qpid/messaging/amqp/SessionContext.h | 1 +
7 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ConnectionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp
index 5646c6c..85408de 100644
--- a/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -193,7 +193,7 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
while (!(pn_session_state(ssn->session) & PN_REMOTE_CLOSED)) {
wait();
}
-
+ ssn->cleanup();
sessions.erase(ssn->getName());
}
@@ -222,6 +222,9 @@ void ConnectionContext::close()
}
lock.wait();
}
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ i->second->cleanup();
+ }
sessions.clear();
}
if (state != DISCONNECTED) {
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ReceiverContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ReceiverContext.cpp b/src/qpid/messaging/amqp/ReceiverContext.cpp
index 427a633..f585f8b 100644
--- a/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -40,11 +40,12 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co
ReceiverContext::~ReceiverContext()
{
- if (receiver) pn_link_free(receiver);
+ if (!error && receiver) pn_link_free(receiver);
}
void ReceiverContext::setCapacity(uint32_t c)
{
+ error.raise();
if (c != capacity) {
//stop
capacity = c;
@@ -59,17 +60,20 @@ uint32_t ReceiverContext::getCapacity()
uint32_t ReceiverContext::getAvailable()
{
+ error.raise();
return pn_link_queued(receiver);
}
uint32_t ReceiverContext::getUnsettled()
{
+ error.raise();
assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver));
return pn_link_unsettled(receiver) - pn_link_queued(receiver);
}
void ReceiverContext::close()
{
+ error.raise();
if (receiver) pn_link_close(receiver);
}
@@ -84,6 +88,7 @@ const std::string& ReceiverContext::getSource() const
}
void ReceiverContext::verify()
{
+ error.raise();
pn_terminus_t* source = pn_link_remote_source(receiver);
if (!helper.isNameNull() && !pn_terminus_get_address(source)) {
std::string msg("No such source : ");
@@ -98,10 +103,12 @@ void ReceiverContext::verify()
}
void ReceiverContext::configure()
{
+ error.raise();
if (receiver) configure(pn_link_source(receiver));
}
void ReceiverContext::configure(pn_terminus_t* source)
{
+ error.raise();
helper.configure(receiver, source, AddressHelper::FOR_RECEIVER);
std::string option;
if (helper.getLinkTarget(option)) {
@@ -124,6 +131,7 @@ void ReceiverContext::reset(pn_session_t* session)
bool ReceiverContext::hasCurrent()
{
+ error.raise();
return receiver && pn_link_current(receiver);
}
@@ -137,4 +145,14 @@ bool ReceiverContext::wakeupToIssueCredit()
}
}
+void ReceiverContext::cleanup()
+{
+ if (!error && receiver) {
+ error = new LinkError("receiver no longer valid");
+ pn_link_free(receiver);
+ receiver = 0;
+
+ }
+}
+
}}} // namespace qpid::messaging::amqp
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ReceiverContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ReceiverContext.h b/src/qpid/messaging/amqp/ReceiverContext.h
index dd1352a..93041f1 100644
--- a/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/src/qpid/messaging/amqp/ReceiverContext.h
@@ -25,6 +25,7 @@
#include "qpid/messaging/amqp/AddressHelper.h"
#include <string>
#include "qpid/sys/AtomicCount.h"
+#include "qpid/sys/ExceptionHolder.h"
#include "qpid/sys/IntegerTypes.h"
struct pn_link_t;
@@ -60,6 +61,7 @@ class ReceiverContext
void verify();
Address getAddress() const;
bool hasCurrent();
+ void cleanup();
private:
friend class ConnectionContext;
const std::string name;
@@ -71,6 +73,7 @@ class ReceiverContext
qpid::sys::AtomicCount fetching;
void configure(pn_terminus_t*);
bool wakeupToIssueCredit();
+ sys::ExceptionHolder error;
};
}}} // namespace qpid::messaging::amqp
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SenderContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.cpp b/src/qpid/messaging/amqp/SenderContext.cpp
index a3ffb15..7ae5e2a 100644
--- a/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/src/qpid/messaging/amqp/SenderContext.cpp
@@ -67,16 +67,18 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n,
SenderContext::~SenderContext()
{
- if (sender) pn_link_free(sender);
+ if (!error && sender) pn_link_free(sender);
}
void SenderContext::close()
{
+ error.raise();
if (sender) pn_link_close(sender);
}
void SenderContext::setCapacity(uint32_t c)
{
+ error.raise();
if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!");
capacity = c;
}
@@ -88,6 +90,7 @@ uint32_t SenderContext::getCapacity()
uint32_t SenderContext::getUnsettled()
{
+ error.raise();
return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/);
}
@@ -103,6 +106,7 @@ const std::string& SenderContext::getTarget() const
bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
{
+ error.raise();
resend();//if there are any messages needing to be resent at the front of the queue, send them first
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
types::Variant state;
@@ -135,6 +139,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
void SenderContext::check()
{
+ error.raise();
if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) {
std::string text = get_error_string(pn_link_remote_condition(sender), "Link detached by peer");
pn_link_close(sender);
@@ -144,6 +149,7 @@ void SenderContext::check()
uint32_t SenderContext::processUnsettled(bool silent)
{
+ error.raise();
if (!silent) {
check();
}
@@ -693,6 +699,7 @@ void SenderContext::Delivery::settleAndReset()
}
void SenderContext::verify()
{
+ error.raise();
pn_terminus_t* target = pn_link_remote_target(sender);
if (!helper.isNameNull() && !pn_terminus_get_address(target)) {
std::string msg("No such target : ");
@@ -709,11 +716,13 @@ void SenderContext::verify()
void SenderContext::configure()
{
+ error.raise();
if (sender) configure(pn_link_target(sender));
}
void SenderContext::configure(pn_terminus_t* target)
{
+ error.raise();
helper.configure(sender, target, AddressHelper::FOR_SENDER);
std::string option;
if (helper.getLinkSource(option)) {
@@ -725,12 +734,17 @@ void SenderContext::configure(pn_terminus_t* target)
bool SenderContext::settled()
{
+ error.raise();
return processUnsettled(false) == 0;
}
bool SenderContext::closed()
{
- return pn_link_state(sender) & PN_LOCAL_CLOSED;
+ if (sender) {
+ return pn_link_state(sender) & PN_LOCAL_CLOSED;
+ } else {
+ return true;
+ }
}
Address SenderContext::getAddress() const
@@ -754,4 +768,14 @@ void SenderContext::resend()
}
}
+void SenderContext::cleanup()
+{
+ if (!error && sender) {
+ error = new LinkError("sender no longer valid");
+ pn_link_free(sender);
+ sender = 0;
+
+ }
+}
+
}}} // namespace qpid::messaging::amqp
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SenderContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.h b/src/qpid/messaging/amqp/SenderContext.h
index 8bed808..94d987f 100644
--- a/src/qpid/messaging/amqp/SenderContext.h
+++ b/src/qpid/messaging/amqp/SenderContext.h
@@ -26,6 +26,7 @@
#include <vector>
#include <boost/shared_ptr.hpp>
#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/ExceptionHolder.h"
#include "qpid/sys/Time.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/amqp/AddressHelper.h"
@@ -117,6 +118,7 @@ class SenderContext
virtual bool settled();
virtual bool closed();
virtual Address getAddress() const;
+ void cleanup();
protected:
pn_link_t* sender;
@@ -134,6 +136,7 @@ class SenderContext
bool unreliable;
const SenderOptions options;
boost::shared_ptr<Transaction> transaction;
+ sys::ExceptionHolder error;
uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SessionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.cpp b/src/qpid/messaging/amqp/SessionContext.cpp
index 3420a64..b57ae27 100644
--- a/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/src/qpid/messaging/amqp/SessionContext.cpp
@@ -97,13 +97,21 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string
void SessionContext::removeReceiver(const std::string& n)
{
error.raise();
- receivers.erase(n);
+ SessionContext::ReceiverMap::iterator i = receivers.find(n);
+ if (i != receivers.end()) {
+ i->second->cleanup();
+ receivers.erase(i);
+ }
}
void SessionContext::removeSender(const std::string& n)
{
error.raise();
- senders.erase(n);
+ SessionContext::SenderMap::iterator i = senders.find(n);
+ if (i != senders.end()) {
+ i->second->cleanup();
+ senders.erase(i);
+ }
}
boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
@@ -258,5 +266,24 @@ void SessionContext::resetSession(pn_session_t* session_) {
}
}
+void SessionContext::cleanup() {
+ if (transaction) {
+ transaction->cleanup();
+ transaction = boost::shared_ptr<Transaction>();
+ }
+ for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+ i->second->cleanup();
+ }
+ senders.clear();
+ for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ i->second->cleanup();
+ }
+ receivers.clear();
+ if (!error && session) {
+ error = new SessionClosed();
+ pn_session_free(session);
+ session = 0;
+ }
+}
}}} // namespace qpid::messaging::amqp
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SessionContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.h b/src/qpid/messaging/amqp/SessionContext.h
index 8c28208..dbefd61 100644
--- a/src/qpid/messaging/amqp/SessionContext.h
+++ b/src/qpid/messaging/amqp/SessionContext.h
@@ -90,6 +90,7 @@ class SessionContext
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
void resetSession(pn_session_t*);
+ void cleanup();
};
}}} // namespace qpid::messaging::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org