You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/06/08 22:23:19 UTC

svn commit: r782766 - in /qpid/trunk/qpid/cpp: examples/qmf-console/ping.cpp src/qpid/console/Broker.cpp src/qpid/console/Broker.h src/qpid/console/SessionManager.cpp src/qpid/console/SessionManager.h

Author: tross
Date: Mon Jun  8 20:23:19 2009
New Revision: 782766

URL: http://svn.apache.org/viewvc?rev=782766&view=rev
Log:
Bug fixes in the C++ console API:
  - Connection threads now shut down cleanly
  - get-query timeouts now work properly
  - waitForStable now only waits for connected brokers

The ping example was also improved: it now handles connection loss and reconnection more cleanly.

Modified:
    qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp
    qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/console/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h

Modified: qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp Mon Jun  8 20:23:19 2009
@@ -43,6 +43,7 @@
     //
     SessionManager::Settings smSettings;
     smSettings.methodTimeout = 2;
+    smSettings.getTimeout = 2;
 
     //
     // Declare the console session manager.  With a null listener argument, it defaults to
@@ -58,18 +59,21 @@
     uint32_t count = 5;  // The number of echo requests we will send to the broker.
     Object::Vector list; // A container for holding objects retrieved from the broker.
 
-    //
-    // Query for a list of 'broker' objects from the Management Database
-    //
-    sm.getObjects(list, "broker");
-
-    //
-    // We expect one object (since we are connected to only one broker)
-    //
-    if (list.size() == 1) {
-        Object& brokerObject = *(list.begin());
+    for (uint32_t iter = 0; iter < count; iter++) {
+        cout << "Ping Broker: " << broker->getUrl() << "... ";
+        cout.flush();
+
+        //
+        // Query for a list of 'broker' objects from the Management Database
+        //
+        sm.getObjects(list, "broker");
+
+        //
+        // We expect one object (since we are connected to only one broker)
+        //
+        if (list.size() == 1) {
+            Object& brokerObject = *(list.begin());
 
-        for (uint32_t iter = 0; iter < count; iter++) {
             //
             // Declare a container for arguments to be sent with the "echo" method
             // that we will invoke on the remote "broker" object.
@@ -87,9 +91,6 @@
             args.addUint("sequence", iter);
             args.addString("body", "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
 
-            cout << "Ping Broker: " << broker->getUrl() << "... ";
-            cout.flush();
-
             //
             // Invoke the method.  This is a synchronous operation that will block until
             // the method completes and returns a result.
@@ -109,6 +110,9 @@
 
             if (result.code == 0 && iter < count - 1)
               qpid::sys::sleep(1);
+        } else {
+            cout << "Disconnected..." << endl;
+            qpid::sys::sleep(1);
         }
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp Mon Jun  8 20:23:19 2009
@@ -57,6 +57,8 @@
 
 Broker::~Broker()
 {
+    connThreadBody.shutdown();
+    connThread.join();
 }
 
 string Broker::getUrl() const
@@ -184,6 +186,8 @@
             subscriptions->setFlowControl(dest, FlowControl::unlimited());
             {
                 Mutex::ScopedLock _lock(connLock);
+                if (shuttingDown)
+                    return;
                 operational = true;
                 broker.resetAgents();
                 broker.connected = true;
@@ -199,16 +203,26 @@
                 broker.sessionManager.handleBrokerDisconnect(&broker);
             }
             delay = delayMin;
+            connection.close();
             delete subscriptions;
             subscriptions = 0;
-            session.close();
         } catch (std::exception &e) {
             QPID_LOG(debug, "  outer exception: " << e.what());
             if (delay < delayMax)
                 delay *= delayFactor;
         }
 
-        ::sleep(delay);
+            {
+                Mutex::ScopedLock _lock(connLock);
+                if (shuttingDown)
+                    return;
+                {
+                    Mutex::ScopedUnlock _unlock(connLock);
+                    ::sleep(delay);
+                }
+                if (shuttingDown)
+                    return;
+            }
     }
 }
 
@@ -253,6 +267,16 @@
                           arg::bindingKey=key);
 }
 
+void Broker::ConnectionThread::shutdown()
+{
+    {
+        Mutex::ScopedLock _lock(connLock);
+        shuttingDown = true;
+    }
+    if (subscriptions)
+        subscriptions->stop();
+}
+
 void Broker::waitForStable()
 {
     Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/console/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/Broker.h?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/Broker.h Mon Jun  8 20:23:19 2009
@@ -73,7 +73,6 @@
 
         SessionManager& sessionManager;
         AgentMap agents;
-        client::SubscriptionManager* subscription;
         bool connected;
         std::string error;
         std::string amqpSessionId;
@@ -88,25 +87,27 @@
 
         friend class ConnectionThread;
         class ConnectionThread : public sys::Runnable {
-                bool operational;
-                Broker&              broker;
-                framing::Uuid        sessionId;
-                client::Connection   connection;
-                client::Session      session;
-                client::SubscriptionManager* subscriptions;
-                std::stringstream queueName;
-                sys::Mutex        connLock;
-                void run();
-            public:
+            bool operational;
+            bool shuttingDown;
+            Broker&              broker;
+            framing::Uuid        sessionId;
+            client::Connection   connection;
+            client::Session      session;
+            client::SubscriptionManager* subscriptions;
+            std::stringstream queueName;
+            sys::Mutex        connLock;
+            void run();
+        public:
             ConnectionThread(Broker& _broker) :
-                operational(false), broker(_broker), subscriptions(0) {}
-                ~ConnectionThread();
-                void sendBuffer(qpid::framing::Buffer& buf,
-                                uint32_t               length,
-                                const std::string&     exchange = "qpid.management",
-                                const std::string&     routingKey = "broker");
-                void bindExchange(const std::string& exchange, const std::string& key);
-            };
+                operational(false), shuttingDown(false), broker(_broker), subscriptions(0) {}
+            ~ConnectionThread();
+            void sendBuffer(qpid::framing::Buffer& buf,
+                            uint32_t               length,
+                            const std::string&     exchange = "qpid.management",
+                            const std::string&     routingKey = "broker");
+            void bindExchange(const std::string& exchange, const std::string& key);
+            void shutdown();
+        };
 
         ConnectionThread connThreadBody;
         sys::Thread      connThread;

Modified: qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp Mon Jun  8 20:23:19 2009
@@ -41,6 +41,13 @@
     bindingKeys();
 }
 
+SessionManager::~SessionManager()
+{
+    for (vector<Broker*>::iterator iter = brokers.begin();
+         iter != brokers.end(); iter++)
+        delete *iter;
+}
+
 Broker* SessionManager::addBroker(client::ConnectionSettings& settings)
 {
     Broker* broker(new Broker(*this, settings));
@@ -58,6 +65,7 @@
          iter != brokers.end(); iter++)
         if (*iter == broker) {
             brokers.erase(iter);
+            delete broker;
             return;
         }
 }
@@ -171,6 +179,11 @@
     syncSequenceList.clear();
     error = string();
 
+    if (agentList.empty()) {
+        objects = getResult;
+        return;
+    }
+
     for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) {
         Agent* agent = *iter;
         char rawbuffer[512];
@@ -191,8 +204,12 @@
 
     {
         Mutex::ScopedLock _lock(lock);
+        sys::AbsTime startTime = sys::now();
         while (!syncSequenceList.empty() && error.empty()) {
             cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC));
+            sys::AbsTime currTime = sys::now();
+            if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC)
+                break;
         }
     }
 
@@ -221,7 +238,8 @@
     Mutex::ScopedLock l(brokerListLock);
     for (vector<Broker*>::iterator iter = brokers.begin();
          iter != brokers.end(); iter++)
-        (*iter)->waitForStable();
+        if ((*iter)->isConnected())
+            (*iter)->waitForStable();
 }
 
 void SessionManager::startProtocol(Broker* broker)

Modified: qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h Mon Jun  8 20:23:19 2009
@@ -52,7 +52,7 @@
   public:
     typedef std::vector<std::string> NameVector;
     typedef std::vector<ClassKey> KeyVector;
-    ~SessionManager() {}
+    ~SessionManager();
 
     struct Settings {
         bool rcvObjects;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org