You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/06 18:06:39 UTC

svn commit: r711913 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/agent: ManagementAgentImpl.cpp ManagementAgentImpl.h

Author: tross
Date: Thu Nov  6 09:06:21 2008
New Revision: 711913

URL: http://svn.apache.org/viewvc?rev=711913&view=rev
Log:
QPID-1437 - Fixed qmf agent shutdown

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=711913&r1=711912&r2=711913&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Nov  6 09:06:21 2008
@@ -87,6 +87,17 @@
     // TODO: Establish system ID
 }
 
+ManagementAgentImpl::~ManagementAgentImpl()
+{
+    connThreadBody.close();
+
+    // If the thread is doing work on the connection, we must wait for it to
+    // complete before shutting down.
+    if (!connThreadBody.isSleeping()) {
+        connThread.join();
+    }
+}
+
 void ManagementAgentImpl::init(string    brokerHost,
                                uint16_t  brokerPort,
                                uint16_t  intervalSeconds,
@@ -725,21 +736,31 @@
                 delete subscriptions;
                 subscriptions = 0;
                 session.close();
+                connection.close();
             }
         } catch (std::exception &e) {
             if (delay < delayMax)
                 delay *= delayFactor;
         }
 
-        ::sleep(delay);
+        {
+             Mutex::ScopedLock _lock(connLock);
+             if (shutdown)
+                 return;
+             sleeping = true;
+             {
+                  Mutex::ScopedUnlock _unlock(connLock);
+                  ::sleep(delay);
+             }
+             sleeping = false;
+             if (shutdown)
+                 return;
+        }
     }
 }
 
 ManagementAgentImpl::ConnectionThread::~ConnectionThread()
 {
-    if (subscriptions != 0) {
-        delete subscriptions;
-    }
 }
 
 void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer&  buf,
@@ -773,6 +794,22 @@
                           arg::bindingKey=key.str());
 }
 
+void ManagementAgentImpl::ConnectionThread::close()
+{
+    {
+        Mutex::ScopedLock _lock(connLock);
+        shutdown = true;
+    }
+    if (subscriptions)
+        subscriptions->stop();
+}
+
+bool ManagementAgentImpl::ConnectionThread::isSleeping() const
+{
+    Mutex::ScopedLock _lock(connLock);
+    return sleeping;
+}
+
 
 void ManagementAgentImpl::PublishThread::run()
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=711913&r1=711912&r2=711913&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Nov  6 09:06:21 2008
@@ -43,7 +43,7 @@
   public:
 
     ManagementAgentImpl();
-    virtual ~ManagementAgentImpl() {};
+    virtual ~ManagementAgentImpl();
 
     //
     // Methods from ManagementAgent
@@ -156,17 +156,22 @@
         client::Session      session;
         client::SubscriptionManager* subscriptions;
         std::stringstream queueName;
-        sys::Mutex        connLock;
+        mutable sys::Mutex   connLock;
+        bool              shutdown;
+        bool              sleeping;
         void run();
     public:
         ConnectionThread(ManagementAgentImpl& _agent) :
-            operational(false), agent(_agent), subscriptions(0) {}
+            operational(false), agent(_agent), subscriptions(0),
+            shutdown(false), sleeping(false) {}
         ~ConnectionThread();
         void sendBuffer(qpid::framing::Buffer& buf,
                         uint32_t               length,
                         const std::string&     exchange,
                         const std::string&     routingKey);
         void bindToBank(uint32_t brokerBank, uint32_t agentBank);
+        void close();
+        bool isSleeping() const;
     };
 
     class PublishThread : public sys::Runnable