You are viewing a plain text version of this content. (The canonical link for it was not preserved in this plain-text copy.)
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/14 21:35:34 UTC

svn commit: r704637 - in /incubator/qpid/trunk/qpid/cpp: examples/failover/ src/qpid/client/ src/qpid/cluster/

Author: aconway
Date: Tue Oct 14 12:35:33 2008
New Revision: 704637

URL: http://svn.apache.org/viewvc?rev=704637&view=rev
Log:
Minor cleanup for client failover.

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=704637&r1=704636&r2=704637&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Tue Oct 14 12:35:33 2008
@@ -40,14 +40,13 @@
     int port = argc>2 ? atoi(argv[2]) : 5672;
     int count  = argc>3 ? atoi(argv[3]) : 30;
     int delayMs  = argc>4 ? atoi(argv[4]) : 1000;
+    string program_name = "PRODUCER";
 
     try {
         FailoverConnection connection;
         FailoverSession    * session;
         Message message;
 
-        string program_name = "PRODUCER";
-
         connection.open ( host, port );
         session = connection.newSession();
         int sent  = 0;
@@ -89,9 +88,10 @@
 
         session->sync();
         connection.close();
+        std::cout << program_name << ": " << " completed without error." << std::endl;
         return 0;  
     } catch(const std::exception& error) {
-        std::cout << error.what() << std::endl;
+        std::cout << program_name << ": " << error.what() << std::endl;
     }
     return 1;
 }

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=704637&r1=704636&r2=704637&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Tue Oct 14 12:35:33 2008
@@ -229,9 +229,9 @@
 {
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
+    string program_name = "LISTENER";
 
     try {
-        string program_name = "LISTENER";
 
         FailoverConnection connection;
         FailoverSession    * session;
@@ -250,10 +250,11 @@
         subscriptions.run ( );
 
         connection.close();
+        std::cout << program_name << ": " << " completed without error." << std::endl;
         return 0;
 
     } catch(const std::exception& error) {
-        std::cout << error.what() << std::endl;
+        std::cout << program_name  << ": " << error.what() << std::endl;
     }
     return 1;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=704637&r1=704636&r2=704637&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Oct 14 12:35:33 2008
@@ -166,7 +166,7 @@
     for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i )
         knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
     setState(OPEN);
-    QPID_LOG(info, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
+    QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=704637&r1=704636&r2=704637&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Tue Oct 14 12:35:33 2008
@@ -38,7 +38,7 @@
 namespace client {
 
 Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
-  : session(s), listener(l), autoAck(a) {}
+    : session(s), listener(l), autoAck(a) {}
 
 void Subscriber::received(Message& msg)
 {
@@ -96,18 +96,12 @@
     }
     catch (const ClosedException& e) 
     { 
-      QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what());
+        QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what());
     } //ignore it and return
     catch (const std::exception& e) {
         QPID_LOG(error, "Exception in client dispatch thread: " << e.what());
-      if ( failoverHandler )
-      {
-        failoverHandler();
-      }
-      else
-      {
-        QPID_LOG(info, "No dispatcher failover handler registered.");
-      }
+        if ( failoverHandler )
+            failoverHandler();
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=704637&r1=704636&r2=704637&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Tue Oct 14 12:35:33 2008
@@ -65,9 +65,7 @@
 }    
 FailoverListener::~FailoverListener() {
     try { stop(); }
-    catch (const std::exception& e) { 
-        QPID_LOG(warning, QPID_MSG("Ignoring exception in destructor" << e.what()));
-    }
+    catch (const std::exception& e) {}
 }
 
 void FailoverListener::received(Message& msg) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=704637&r1=704636&r2=704637&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Oct 14 12:35:33 2008
@@ -36,6 +36,7 @@
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
@@ -163,10 +164,8 @@
 void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
 
 void Cluster::mcast(const Event& e, Lock&) {
-    if (state == LEFT) {
-        lock.notifyAll();        // threads waiting in getUrls()
+    if (state == LEFT) 
         return;
-    }
     if (state < READY && e.isConnection()) {
         // Stall outgoing connection events.
         QPID_LOG(trace, *this << " MCAST deferred: " << e );
@@ -354,7 +353,6 @@
             map = ClusterMap(memberId, myUrl, true);
             memberUpdate(l);
             unstall(l);
-            lock.notifyAll();   // threads waiting in getUrls()
         }
         else {                  // Joining established group.
             state = NEWBIE;
@@ -383,7 +381,7 @@
 void Cluster::unstall(Lock& l) {
     // Called with lock held
     switch (state) {
-      case INIT: case DUMPEE: case DUMPER:
+      case INIT: case DUMPEE: case DUMPER: case READY:
         QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size()
                  << " mcast=" << mcastQueue.size());
         deliverQueue.start();
@@ -393,7 +391,7 @@
         if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
         break;
       case LEFT: break;
-      case NEWBIE: case READY: case OFFER:
+      case NEWBIE: case OFFER:
         assert(0);
     }
 }
@@ -422,7 +420,7 @@
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
     map.ready(id, Url(url));
     if (id == memberId)
-        lock.notifyAll(); // threads waiting in getUrls()
+        unstall(l);
     memberUpdate(l);
 }
 
@@ -474,7 +472,8 @@
         map = *dumpedMap;
         QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
         mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
-        unstall(l);
+        state = READY;
+        // unstall when ready control is self-delivered.
     }
 }