You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/10/08 10:55:45 UTC
svn commit: r823094 - in /qpid/trunk/qpid/cpp/src/qpid/agent:
ManagementAgentImpl.cpp ManagementAgentImpl.h
Author: gsim
Date: Thu Oct 8 08:55:44 2009
New Revision: 823094
URL: http://svn.apache.org/viewvc?rev=823094&view=rev
Log:
QPID-2132: Applied patch from Ken Giusti
Modified:
qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=823094&r1=823093&r2=823094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Oct 8 08:55:44 2009
@@ -78,7 +78,7 @@
const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
- interval(10), extThread(false),
+ interval(10), extThread(false), pipeHandle(0),
initialized(false), connected(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
@@ -89,13 +89,11 @@
ManagementAgentImpl::~ManagementAgentImpl()
{
+ // shutdown the connection thread
connThreadBody.close();
+ connThread.join();
- // If the thread is doing work on the connection, we must wait for it to
- // complete before shutting down.
- if (!connThreadBody.isSleeping()) {
- connThread.join();
- }
+ // @todo need to shutdown pubThread?
// Release the memory associated with stored management objects.
{
@@ -777,6 +775,7 @@
static const int delayFactor(2);
int delay(delayMin);
string dest("qmfagent");
+ ConnectionThread::shared_ptr tmp;
sessionId.generate();
queueName << "qmfagent-" << sessionId;
@@ -787,7 +786,7 @@
QPID_LOG(debug, "QMF Agent attempting to connect to the broker...");
connection.open(agent.connectionSettings);
session = connection.newSession(queueName.str());
- subscriptions = new client::SubscriptionManager(session);
+ subscriptions.reset(new client::SubscriptionManager(session));
session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
arg::exclusive=true);
@@ -811,11 +810,12 @@
operational = false;
agent.connected = false;
+ tmp = subscriptions;
+ subscriptions.reset();
}
+ tmp.reset(); // frees the subscription outside the lock
delay = delayMin;
connection.close();
- delete subscriptions;
- subscriptions = 0;
}
} catch (exception &e) {
if (delay < delayMax)
@@ -824,14 +824,19 @@
}
{
+ // sleep for "delay" seconds, but periodically check if the
+ // agent is shutting down so we don't hang for up to delayMax
+ // seconds during agent shutdown
Mutex::ScopedLock _lock(connLock);
if (shutdown)
return;
sleeping = true;
- {
- Mutex::ScopedUnlock _unlock(connLock);
- ::sleep(delay);
- }
+ int totalSleep = 0;
+ do {
+ Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delayMin);
+ totalSleep += delayMin;
+ } while (totalSleep < delay && !shutdown);
sleeping = false;
if (shutdown)
return;
@@ -848,10 +853,12 @@
const string& exchange,
const string& routingKey)
{
+ ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
if (!operational)
return;
+ s = subscriptions;
}
Message msg;
@@ -866,8 +873,8 @@
} catch(exception& e) {
QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
// Bounce the connection
- if (subscriptions)
- subscriptions->stop();
+ if (s)
+ s->stop();
}
}
@@ -881,12 +888,14 @@
void ManagementAgentImpl::ConnectionThread::close()
{
+ ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
shutdown = true;
+ s = subscriptions;
}
- if (subscriptions)
- subscriptions->stop();
+ if (s)
+ s->stop();
}
bool ManagementAgentImpl::ConnectionThread::isSleeping() const
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=823094&r1=823093&r2=823094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Oct 8 08:55:44 2009
@@ -163,12 +163,14 @@
friend class ConnectionThread;
class ConnectionThread : public sys::Runnable
{
+ typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr;
+
bool operational;
ManagementAgentImpl& agent;
framing::Uuid sessionId;
client::Connection connection;
client::Session session;
- client::SubscriptionManager* subscriptions;
+ ConnectionThread::shared_ptr subscriptions;
std::stringstream queueName;
mutable sys::Mutex connLock;
bool shutdown;
@@ -176,7 +178,7 @@
void run();
public:
ConnectionThread(ManagementAgentImpl& _agent) :
- operational(false), agent(_agent), subscriptions(0),
+ operational(false), agent(_agent),
shutdown(false), sleeping(false) {}
~ConnectionThread();
void sendBuffer(qpid::framing::Buffer& buf,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org