You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/10 23:19:46 UTC

svn commit: r703575 - in /incubator/qpid/trunk/qpid/cpp: examples/failover/ src/qpid/client/

Author: aconway
Date: Fri Oct 10 14:19:46 2008
New Revision: 703575

URL: http://svn.apache.org/viewvc?rev=703575&view=rev
Log:
Failover client and example fixes & tidy up.

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Fri Oct 10 14:19:46 2008
@@ -1,14 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
 #include <qpid/client/FailoverConnection.h>
 #include <qpid/client/Session.h>
 #include <qpid/client/AsyncSession.h>
 #include <qpid/client/Message.h>
 
 
-#include <unistd.h>
-#include <cstdlib>
 #include <iostream>
-#include <fstream>
-
 #include <sstream>
 
 using namespace qpid::client;
@@ -16,102 +33,46 @@
 
 using namespace std;
 
-
-
-
 int 
 main ( int argc, char ** argv) 
 {
-    try {
-        struct timeval broker_killed_time     = {0,0};
-        struct timeval failover_complete_time = {0,0};
-        struct timeval duration               = {0,0};
-
-
-        if ( argc < 3 )
-        {
-            std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
-            std::cerr << "i.e. for host: 127.0.0.1\n";
-            exit(1);
-        }
-
-        char const * host = argv[1];
-        int  port = atoi(argv[2]);
-        char const * broker_to_kill = 0;
-
-        if ( argc > 3 )
-        {
-            broker_to_kill = argv[3];
-            std::cerr << "main:  Broker marked for death is process ID " 
-                      << broker_to_kill
-                      << endl;
-        }
-        else
-        {
-            std::cerr << "PRODUCER main: there is no broker to kill.\n";
-        }
+    const char* host = argc>1 ? argv[1] : "127.0.0.1";
+    int port = argc>2 ? atoi(argv[2]) : 5672;
+    int count  = argc>3 ? atoi(argv[3]) : 30;
 
+    try {
         FailoverConnection connection;
         FailoverSession    * session;
         Message message;
 
         string program_name = "PRODUCER";
 
-
-        connection.failoverCompleteTime = & failover_complete_time;
-        connection.name = program_name;
         connection.open ( host, port );
-
         session = connection.newSession();
-        session->name    = program_name;
-
-        int send_this_many = 30,
-            messages_sent  = 0;
-
-        while ( messages_sent < send_this_many )
-        {
-            if ( (messages_sent == 13) && broker_to_kill )
-            {
-                char command[1000];
-                std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
-                sprintf(command, "kill -9 %s", broker_to_kill);
-                system ( command );
-                gettimeofday ( & broker_killed_time, 0 );
-            }
-
+        int sent  = 0;
+        while ( sent < count ) {
             message.getDeliveryProperties().setRoutingKey("routing_key"); 
-
-            std::cerr << "sending message " 
-                      << messages_sent
+            std::cout << "sending message " 
+                      << sent
                       << " of " 
-                      << send_this_many 
+                      << count 
                       << ".\n";
-
             stringstream message_data;
-            message_data << messages_sent;
+            message_data << sent;
             message.setData(message_data.str());
 
-            try
-            {
-                /* MICK FIXME
-                   session.messageTransfer ( arg::content=message,  
-                   arg::destination="amq.direct"
-                   ); */
-                session->messageTransfer ( "amq.direct",
-                                           1,
-                                           0,
-                                           message
-                );
-            }
-            catch ( const std::exception& error) 
-            {
-                cerr << program_name << " exception: " << error.what() << endl;
-            }
-
+            /* MICK FIXME
+               session.messageTransfer ( arg::content=message,  
+               arg::destination="amq.direct"
+               ); */
+            session->messageTransfer ( "amq.direct",
+                                       1,
+                                       0,
+                                       message
+            );
             sleep ( 1 );
-            ++ messages_sent;
+            ++ sent;
         }
-
         message.setData ( "That's all, folks!" );
 
         /* MICK FIXME
@@ -127,22 +88,7 @@
 
         session->sync();
         connection.close();
-
-        // This will be incorrect if you killed more than one...
-        if ( broker_to_kill )
-        {
-            timersub ( & failover_complete_time, 
-                       & broker_killed_time, 
-                       & duration 
-            );
-            fprintf ( stderr, 
-                      "Failover time: %ld.%.6ld\n",
-                      duration.tv_sec,
-                      duration.tv_usec
-            );
-        }
         return 0;  
-
     } catch(const std::exception& error) {
         std::cout << error.what() << std::endl;
     }

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Fri Oct 10 14:19:46 2008
@@ -1,11 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 
 #include <qpid/client/FailoverConnection.h>
 #include <qpid/client/Session.h>
 #include <qpid/client/Message.h>
 #include <qpid/client/SubscriptionManager.h>
 
-#include <unistd.h>
-#include <cstdlib>
 #include <iostream>
 #include <fstream>
 
@@ -16,8 +34,6 @@
 using namespace std;
 
 
-
-
 struct Recorder 
 {
     unsigned int max_messages;
@@ -211,19 +227,12 @@
 int 
 main ( int argc, char ** argv ) 
 {
+    const char* host = argc>1 ? argv[1] : "127.0.0.1";
+    int port = argc>2 ? atoi(argv[2]) : 5672;
+
     try {
         string program_name = "LISTENER";
 
-        if ( argc < 3 )
-        {
-            std::cerr << "Usage: ./listener host cluster_port_file_name\n";
-            std::cerr << "i.e. for host: 127.0.0.1\n";
-            exit(1);
-        }
-
-        char const * host = argv[1];
-        int port = atoi(argv[2]);
-
         FailoverConnection connection;
         FailoverSession    * session;
         Recorder recorder;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Fri Oct 10 14:19:46 2008
@@ -146,9 +146,9 @@
      * (and, through them, their SessionManagers and whatever else)
      * that we are about to failover to this new Connection.
      */
-    // FIXME mgoulish -- get rid of two-passes here.
-    std::vector<FailoverSession *>::iterator sessions_iterator;
 
+    // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession 
+    std::vector<FailoverSession *>::iterator sessions_iterator;
     for ( sessions_iterator = sessions.begin();
           sessions_iterator < sessions.end();
           ++ sessions_iterator

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h Fri Oct 10 14:19:46 2008
@@ -29,7 +29,7 @@
 #include "qpid/client/FailoverConnection.h"
 #include "qpid/client/FailoverSession.h"
 #include "qpid/client/FailoverSubscriptionManager.h"
-
+#include "qpid/sys/Mutex.h"
 
 
 namespace qpid {
@@ -82,11 +82,11 @@
 
   private:
 
-    std::string host;
+    typedef sys::Mutex::ScopedLock Lock;
 
-    Connection connection;
+    sys::Mutex lock;
 
-    int currentPortNumber;
+    Connection connection;
 
     boost::function<void ()> clientFailoverCallback;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Fri Oct 10 14:19:46 2008
@@ -38,10 +38,10 @@
 namespace client {
 
 FailoverSession::FailoverSession ( ) :
-  name("no_name")
+    name("no_name")
 {
-  // The session is created by FailoverConnection::newSession
-  failoverSubscriptionManager = 0;
+    // The session is created by FailoverConnection::newSession
+    failoverSubscriptionManager = 0;
 }
 
 
@@ -53,50 +53,54 @@
 framing::FrameSet::shared_ptr 
 FailoverSession::get()
 {
-  return session.get();
+    return session.get();
 }
 
 
 SessionId 
 FailoverSession::getId() const
 {
-  return session.getId();
+    return session.getId();
 }
 
 
 void 
 FailoverSession::close()
 {
-  session.close();
+    session.close();
 }
 
 
 void 
 FailoverSession::sync()
 {
-  session.sync();
+
+    session.sync();
 }
 
 
 uint32_t 
 FailoverSession::timeout(uint32_t /*seconds*/ )
 {
-  // MICK WTF?  return session.timeout ( seconds );
-  return 0;
+
+    // MICK WTF?  return session.timeout ( seconds );
+    return 0;
 }
 
 
 Execution& 
 FailoverSession::getExecution()
 {
-  return session.getExecution();
+
+    return session.getExecution();
 }
 
 
 void 
 FailoverSession::flush()
 {
-  session.flush();
+
+    session.flush();
 }
 
 
@@ -104,9 +108,10 @@
 FailoverSession::markCompleted(const framing::SequenceNumber& id, 
                                bool cumulative, 
                                bool notifyPeer
-                              )
+)
 {
-  session.markCompleted ( id, cumulative, notifyPeer );
+
+    session.markCompleted ( id, cumulative, notifyPeer );
 }
 
 
@@ -116,7 +121,8 @@
 void 
 FailoverSession::executionSync()
 {
-  session.executionSync();
+
+    session.executionSync();
 }
 
 
@@ -124,11 +130,12 @@
 void 
 FailoverSession::executionResult ( const SequenceNumber& commandId, 
                                    const string& value
-                                 )
+)
 {
-  session.executionResult ( commandId, 
-                            value 
-                          );
+
+    session.executionResult ( commandId, 
+                              value 
+    );
 }
 
 
@@ -141,16 +148,17 @@
                                       uint8_t fieldIndex, 
                                       const string& description, 
                                       const FieldTable& errorInfo
-                                    )
+)
 {
-  session.executionException ( errorCode,
-                               commandId,
-                               classCode,
-                               commandCode,
-                               fieldIndex,
-                               description,
-                               errorInfo
-                             );
+
+    session.executionException ( errorCode,
+                                 commandId,
+                                 classCode,
+                                 commandCode,
+                                 fieldIndex,
+                                 description,
+                                 errorInfo
+    );
 }
 
 
@@ -160,13 +168,14 @@
                                    uint8_t acceptMode, 
                                    uint8_t acquireMode, 
                                    const MethodContent& content
-                                 )
+)
 {
-  session.messageTransfer ( destination,
-                            acceptMode,
-                            acquireMode,
-                            content
-                          );
+
+    session.messageTransfer ( destination,
+                              acceptMode,
+                              acquireMode,
+                              content
+    );
 }
 
 
@@ -174,7 +183,8 @@
 void 
 FailoverSession::messageAccept ( const SequenceSet& transfers )
 {
-  session.messageAccept ( transfers );
+
+    session.messageAccept ( transfers );
 }
 
 
@@ -183,12 +193,13 @@
 FailoverSession::messageReject ( const SequenceSet& transfers, 
                                  uint16_t code, 
                                  const string& text
-                               )
+)
 {
-  session.messageReject ( transfers, 
-                          code, 
-                          text 
-                        );
+
+    session.messageReject ( transfers, 
+                            code, 
+                            text 
+    );
 }
 
 
@@ -196,11 +207,12 @@
 void 
 FailoverSession::messageRelease ( const SequenceSet& transfers, 
                                   bool setRedelivered
-                                )
+)
 {
-  session.messageRelease ( transfers,
-                           setRedelivered
-                         );
+
+    session.messageRelease ( transfers,
+                             setRedelivered
+    );
 }
 
 
@@ -208,7 +220,8 @@
 qpid::framing::MessageAcquireResult 
 FailoverSession::messageAcquire ( const SequenceSet& transfers )
 {
-  return session.messageAcquire ( transfers );
+
+    return session.messageAcquire ( transfers );
 }
 
 
@@ -216,11 +229,12 @@
 qpid::framing::MessageResumeResult 
 FailoverSession::messageResume ( const string& destination, 
                                  const string& resumeId
-                               )
+)
 {
-  return session.messageResume ( destination,
-                                 resumeId
-                               );
+
+    return session.messageResume ( destination,
+                                   resumeId
+    );
 }
 
 
@@ -234,17 +248,18 @@
                                     const string& resumeId, 
                                     uint64_t resumeTtl, 
                                     const FieldTable& arguments
-                                  )
+)
 {
-  session.messageSubscribe ( queue,
-                             destination,
-                             acceptMode,
-                             acquireMode,
-                             exclusive,
-                             resumeId,
-                             resumeTtl,
-                             arguments
-                           );
+
+    session.messageSubscribe ( queue,
+                               destination,
+                               acceptMode,
+                               acquireMode,
+                               exclusive,
+                               resumeId,
+                               resumeTtl,
+                               arguments
+    );
 }
 
 
@@ -252,7 +267,8 @@
 void 
 FailoverSession::messageCancel ( const string& destinations )
 {
-  session.messageCancel ( destinations );
+
+    session.messageCancel ( destinations );
 }
 
 
@@ -260,11 +276,11 @@
 void 
 FailoverSession::messageSetFlowMode ( const string& destination, 
                                       uint8_t flowMode
-                                    )
+)
 {
-  session.messageSetFlowMode ( destination,
-                               flowMode
-                             );
+    session.messageSetFlowMode ( destination,
+                                 flowMode
+    );
 }
 
 
@@ -274,10 +290,10 @@
                              uint8_t unit, 
                              uint32_t value)
 {
-  session.messageFlow ( destination,
-                        unit,
-                        value
-                      );
+    session.messageFlow ( destination,
+                          unit,
+                          value
+    );
 }
 
 
@@ -285,7 +301,7 @@
 void 
 FailoverSession::messageFlush(const string& destination)
 {
-  session.messageFlush ( destination );
+    session.messageFlush ( destination );
 }
 
 
@@ -293,7 +309,7 @@
 void 
 FailoverSession::messageStop(const string& destination)
 {
-  session.messageStop ( destination );
+    session.messageStop ( destination );
 }
 
 
@@ -301,7 +317,7 @@
 void 
 FailoverSession::txSelect()
 {
-  session.txSelect ( );
+    session.txSelect ( );
 }
 
 
@@ -309,7 +325,7 @@
 void 
 FailoverSession::txCommit()
 {
-  session.txCommit ( );
+    session.txCommit ( );
 }
 
 
@@ -317,7 +333,7 @@
 void 
 FailoverSession::txRollback()
 {
-  session.txRollback ( );
+    session.txRollback ( );
 }
 
 
@@ -325,7 +341,7 @@
 void 
 FailoverSession::dtxSelect()
 {
-  session.dtxSelect ( );
+    session.dtxSelect ( );
 }
 
 
@@ -335,10 +351,10 @@
                           bool join, 
                           bool resume)
 {
-  return session.dtxStart ( xid,
-                            join,
-                            resume
-                          );
+    return session.dtxStart ( xid,
+                              join,
+                              resume
+    );
 }
 
 
@@ -348,10 +364,10 @@
                         bool fail, 
                         bool suspend)
 {
-  return session.dtxEnd ( xid,
-                          fail,
-                          suspend
-                        );
+    return session.dtxEnd ( xid,
+                            fail,
+                            suspend
+    );
 }
 
 
@@ -360,9 +376,9 @@
 FailoverSession::dtxCommit(const Xid& xid, 
                            bool onePhase)
 {
-  return session.dtxCommit ( xid,
-                             onePhase
-                           );
+    return session.dtxCommit ( xid,
+                               onePhase
+    );
 }
 
 
@@ -370,7 +386,7 @@
 void 
 FailoverSession::dtxForget(const Xid& xid)
 {
-  session.dtxForget ( xid );
+    session.dtxForget ( xid );
 }
 
 
@@ -378,7 +394,7 @@
 qpid::framing::DtxGetTimeoutResult 
 FailoverSession::dtxGetTimeout(const Xid& xid)
 {
-  return session.dtxGetTimeout ( xid );
+    return session.dtxGetTimeout ( xid );
 }
 
 
@@ -386,7 +402,7 @@
 qpid::framing::XaResult 
 FailoverSession::dtxPrepare(const Xid& xid)
 {
-  return session.dtxPrepare ( xid );
+    return session.dtxPrepare ( xid );
 }
 
 
@@ -394,7 +410,7 @@
 qpid::framing::DtxRecoverResult 
 FailoverSession::dtxRecover()
 {
-  return session.dtxRecover ( );
+    return session.dtxRecover ( );
 }
 
 
@@ -402,7 +418,7 @@
 qpid::framing::XaResult 
 FailoverSession::dtxRollback(const Xid& xid)
 {
-  return session.dtxRollback ( xid );
+    return session.dtxRollback ( xid );
 }
 
 
@@ -411,9 +427,9 @@
 FailoverSession::dtxSetTimeout(const Xid& xid, 
                                uint32_t timeout)
 {
-  session.dtxSetTimeout ( xid,
-                          timeout
-                        );
+    session.dtxSetTimeout ( xid,
+                            timeout
+    );
 }
 
 
@@ -427,14 +443,14 @@
                                  bool autoDelete, 
                                  const FieldTable& arguments)
 {
-  session.exchangeDeclare ( exchange,
-                            type,
-                            alternateExchange,
-                            passive,
-                            durable,
-                            autoDelete,
-                            arguments
-                          );
+    session.exchangeDeclare ( exchange,
+                              type,
+                              alternateExchange,
+                              passive,
+                              durable,
+                              autoDelete,
+                              arguments
+    );
 }
 
 
@@ -443,9 +459,9 @@
 FailoverSession::exchangeDelete(const string& exchange, 
                                 bool ifUnused)
 {
-  session.exchangeDelete ( exchange,
-                           ifUnused
-                         );
+    session.exchangeDelete ( exchange,
+                             ifUnused
+    );
 }
 
 
@@ -453,7 +469,7 @@
 qpid::framing::ExchangeQueryResult 
 FailoverSession::exchangeQuery(const string& name)
 {
-  return session.exchangeQuery ( name );
+    return session.exchangeQuery ( name );
 }
 
 
@@ -464,11 +480,11 @@
                               const string& bindingKey, 
                               const FieldTable& arguments)
 {
-  session.exchangeBind ( queue,
-                         exchange,
-                         bindingKey,
-                         arguments
-                       );
+    session.exchangeBind ( queue,
+                           exchange,
+                           bindingKey,
+                           arguments
+    );
 }
 
 
@@ -478,10 +494,10 @@
                                 const string& exchange, 
                                 const string& bindingKey)
 {
-  session.exchangeUnbind ( queue,
-                           exchange,
-                           bindingKey
-                         );
+    session.exchangeUnbind ( queue,
+                             exchange,
+                             bindingKey
+    );
 }
 
 
@@ -492,11 +508,11 @@
                                const string& bindingKey, 
                                const FieldTable& arguments)
 {
-  return session.exchangeBound ( exchange,
-                                 queue,
-                                 bindingKey,
-                                 arguments
-                               );
+    return session.exchangeBound ( exchange,
+                                   queue,
+                                   bindingKey,
+                                   arguments
+    );
 }
 
 
@@ -510,14 +526,14 @@
                               bool autoDelete, 
                               const FieldTable& arguments)
 {
-  session.queueDeclare ( queue,
-                         alternateExchange,
-                         passive,
-                         durable,
-                         exclusive,
-                         autoDelete,
-                         arguments
-                       );
+    session.queueDeclare ( queue,
+                           alternateExchange,
+                           passive,
+                           durable,
+                           exclusive,
+                           autoDelete,
+                           arguments
+    );
 }
 
 
@@ -527,10 +543,10 @@
                              bool ifUnused, 
                              bool ifEmpty)
 {
-  session.queueDelete ( queue,
-                        ifUnused,
-                        ifEmpty
-                      );
+    session.queueDelete ( queue,
+                          ifUnused,
+                          ifEmpty
+    );
 }
 
 
@@ -538,7 +554,7 @@
 void 
 FailoverSession::queuePurge(const string& queue)
 {
-  session.queuePurge ( queue) ;
+    session.queuePurge ( queue) ;
 }
 
 
@@ -546,7 +562,7 @@
 qpid::framing::QueueQueryResult 
 FailoverSession::queueQuery(const string& queue)
 {
-  return session.queueQuery ( queue );
+    return session.queueQuery ( queue );
 }
 
 
@@ -557,20 +573,20 @@
 void
 FailoverSession::prepareForFailover ( Connection newConnection )
 {
-  try
-  {
-    newSession = newConnection.newSession();
-  }
-  catch ( const std::exception& error )
-  {
-    throw Exception(QPID_MSG("Can't create failover session."));
-  }
-
-
-  if ( failoverSubscriptionManager )
-  {
-    failoverSubscriptionManager->prepareForFailover ( newSession );
-  }
+    try
+    {
+        newSession = newConnection.newSession();
+    }
+    catch ( const std::exception& error )
+    {
+        throw Exception(QPID_MSG("Can't create failover session."));
+    }
+
+    if ( failoverSubscriptionManager )
+    {
+        // 
+        failoverSubscriptionManager->prepareForFailover ( newSession );
+    }
 }
 
 
@@ -578,15 +594,16 @@
 void 
 FailoverSession::failover (  )
 {
-  if ( failoverSubscriptionManager )
-  {
-    failoverSubscriptionManager->failover ( );
-  }
-
-  session = newSession;
+    if ( failoverSubscriptionManager )
+    {
+        failoverSubscriptionManager->failover ( );
+    }
+    session = newSession;
 }
 
 
-
+void FailoverSession::setFailoverSubscriptionManager(FailoverSubscriptionManager* fsm) {
+    failoverSubscriptionManager = fsm;
+}
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h Fri Oct 10 14:19:46 2008
@@ -35,6 +35,8 @@
 #include "qpid/client/SessionImpl.h"
 #include "qpid/client/TypedResult.h"
 #include "qpid/shared_ptr.h"
+#include "qpid/sys/Mutex.h"
+
 #include <string>
 
 
@@ -295,11 +297,13 @@
 
     void failover ( );
 
-    FailoverSubscriptionManager * failoverSubscriptionManager;
-
+    void setFailoverSubscriptionManager(FailoverSubscriptionManager*);
 
   private:
+    typedef sys::Mutex::ScopedLock Lock;
+    sys::Mutex lock;
 
+    FailoverSubscriptionManager * failoverSubscriptionManager;
 
     Session session;
     Session newSession;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Fri Oct 10 14:19:46 2008
@@ -33,11 +33,11 @@
 
 
 FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
-  name("no_name"),
-  newSessionIsValid(false)
+    name("no_name"),
+    newSessionIsValid(false)
 {
-  subscriptionManager = new SubscriptionManager(fs->session);
-  fs->failoverSubscriptionManager = this;
+    subscriptionManager = new SubscriptionManager(fs->session);
+    fs->setFailoverSubscriptionManager(this);
 }
 
 
@@ -45,8 +45,9 @@
 void
 FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
 {
-  newSession = _newSession;
-  newSessionIsValid = true;
+    Lock l(lock);
+    newSession = _newSession;
+    newSessionIsValid = true;
 }
 
 
@@ -54,27 +55,27 @@
 void
 FailoverSubscriptionManager::failover ( )
 {
-  subscriptionManager->stop();
-  // TODO -- save vector of boost bind fns.
+    // Stop the subscription manager thread so it can notice the failover in progress.
+    subscriptionManager->stop();
 }
 
 
 
 
 FailoverSubscriptionManager::subscribeArgs::subscribeArgs 
-  ( int _interface,
-    MessageListener * _listener,
-    LocalQueue * _localQueue,
-    const std::string * _queue,
-    const FlowControl * _flow,
-    const std::string * _tag
-  ) :
-  interface(_interface),
-  listener(_listener),
-  localQueue(_localQueue),
-  queue(_queue),
-  flow(_flow),
-  tag(_tag)
+( int _interface,
+  MessageListener * _listener,
+  LocalQueue * _localQueue,
+  const std::string * _queue,
+  const FlowControl * _flow,
+  const std::string * _tag
+) :
+    interface(_interface),
+    listener(_listener),
+    localQueue(_localQueue),
+    queue(_queue),
+    flow(_flow),
+    tag(_tag)
 {
 }
 
@@ -86,14 +87,14 @@
                                          const std::string & queue,
                                          const FlowControl & flow,
                                          const std::string & tag 
-                                       )
+)
 {
-  subscriptionManager->subscribe ( listener,
-                                   queue,
-                                   flow,
-                                   tag
-                                 );
-  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+    subscriptionManager->subscribe ( listener,
+                                     queue,
+                                     flow,
+                                     tag
+    );
+    subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
 }
 
 
@@ -103,14 +104,14 @@
                                          const std::string & queue,
                                          const FlowControl & flow,
                                          const std::string & tag
-                                       )
+)
 {
-  subscriptionManager->subscribe ( localQueue,
-                                   queue,
-                                   flow,
-                                   tag
-                                 );
-  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+    subscriptionManager->subscribe ( localQueue,
+                                     queue,
+                                     flow,
+                                     tag
+    );
+    subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
 }
 
 
@@ -119,14 +120,14 @@
 FailoverSubscriptionManager::subscribe ( MessageListener   & listener,
                                          const std::string & queue,
                                          const std::string & tag
-                                       )
+)
 {
-  subscriptionManager->subscribe ( listener,
-                                   queue,
-                                   tag
-                                 );
-  // TODO -- more than one subscription
-  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+    subscriptionManager->subscribe ( listener,
+                                     queue,
+                                     tag
+    );
+    // TODO -- more than one subscription
+    subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
 }
 
 
@@ -136,13 +137,13 @@
 FailoverSubscriptionManager::subscribe ( LocalQueue        & localQueue,
                                          const std::string & queue,
                                          const std::string & tag
-                                       )
+)
 {
-  subscriptionManager->subscribe ( localQueue,
-                                   queue,
-                                   tag
-                                 );
-  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+    subscriptionManager->subscribe ( localQueue,
+                                     queue,
+                                     tag
+    );
+    subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
 }
 
 
@@ -151,9 +152,10 @@
 FailoverSubscriptionManager::get ( Message & result,
                                    const std::string & queue,
                                    sys::Duration timeout
-                                 )
+)
 {
-  return subscriptionManager->get ( result, queue, timeout );
+
+    return subscriptionManager->get ( result, queue, timeout );
 }
 
 
@@ -161,7 +163,8 @@
 void 
 FailoverSubscriptionManager::cancel ( const std::string tag )
 {
-  subscriptionManager->cancel ( tag );
+
+    subscriptionManager->cancel ( tag );
 }
 
 
@@ -169,47 +172,40 @@
 void 
 FailoverSubscriptionManager::run ( ) // User Thread
 {
-  // FIXME mgoulish -- wait on a monitor here instead of this infinite loop
-  while ( 1 )
-  {
-    subscriptionManager->run ( );
-
-    // When we drop out of run, if there is a new Session
-    // waiting for us, this is a failover.  Otherwise, just
-    // return control to usercode.
-    sleep(1);  // FIXME mgoulish -- get rid of this when we have wait-on-monitor.
-
-    if ( newSessionIsValid )
-    {
-      delete subscriptionManager;
-      subscriptionManager = new SubscriptionManager(newSession);
-      // FIXME mgoulish make this an array of boost bind fns
-      //
-      for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); 
-            i < subscribeFns.end(); 
-            ++ i 
-          )
-      {
-        std::cerr << "MDEBUG  new new resubscribe.\n";
-        (*i) ();
-      }
-
-      newSessionIsValid = false;
-    }
-    else
+    while ( 1 )
     {
-      // break;  TODO -- fix this
+        subscriptionManager->run ( );
+        Lock l(lock);
+        // When we drop out of run, if there is a new Session
+        // waiting for us, this is a failover.  Otherwise, just
+        // return control to usercode.
+        if ( newSessionIsValid ) 
+        {
+            delete subscriptionManager;
+            subscriptionManager = new SubscriptionManager(newSession);
+            for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); 
+                  i < subscribeFns.end(); 
+                  ++ i 
+            )
+            {
+                (*i) ();
+            }
+            newSessionIsValid = false;
+        }
+        else
+        {
+            // Not a failover, return to user code.
+            break;
+        }
     }
-  }
-
 }
 
 
-
 void 
 FailoverSubscriptionManager::start ( )
 {
-  subscriptionManager->start ( );
+
+    subscriptionManager->start ( );
 }
 
 
@@ -217,7 +213,8 @@
 void 
 FailoverSubscriptionManager::setAutoStop ( bool set )
 {
-  subscriptionManager->setAutoStop ( set );
+
+    subscriptionManager->setAutoStop ( set );
 }
 
 
@@ -225,7 +222,8 @@
 void 
 FailoverSubscriptionManager::stop ( )
 {
-  subscriptionManager->stop ( );
+
+    subscriptionManager->stop ( );
 }
 
 
@@ -233,9 +231,10 @@
 void 
 FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
                                               const FlowControl & flow 
-                                            )
+)
 {
-  subscriptionManager->setFlowControl ( destination, flow );
+
+    subscriptionManager->setFlowControl ( destination, flow );
 }
 
 
@@ -243,7 +242,8 @@
 void 
 FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
 {
-  subscriptionManager->setFlowControl ( flow );
+
+    subscriptionManager->setFlowControl ( flow );
 }
 
 
@@ -251,7 +251,8 @@
 const FlowControl & 
 FailoverSubscriptionManager::getFlowControl ( ) const
 {
-  return subscriptionManager->getFlowControl ( );
+
+    return subscriptionManager->getFlowControl ( );
 }
 
 
@@ -262,13 +263,14 @@
                                               uint32_t messages,
                                               uint32_t bytes,
                                               bool window 
-                                            )
+)
 {
-  subscriptionManager->setFlowControl ( tag,
-                                        messages,
-                                        bytes,
-                                        window
-                                      );
+
+    subscriptionManager->setFlowControl ( tag,
+                                          messages,
+                                          bytes,
+                                          window
+    );
 }
 
 
@@ -277,12 +279,13 @@
 FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
                                               uint32_t bytes,
                                               bool window
-                                            )
+)
 {
-  subscriptionManager->setFlowControl ( messages,
-                                        bytes,
-                                        window
-                                      );
+
+    subscriptionManager->setFlowControl ( messages,
+                                          bytes,
+                                          window
+    );
 }
 
 
@@ -290,7 +293,8 @@
 void 
 FailoverSubscriptionManager::setAcceptMode ( bool required )
 {
-  subscriptionManager->setAcceptMode ( required );
+
+    subscriptionManager->setAcceptMode ( required );
 }
 
 
@@ -298,7 +302,8 @@
 void 
 FailoverSubscriptionManager::setAcquireMode ( bool acquire )
 {
-  subscriptionManager->setAcquireMode ( acquire );
+
+    subscriptionManager->setAcquireMode ( acquire );
 }
 
 
@@ -306,7 +311,8 @@
 void 
 FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
 {
-  subscriptionManager->setAckPolicy ( autoAck );
+
+    subscriptionManager->setAckPolicy ( autoAck );
 }
 
 
@@ -314,16 +320,12 @@
 AckPolicy & 
 FailoverSubscriptionManager::getAckPolicy()
 {
-  return subscriptionManager->getAckPolicy ( );
+
+    return subscriptionManager->getAckPolicy ( );
 }
 
 
 
-void 
-FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ )
-{
-  // FIXME mgoulish -- get rid of this mechanism -- i think it's unused.
-}
 
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h?rev=703575&r1=703574&r2=703575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h Fri Oct 10 14:19:46 2008
@@ -33,6 +33,7 @@
 #include <qpid/client/LocalQueue.h>
 #include <qpid/client/FlowControl.h>
 #include <qpid/sys/Runnable.h>
+#include <qpid/sys/Mutex.h>
 
 
 
@@ -106,7 +107,6 @@
 
     AckPolicy & getAckPolicy();
 
-    void registerFailoverHandler ( boost::function<void ()> fh );
 
     // Get ready for a failover.
     void prepareForFailover ( Session newSession );
@@ -116,6 +116,8 @@
 
 
   private:
+    typedef sys::Mutex::ScopedLock Lock;
+    sys::Mutex lock;
     
     SubscriptionManager * subscriptionManager;