You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/06 18:06:39 UTC
svn commit: r711913 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/agent:
ManagementAgentImpl.cpp ManagementAgentImpl.h
Author: tross
Date: Thu Nov 6 09:06:21 2008
New Revision: 711913
URL: http://svn.apache.org/viewvc?rev=711913&view=rev
Log:
QPID-1437 - Fixed qmf agent shutdown
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=711913&r1=711912&r2=711913&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Nov 6 09:06:21 2008
@@ -87,6 +87,17 @@
// TODO: Establish system ID
}
+ManagementAgentImpl::~ManagementAgentImpl()
+{
+ connThreadBody.close();
+
+ // If the thread is doing work on the connection, we must wait for it to
+ // complete before shutting down.
+ if (!connThreadBody.isSleeping()) {
+ connThread.join();
+ }
+}
+
void ManagementAgentImpl::init(string brokerHost,
uint16_t brokerPort,
uint16_t intervalSeconds,
@@ -725,21 +736,31 @@
delete subscriptions;
subscriptions = 0;
session.close();
+ connection.close();
}
} catch (std::exception &e) {
if (delay < delayMax)
delay *= delayFactor;
}
- ::sleep(delay);
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (shutdown)
+ return;
+ sleeping = true;
+ {
+ Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delay);
+ }
+ sleeping = false;
+ if (shutdown)
+ return;
+ }
}
}
ManagementAgentImpl::ConnectionThread::~ConnectionThread()
{
- if (subscriptions != 0) {
- delete subscriptions;
- }
}
void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
@@ -773,6 +794,22 @@
arg::bindingKey=key.str());
}
+void ManagementAgentImpl::ConnectionThread::close()
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ shutdown = true;
+ }
+ if (subscriptions)
+ subscriptions->stop();
+}
+
+bool ManagementAgentImpl::ConnectionThread::isSleeping() const
+{
+ Mutex::ScopedLock _lock(connLock);
+ return sleeping;
+}
+
void ManagementAgentImpl::PublishThread::run()
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=711913&r1=711912&r2=711913&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Nov 6 09:06:21 2008
@@ -43,7 +43,7 @@
public:
ManagementAgentImpl();
- virtual ~ManagementAgentImpl() {};
+ virtual ~ManagementAgentImpl();
//
// Methods from ManagementAgent
@@ -156,17 +156,22 @@
client::Session session;
client::SubscriptionManager* subscriptions;
std::stringstream queueName;
- sys::Mutex connLock;
+ mutable sys::Mutex connLock;
+ bool shutdown;
+ bool sleeping;
void run();
public:
ConnectionThread(ManagementAgentImpl& _agent) :
- operational(false), agent(_agent), subscriptions(0) {}
+ operational(false), agent(_agent), subscriptions(0),
+ shutdown(false), sleeping(false) {}
~ConnectionThread();
void sendBuffer(qpid::framing::Buffer& buf,
uint32_t length,
const std::string& exchange,
const std::string& routingKey);
void bindToBank(uint32_t brokerBank, uint32_t agentBank);
+ void close();
+ bool isSleeping() const;
};
class PublishThread : public sys::Runnable