You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/22 15:09:34 UTC

svn commit: r707065 - in /incubator/qpid/trunk/qpid/cpp: examples/failover/ src/ src/qpid/client/ src/qpid/sys/

Author: aconway
Date: Wed Oct 22 06:09:33 2008
New Revision: 707065

URL: http://svn.apache.org/viewvc?rev=707065&view=rev
Log:
QPID-1382 from Mick Goulish: Improvement to Client-Side Cluster Failover code

Also:
Fix missing DispatchHandle.h include in sys/PollableQueue.h
Added ignore properties for failover example binaries & Makefile.

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/failover/   (props changed)
    incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Oct 22 06:09:33 2008
@@ -1 +1,5 @@
 Makefile.in
+listener
+Makefile
+declare_queues
+direct_producer

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Wed Oct 22 06:09:33 2008
@@ -60,6 +60,8 @@
             if ( count > 1000 )
               report = !(sent % 1000);
 
+            report = false;
+
             if ( report )
             {
               std::cout << "sending message " 
@@ -71,7 +73,7 @@
             message_data << sent;
             message.setData(message_data.str());
 
-            /* MICK FIXME
+            /* FIXME mgoulish 21 oct 08
                session.messageTransfer ( arg::content=message,  
                arg::destination="amq.direct"
                ); */
@@ -85,7 +87,7 @@
         }
         message.setData ( "That's all, folks!" );
 
-        /* FIXME mgoulish 16 Oct 08
+        /* FIXME mgoulish 21 oct 08
            session.messageTransfer ( arg::content=message,  
            arg::destination="amq.direct"
            ); 

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Wed Oct 22 06:09:33 2008
@@ -64,8 +64,10 @@
 void 
 Listener::received ( Message & message ) 
 {
+    /*
     if(! (count%1000))
       std::cerr << "\t\tListener received: " << message.getData() << std::endl;
+     * */
 
     ++ count;
 

Propchange: incubator/qpid/trunk/qpid/cpp/src/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Oct 22 06:09:33 2008
@@ -18,3 +18,5 @@
 managementgen.mk
 mgen.timestamp
 rgen.timestamp
+RdmaClient
+RdmaServer

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Oct 22 06:09:33 2008
@@ -151,8 +151,6 @@
 static const std::string CONN_CLOSED("Connection closed by broker");
 
 void ConnectionImpl::shutdown() {
-    Mutex::ScopedLock l(lock);
-
     if ( failureCallback )
       failureCallback();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Wed Oct 22 06:09:33 2008
@@ -129,7 +129,7 @@
         ++ sessions_iterator )
   {
     FailoverSession * fs = * sessions_iterator;
-    fs->failover_in_progress = true;
+    fs->failoverStarting();
   }
 
     std::vector<Url> knownBrokers = connection.getKnownBrokers();
@@ -187,7 +187,7 @@
     )
     {
         FailoverSession * fs = * sessions_iterator;
-        fs->failover_in_progress = false;
+        fs->failoverComplete();
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Wed Oct 22 06:09:33 2008
@@ -38,7 +38,8 @@
 namespace client {
 
 FailoverSession::FailoverSession ( ) :
-    failover_in_progress(false)
+    failover_in_progress(false),
+    failover_count(0)
 {
     // The session is created by FailoverConnection::newSession
     failoverSubscriptionManager = 0;
@@ -50,32 +51,108 @@
 }
 
 
+
 framing::FrameSet::shared_ptr 
 FailoverSession::get()
 {
+    while(1)
+    {
+    try
+    {
     return session.get();
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
 SessionId 
-FailoverSession::getId() const
+FailoverSession::getId()
 {
+    while(1)
+    {
+    try
+    {
     return session.getId();
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
 void 
 FailoverSession::close()
 {
+    while(1)
+    {
+    try
+    {
     session.close();
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
 void 
 FailoverSession::sync()
 {
+    while(1)
+    {
+      try
+      {
+        session.sync();
+        return;
+      }
+      catch ( const std::exception& error )
+      {
+        if ( ! failover_in_progress )
+          throw ( error );
+        else
+        {
+          sys::Monitor::ScopedLock l(lock);
+          int current_failover_count = failover_count;
+          while ( current_failover_count == failover_count )
+            lock.wait();
+        }
+      }
+    }
 
-    session.sync();
 }
 
 
@@ -90,16 +167,53 @@
 Execution& 
 FailoverSession::getExecution()
 {
-
+    while(1)
+    {
+    try
+    {
     return session.getExecution();
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
 void 
 FailoverSession::flush()
 {
-
+    while(1)
+    {
+    try
+    {
     session.flush();
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -109,8 +223,27 @@
                                bool notifyPeer
 )
 {
-
+    while(1)
+    {
+    try
+    {
     session.markCompleted ( id, cumulative, notifyPeer );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -120,8 +253,27 @@
 void 
 FailoverSession::executionSync()
 {
-
+    while(1)
+    {
+    try
+    {
     session.executionSync();
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -131,10 +283,29 @@
                                    const string& value
 )
 {
-
+    while(1)
+    {
+    try
+    {
     session.executionResult ( commandId, 
                               value 
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -149,7 +320,10 @@
                                       const FieldTable& errorInfo
 )
 {
-
+    while(1)
+    {
+    try
+    {
     session.executionException ( errorCode,
                                  commandId,
                                  classCode,
@@ -158,6 +332,22 @@
                                  description,
                                  errorInfo
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -179,7 +369,7 @@
                                   acquireMode,
                                   content
         );
-        break;
+        return;
       }
       catch ( ... )
       {
@@ -197,8 +387,27 @@
 void 
 FailoverSession::messageAccept ( const SequenceSet& transfers )
 {
-
+    while(1)
+    {
+    try
+    {
     session.messageAccept ( transfers );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -209,11 +418,30 @@
                                  const string& text
 )
 {
-
+    while(1)
+    {
+    try
+    {
     session.messageReject ( transfers, 
                             code, 
                             text 
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -223,10 +451,29 @@
                                   bool setRedelivered
 )
 {
-
+    while(1)
+    {
+    try
+    {
     session.messageRelease ( transfers,
                              setRedelivered
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -234,8 +481,26 @@
 qpid::framing::MessageAcquireResult 
 FailoverSession::messageAcquire ( const SequenceSet& transfers )
 {
-
+    while(1)
+    {
+    try
+    {
     return session.messageAcquire ( transfers );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -245,10 +510,28 @@
                                  const string& resumeId
 )
 {
-
+    while(1)
+    {
+    try
+    {
     return session.messageResume ( destination,
                                    resumeId
     );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -264,7 +547,10 @@
                                     const FieldTable& arguments
 )
 {
-
+    while(1)
+    {
+    try
+    {
     session.messageSubscribe ( queue,
                                destination,
                                acceptMode,
@@ -274,6 +560,22 @@
                                resumeTtl,
                                arguments
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -281,8 +583,27 @@
 void 
 FailoverSession::messageCancel ( const string& destinations )
 {
-
+    while(1)
+    {
+    try
+    {
     session.messageCancel ( destinations );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
+
 }
 
 
@@ -292,9 +613,28 @@
                                       uint8_t flowMode
 )
 {
+    while(1)
+    {
+    try
+    {
     session.messageSetFlowMode ( destination,
                                  flowMode
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -304,10 +644,29 @@
                              uint8_t unit, 
                              uint32_t value)
 {
+    while(1)
+    {
+    try
+    {
     session.messageFlow ( destination,
                           unit,
                           value
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -315,7 +674,26 @@
 void 
 FailoverSession::messageFlush(const string& destination)
 {
+    while(1)
+    {
+    try
+    {
     session.messageFlush ( destination );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -323,7 +701,26 @@
 void 
 FailoverSession::messageStop(const string& destination)
 {
+    while(1)
+    {
+    try
+    {
     session.messageStop ( destination );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -331,7 +728,26 @@
 void 
 FailoverSession::txSelect()
 {
+    while(1)
+    {
+    try
+    {
     session.txSelect ( );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -339,7 +755,26 @@
 void 
 FailoverSession::txCommit()
 {
+    while(1)
+    {
+    try
+    {
     session.txCommit ( );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -347,7 +782,26 @@
 void 
 FailoverSession::txRollback()
 {
+    while(1)
+    {
+    try
+    {
     session.txRollback ( );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -355,7 +809,26 @@
 void 
 FailoverSession::dtxSelect()
 {
+    while(1)
+    {
+    try
+    {
     session.dtxSelect ( );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -365,10 +838,28 @@
                           bool join, 
                           bool resume)
 {
+    while(1)
+    {
+    try
+    {
     return session.dtxStart ( xid,
                               join,
                               resume
     );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -378,10 +869,28 @@
                         bool fail, 
                         bool suspend)
 {
+    while(1)
+    {
+    try
+    {
     return session.dtxEnd ( xid,
                             fail,
                             suspend
     );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -390,9 +899,27 @@
 FailoverSession::dtxCommit(const Xid& xid, 
                            bool onePhase)
 {
+    while(1)
+    {
+    try
+    {
     return session.dtxCommit ( xid,
                                onePhase
     );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -400,7 +927,26 @@
 void 
 FailoverSession::dtxForget(const Xid& xid)
 {
+    while(1)
+    {
+    try
+    {
     session.dtxForget ( xid );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -408,7 +954,25 @@
 qpid::framing::DtxGetTimeoutResult 
 FailoverSession::dtxGetTimeout(const Xid& xid)
 {
+    while(1)
+    {
+    try
+    {
     return session.dtxGetTimeout ( xid );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -416,7 +980,25 @@
 qpid::framing::XaResult 
 FailoverSession::dtxPrepare(const Xid& xid)
 {
+    while(1)
+    {
+    try
+    {
     return session.dtxPrepare ( xid );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -424,7 +1006,25 @@
 qpid::framing::DtxRecoverResult 
 FailoverSession::dtxRecover()
 {
+    while(1)
+    {
+    try
+    {
     return session.dtxRecover ( );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -432,7 +1032,25 @@
 qpid::framing::XaResult 
 FailoverSession::dtxRollback(const Xid& xid)
 {
+    while(1)
+    {
+    try
+    {
     return session.dtxRollback ( xid );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -441,9 +1059,28 @@
 FailoverSession::dtxSetTimeout(const Xid& xid, 
                                uint32_t timeout)
 {
+    while(1)
+    {
+    try
+    {
     session.dtxSetTimeout ( xid,
                             timeout
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -457,6 +1094,10 @@
                                  bool autoDelete, 
                                  const FieldTable& arguments)
 {
+    while(1)
+    {
+    try
+    {
     session.exchangeDeclare ( exchange,
                               type,
                               alternateExchange,
@@ -465,6 +1106,21 @@
                               autoDelete,
                               arguments
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -473,9 +1129,28 @@
 FailoverSession::exchangeDelete(const string& exchange, 
                                 bool ifUnused)
 {
+    while(1)
+    {
+    try
+    {
     session.exchangeDelete ( exchange,
                              ifUnused
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -483,7 +1158,25 @@
 qpid::framing::ExchangeQueryResult 
 FailoverSession::exchangeQuery(const string& name)
 {
+    while(1)
+    {
+    try
+    {
     return session.exchangeQuery ( name );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -494,11 +1187,30 @@
                               const string& bindingKey, 
                               const FieldTable& arguments)
 {
+    while(1)
+    {
+    try
+    {
     session.exchangeBind ( queue,
                            exchange,
                            bindingKey,
                            arguments
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -508,10 +1220,29 @@
                                 const string& exchange, 
                                 const string& bindingKey)
 {
+    while(1)
+    {
+    try
+    {
     session.exchangeUnbind ( queue,
                              exchange,
                              bindingKey
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -522,11 +1253,29 @@
                                const string& bindingKey, 
                                const FieldTable& arguments)
 {
+    while(1)
+    {
+    try
+    {
     return session.exchangeBound ( exchange,
                                    queue,
                                    bindingKey,
                                    arguments
     );
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -540,6 +1289,10 @@
                               bool autoDelete, 
                               const FieldTable& arguments)
 {
+    while(1)
+    {
+    try
+    {
     session.queueDeclare ( queue,
                            alternateExchange,
                            passive,
@@ -548,6 +1301,21 @@
                            autoDelete,
                            arguments
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -557,10 +1325,29 @@
                              bool ifUnused, 
                              bool ifEmpty)
 {
+    while(1)
+    {
+    try
+    {
     session.queueDelete ( queue,
                           ifUnused,
                           ifEmpty
     );
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -568,7 +1355,26 @@
 void 
 FailoverSession::queuePurge(const string& queue)
 {
+    while(1)
+    {
+    try
+    {
     session.queuePurge ( queue) ;
+    return;
+    }
+    catch ( const std::exception& error )
+    {
+      if ( ! failover_in_progress )
+        throw ( error );
+      else
+      {
+        sys::Monitor::ScopedLock l(lock);
+        int current_failover_count = failover_count;
+        while ( current_failover_count == failover_count )
+          lock.wait();
+      }
+    }
+    }
 }
 
 
@@ -576,7 +1382,25 @@
 qpid::framing::QueueQueryResult 
 FailoverSession::queueQuery(const string& queue)
 {
-    return session.queueQuery ( queue );
+    while(1)
+    {
+      try
+      {
+        return session.queueQuery ( queue );
+      }
+      catch ( const std::exception& error )
+      {
+        if ( ! failover_in_progress )
+          throw ( error );
+        else
+        {
+          sys::Monitor::ScopedLock l(lock);
+          int current_failover_count = failover_count;
+          while ( current_failover_count == failover_count )
+            lock.wait();
+        }
+      }
+    }
 }
 
 
@@ -587,6 +1411,7 @@
 void
 FailoverSession::prepareForFailover ( Connection newConnection )
 {
+    failover_in_progress = true;
     try
     {
         newSession = newConnection.newSession();
@@ -603,6 +1428,24 @@
 }
 
 
+void
+FailoverSession::failoverStarting ( )
+{
+  sys::Monitor::ScopedLock l(lock);
+  failover_in_progress = true;
+}
+
+
+void
+FailoverSession::failoverComplete ( )
+{
+  sys::Monitor::ScopedLock l(lock);
+  failover_in_progress = false;
+  ++ failover_count;
+  lock.notifyAll();
+}
+
+
 
 void 
 FailoverSession::failover (  )

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h Wed Oct 22 06:09:33 2008
@@ -35,7 +35,7 @@
 #include "qpid/client/SessionImpl.h"
 #include "qpid/client/TypedResult.h"
 #include "qpid/shared_ptr.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
 
 #include <string>
 
@@ -61,7 +61,7 @@
 
     framing::FrameSet::shared_ptr get();
 
-    SessionId getId() const;
+    SessionId getId();
 
     void close();
 
@@ -80,8 +80,6 @@
     
     void sendCompletion ( );
 
-    bool failover_in_progress; 
-
 
 
     // Wrapped functions from Session ----------------------------
@@ -293,15 +291,18 @@
     // end Wrapped functions from Session  ---------------------------
 
     // Tells the FailoverSession to get ready for a failover.
+    void failoverStarting();
     void prepareForFailover ( Connection newConnection );
-
     void failover ( );
+    void failoverComplete();
 
     void setFailoverSubscriptionManager(FailoverSubscriptionManager*);
 
   private:
-    typedef sys::Mutex::ScopedLock Lock;
-    sys::Mutex lock;
+    sys::Monitor lock;
+    bool failover_in_progress; 
+    int  failover_count;
+
 
     FailoverSubscriptionManager * failoverSubscriptionManager;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Wed Oct 22 06:09:33 2008
@@ -48,7 +48,6 @@
     sys::Monitor::ScopedLock l(lock);
     newSession = _newSession;
     newSessionIsValid = true;
-    // lock.notifyAll();
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Wed Oct 22 06:09:33 2008
@@ -24,6 +24,7 @@
 
 #include "qpid/sys/PollableCondition.h"
 #include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/DispatchHandle.h"
 #include "qpid/sys/Monitor.h"
 #include <boost/function.hpp>
 #include <boost/bind.hpp>