You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/10/08 10:55:45 UTC

svn commit: r823094 - in /qpid/trunk/qpid/cpp/src/qpid/agent: ManagementAgentImpl.cpp ManagementAgentImpl.h

Author: gsim
Date: Thu Oct  8 08:55:44 2009
New Revision: 823094

URL: http://svn.apache.org/viewvc?rev=823094&view=rev
Log:
QPID-2132: Applied patch from Ken Giusti


Modified:
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=823094&r1=823093&r2=823094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Oct  8 08:55:44 2009
@@ -78,7 +78,7 @@
 const string ManagementAgentImpl::storeMagicNumber("MA02");
 
 ManagementAgentImpl::ManagementAgentImpl() :
-    interval(10), extThread(false),
+    interval(10), extThread(false), pipeHandle(0),
     initialized(false), connected(false), lastFailure("never connected"),
     clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
     assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
@@ -89,13 +89,11 @@
 
 ManagementAgentImpl::~ManagementAgentImpl()
 {
+    // shutdown the connection thread
     connThreadBody.close();
+    connThread.join();
 
-    // If the thread is doing work on the connection, we must wait for it to
-    // complete before shutting down.
-    if (!connThreadBody.isSleeping()) {
-        connThread.join();
-    }
+    // @todo need to shutdown pubThread?
 
     // Release the memory associated with stored management objects.
     {
@@ -777,6 +775,7 @@
     static const int delayFactor(2);
     int delay(delayMin);
     string dest("qmfagent");
+    ConnectionThread::shared_ptr tmp;
 
     sessionId.generate();
     queueName << "qmfagent-" << sessionId;
@@ -787,7 +786,7 @@
                 QPID_LOG(debug, "QMF Agent attempting to connect to the broker...");
                 connection.open(agent.connectionSettings);
                 session = connection.newSession(queueName.str());
-                subscriptions = new client::SubscriptionManager(session);
+                subscriptions.reset(new client::SubscriptionManager(session));
 
                 session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
                                      arg::exclusive=true);
@@ -811,11 +810,12 @@
 
                     operational = false;
                     agent.connected = false;
+                    tmp = subscriptions;
+                    subscriptions.reset();
                 }
+                tmp.reset();    // frees the subscription outside the lock
                 delay = delayMin;
                 connection.close();
-                delete subscriptions;
-                subscriptions = 0;
             }
         } catch (exception &e) {
             if (delay < delayMax)
@@ -824,14 +824,19 @@
         }
 
         {
+            // sleep for "delay" seconds, but periodically check if the
+            // agent is shutting down so we don't hang for up to delayMax 
+            // seconds during agent shutdown
              Mutex::ScopedLock _lock(connLock);
              if (shutdown)
                  return;
              sleeping = true;
-             {
-                  Mutex::ScopedUnlock _unlock(connLock);
-                  ::sleep(delay);
-             }
+             int totalSleep = 0;
+             do {
+                 Mutex::ScopedUnlock _unlock(connLock);
+                 ::sleep(delayMin);
+                 totalSleep += delayMin;
+             } while (totalSleep < delay && !shutdown);
              sleeping = false;
              if (shutdown)
                  return;
@@ -848,10 +853,12 @@
                                                        const string& exchange,
                                                        const string& routingKey)
 {
+    ConnectionThread::shared_ptr s;
     {
         Mutex::ScopedLock _lock(connLock);
         if (!operational)
             return;
+        s = subscriptions;
     }
 
     Message msg;
@@ -866,8 +873,8 @@
     } catch(exception& e) {
         QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
         // Bounce the connection
-        if (subscriptions)
-            subscriptions->stop();
+        if (s)
+            s->stop();
     }
 }
 
@@ -881,12 +888,14 @@
 
 void ManagementAgentImpl::ConnectionThread::close()
 {
+    ConnectionThread::shared_ptr s;
     {
         Mutex::ScopedLock _lock(connLock);
         shutdown = true;
+        s = subscriptions;
     }
-    if (subscriptions)
-        subscriptions->stop();
+    if (s)
+        s->stop();
 }
 
 bool ManagementAgentImpl::ConnectionThread::isSleeping() const

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=823094&r1=823093&r2=823094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Oct  8 08:55:44 2009
@@ -163,12 +163,14 @@
     friend class ConnectionThread;
     class ConnectionThread : public sys::Runnable
     {
+        typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr;
+
         bool operational;
         ManagementAgentImpl& agent;
         framing::Uuid        sessionId;
         client::Connection   connection;
         client::Session      session;
-        client::SubscriptionManager* subscriptions;
+        ConnectionThread::shared_ptr subscriptions;
         std::stringstream queueName;
         mutable sys::Mutex   connLock;
         bool              shutdown;
@@ -176,7 +178,7 @@
         void run();
     public:
         ConnectionThread(ManagementAgentImpl& _agent) :
-            operational(false), agent(_agent), subscriptions(0),
+            operational(false), agent(_agent),
             shutdown(false), sleeping(false) {}
         ~ConnectionThread();
         void sendBuffer(qpid::framing::Buffer& buf,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org