You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/10 06:49:49 UTC

svn commit: r703319 - in /incubator/qpid/trunk/qpid/cpp: ./ examples/ examples/failover/ src/ src/qpid/broker/ src/qpid/client/ src/qpid/cluster/

Author: aconway
Date: Thu Oct  9 21:49:48 2008
New Revision: 703319

URL: http://svn.apache.org/viewvc?rev=703319&view=rev
Log:
QPID-1340 froM Mick Goulish: preliminary client-side failover support.

Added:
    incubator/qpid/trunk/qpid/cpp/examples/failover/   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am
    incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/examples/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Thu Oct  9 21:49:48 2008
@@ -331,6 +331,7 @@
   examples/fanout/Makefile
   examples/pub-sub/Makefile
   examples/request-response/Makefile
+  examples/failover/Makefile
   examples/xml-exchange/Makefile
   managementgen/Makefile
   etc/Makefile

Modified: incubator/qpid/trunk/qpid/cpp/examples/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/Makefile.am?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/Makefile.am Thu Oct  9 21:49:48 2008
@@ -1,4 +1,4 @@
-SUBDIRS = direct fanout pub-sub request-response
+SUBDIRS = direct fanout pub-sub request-response failover
 if HAVE_XML
   SUBDIRS += xml-exchange
 endif

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Oct  9 21:49:48 2008
@@ -0,0 +1 @@
+Makefile.in

Added: incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am Thu Oct  9 21:49:48 2008
@@ -0,0 +1,21 @@
+examplesdir=$(pkgdatadir)/examples/direct
+
+include $(top_srcdir)/examples/makedist.mk
+
+noinst_PROGRAMS=direct_producer listener declare_queues
+direct_producer_SOURCES=direct_producer.cpp
+direct_producer_LDADD=$(CLIENT_LIB)
+
+listener_SOURCES=listener.cpp
+listener_LDADD=$(CLIENT_LIB)
+
+declare_queues_SOURCES=declare_queues.cpp
+declare_queues_LDADD=$(CLIENT_LIB)
+
+examples_DATA=               \
+	direct_producer.cpp  \
+	listener.cpp         \
+	declare_queues.cpp   \
+        $(MAKEDIST)
+
+# FIXME aconway 2008-10-10: add verify scripts.

Added: incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp Thu Oct  9 21:49:48 2008
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+
+using namespace std;
+
+
+
+
+int 
+main ( int argc, char ** argv) 
+{
+  if ( argc < 3 )
+  {
+    std::cerr << "Usage: ./declare_queues host cluster_port_file_name\n";
+    std::cerr << "i.e. for host: 127.0.0.1\n";
+    exit(1);
+  }
+
+  const char * host = argv[1];
+  int port = atoi(argv[2]);
+
+
+  try 
+  {
+    FailoverConnection connection;
+    FailoverSession    * session;
+
+    connection.open ( host, port );
+    session = connection.newSession();
+
+    session->queueDeclare ( "message_queue");
+    
+    /*
+    session->exchangeBind 
+      ( arg::exchange="amq.direct", 
+        arg::queue="message_queue", 
+        arg::bindingKey="routing_key"
+      );
+     * */
+    session->exchangeBind ( "message_queue",
+                           "amq.direct", 
+                           "routing_key"
+                         );
+    connection.close();
+    return 0;
+  }
+  catch ( const std::exception& error ) 
+  {
+    std::cout << error.what() << std::endl;
+  }
+
+  return 1;
+}
+
+
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Thu Oct  9 21:49:48 2008
@@ -0,0 +1,148 @@
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+
+
+int 
+main ( int argc, char ** argv) 
+{
+  struct timeval broker_killed_time     = {0,0},
+                 failover_complete_time = {0,0},
+                 duration               = {0,0};
+
+
+  if ( argc < 3 )
+  {
+    std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
+    std::cerr << "i.e. for host: 127.0.0.1\n";
+    exit(1);
+  }
+
+  char const * host = argv[1];
+  int  port = atoi(argv[2]);
+  char const * broker_to_kill = 0;
+
+  if ( argc > 3 )
+  {
+    broker_to_kill = argv[3];
+    std::cerr << "main:  Broker marked for death is process ID " 
+              << broker_to_kill
+              << endl;
+  }
+  else
+  {
+    std::cerr << "PRODUCER main: there is no broker to kill.\n";
+  }
+
+  FailoverConnection connection;
+  FailoverSession    * session;
+  Message message;
+
+  string program_name = "PRODUCER";
+
+
+  connection.failoverCompleteTime = & failover_complete_time;
+  connection.name = program_name;
+  connection.open ( host, port );
+
+  session = connection.newSession();
+  session->name    = program_name;
+
+  int send_this_many = 30,
+      messages_sent  = 0;
+
+  while ( messages_sent < send_this_many )
+  {
+    if ( (messages_sent == 13) && broker_to_kill )
+    {
+      char command[1000];
+      std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
+      sprintf(command, "kill -9 %s", broker_to_kill);
+      system ( command );
+      gettimeofday ( & broker_killed_time, 0 );
+    }
+
+    message.getDeliveryProperties().setRoutingKey("routing_key"); 
+
+    std::cerr << "sending message " 
+              << messages_sent
+              << " of " 
+              << send_this_many 
+              << ".\n";
+
+    stringstream message_data;
+    message_data << messages_sent;
+    message.setData(message_data.str());
+
+    try
+    {
+      /* MICK FIXME
+      session.messageTransfer ( arg::content=message,  
+                                arg::destination="amq.direct"
+                              ); */
+      session->messageTransfer ( "amq.direct",
+                                1,
+                                0,
+                                message
+                              );
+    }
+    catch ( const std::exception& error) 
+    {
+      cerr << program_name << " exception: " << error.what() << endl;
+    }
+
+    sleep ( 1 );
+    ++ messages_sent;
+  }
+
+  message.setData ( "That's all, folks!" );
+
+  /* MICK FIXME
+  session.messageTransfer ( arg::content=message,  
+                            arg::destination="amq.direct"
+                          ); 
+   */
+  session->messageTransfer ( "amq.direct",
+                            1,
+                            0,
+                            message
+                          ); 
+
+  session->sync();
+  connection.close();
+
+  // This will be incorrect if you killed more than one...
+  if ( broker_to_kill )
+  {
+    timersub ( & failover_complete_time, 
+               & broker_killed_time, 
+               & duration 
+             );
+    fprintf ( stderr, 
+              "Failover time: %ld.%.6ld\n",
+              duration.tv_sec,
+              duration.tv_usec
+            );
+  }
+
+  return 0;  
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Thu Oct  9 21:49:48 2008
@@ -0,0 +1,249 @@
+
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+
+
+struct Recorder 
+{
+  unsigned int max_messages;
+  unsigned int * messages_received;
+
+  Recorder ( )
+  {
+    max_messages = 1000;
+    messages_received = new unsigned int [ max_messages ];
+    memset ( messages_received, 0, max_messages * sizeof(int) );
+  }
+
+
+  void
+  received ( int i )
+  {
+    messages_received[i] ++;
+  }
+
+
+
+  void
+  report ( )
+  {
+    int i;
+
+    int last_received_message = 0;
+
+    vector<unsigned int> missed_messages,
+                         multiple_messages;
+
+    /*----------------------------------------------------
+       Collect indices of missed and multiple messages.
+    ----------------------------------------------------*/
+    bool seen_first_message = false;
+    for ( i = max_messages - 1; i >= 0; -- i )
+    {
+      if ( ! seen_first_message )
+      {
+        if ( messages_received [i] > 0 )
+        {
+          seen_first_message = true;
+          last_received_message = i;
+        }
+      }
+      else
+      {
+        if ( messages_received [i] == 0 )
+          missed_messages.push_back ( i );
+        else
+        if ( messages_received [i] > 1 )
+        {
+          multiple_messages.push_back ( i );
+        }
+      }
+    }
+
+    /*--------------------------------------------
+       Report missed messages.
+    --------------------------------------------*/
+    char const * verb = ( missed_messages.size() == 1 ) 
+                        ? " was "
+                        : " were ";
+
+    char const * plural = ( missed_messages.size() == 1 ) 
+                          ? "."
+                          : "s.";
+
+    std::cerr << "Listener::shutdown: There"
+              << verb
+              << missed_messages.size()
+              << " missed message"
+              << plural
+              << endl;
+
+    for ( i = 0; i < int(missed_messages.size()); ++ i )
+    {
+      std::cerr << "    " << i << " was missed.\n";
+    }
+
+
+    /*--------------------------------------------
+       Report multiple messages.
+    --------------------------------------------*/
+    verb = ( multiple_messages.size() == 1 ) 
+           ? " was "
+           : " were ";
+
+    plural = ( multiple_messages.size() == 1 ) 
+           ? "."
+           : "s.";
+
+    std::cerr << "Listener::shutdown: There"
+              << verb
+              << multiple_messages.size()
+              << " multiple message"
+              << plural
+              << endl;
+
+    for ( i = 0; i < int(multiple_messages.size()); ++ i )
+    {
+      std::cerr << "    " 
+                << multiple_messages[i] 
+                << " was received " 
+                << messages_received [ multiple_messages[i] ]
+                << " times.\n";
+    }
+    
+    /*
+    for ( i = 0; i < last_received_message; ++ i )
+    {
+      std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
+    }
+    */
+  }
+
+};
+
+
+      
+
+struct Listener : public MessageListener
+{
+  FailoverSubscriptionManager & subscriptionManager;
+  Recorder & recorder;
+
+
+  Listener ( FailoverSubscriptionManager& subs, 
+             Recorder & recorder
+           );
+
+  void shutdown() { recorder.report(); }
+  void parse_message ( std::string const & msg );
+  
+  virtual void received ( Message & message );
+};
+
+
+
+
+
+Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : 
+  subscriptionManager(s),
+  recorder(r)
+{
+}
+
+
+
+
+
+void 
+Listener::received ( Message & message ) 
+{
+  std::cerr << "Listener received: " << message.getData() << std::endl;
+  if (message.getData() == "That's all, folks!") 
+  {
+    std::cout << "Shutting down listener for " << message.getDestination()
+              << std::endl;
+    subscriptionManager.cancel(message.getDestination());
+
+    shutdown();
+  }
+  else
+  {
+    parse_message ( message.getData() );
+  }
+}
+
+
+
+
+
+void
+Listener::parse_message ( const std::string & msg )
+{
+  int msg_number;
+  if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) 
+  {
+    std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
+    return;
+  }
+  recorder.received ( msg_number );
+}
+
+
+
+
+
+
+int 
+main ( int argc, char ** argv ) 
+{
+  string program_name = "LISTENER";
+
+  if ( argc < 3 )
+  {
+    std::cerr << "Usage: ./listener host cluster_port_file_name\n";
+    std::cerr << "i.e. for host: 127.0.0.1\n";
+    exit(1);
+  }
+
+  char const * host = argv[1];
+  int port = atoi(argv[2]);
+
+  FailoverConnection connection;
+  FailoverSession    * session;
+  Recorder recorder;
+
+  connection.name = program_name; 
+
+  connection.open ( host, port );
+  session = connection.newSession();
+  session->name = program_name; 
+
+  FailoverSubscriptionManager subscriptions ( session );
+  subscriptions.name = program_name;
+  Listener listener ( subscriptions, recorder );
+  subscriptions.subscribe ( listener, "message_queue" );
+  subscriptions.run ( );
+
+  connection.close();
+
+  return 1;   
+}
+
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Oct  9 21:49:48 2008
@@ -371,11 +371,14 @@
   qpid/client/Bounds.cpp			\
   qpid/client/Connection.cpp			\
   qpid/client/ConnectionHandler.cpp		\
-  qpid/client/ConnectionImpl.cpp \
+  qpid/client/ConnectionImpl.cpp                \
   qpid/client/ConnectionSettings.cpp		\
-  qpid/client/Connector.cpp			\
+  qpid/client/Connector.cpp	                \
   qpid/client/Demux.cpp				\
   qpid/client/Dispatcher.cpp			\
+  qpid/client/FailoverConnection.cpp            \
+  qpid/client/FailoverSession.cpp               \
+  qpid/client/FailoverSubscriptionManager.cpp   \
   qpid/client/FailoverListener.h		\
   qpid/client/FailoverListener.cpp		\
   qpid/client/Future.cpp			\
@@ -508,6 +511,9 @@
   qpid/client/Demux.h \
   qpid/client/Dispatcher.h \
   qpid/client/Execution.h \
+  qpid/client/FailoverConnection.h \
+  qpid/client/FailoverSession.h \
+  qpid/client/FailoverSubscriptionManager.h \
   qpid/client/FlowControl.h \
   qpid/client/Future.h \
   qpid/client/FutureCompletion.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct  9 21:49:48 2008
@@ -140,7 +140,8 @@
         qpid::SessionState::Configuration(
             conf.replayFlushLimit*1024, // convert kb to bytes.
             conf.replayHardLimit*1024),
-        *this)
+        *this),
+   getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if(conf.enableMgmt){
         QPID_LOG(info, "Management enabled");
@@ -426,5 +427,15 @@
 
 boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
 
+std::vector<Url> 
+Broker::getKnownBrokersImpl()
+{
+  knownBrokers.clear();
+  knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( getPort() ) );
+  return knownBrokers;
+}
+
+
+
 }} // namespace qpid::broker
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Oct  9 21:49:48 2008
@@ -113,6 +113,9 @@
 
     void declareStandardExchange(const std::string& name, const std::string& type);
 
+    std::vector<Url> knownBrokers;
+    std::vector<Url> getKnownBrokersImpl();
+
 
   public:
 
@@ -191,6 +194,9 @@
 
     boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
     void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
+
+    boost::function<std::vector<Url> ()> getKnownBrokers;
+    
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Thu Oct  9 21:49:48 2008
@@ -28,6 +28,7 @@
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/framing/enum.h"
 #include "qpid/log/Statement.h"
+#include "qpid/Url.h"
 #include "AclModule.h"
 
 using namespace qpid;
@@ -127,8 +128,11 @@
 void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
                                       const framing::Array& /*capabilities*/, bool /*insist*/)
 {
-    framing::Array knownhosts;
-    client.openOk(knownhosts);
+    std::vector<Url> urls = connection.broker.getKnownBrokers();
+    framing::Array array(0x95); // str16 array
+    for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) 
+        array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
+    client.openOk(array);
 }
 
         

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Thu Oct  9 21:49:48 2008
@@ -99,6 +99,15 @@
     return impl && impl->isOpen();
 }
 
+void 
+Connection::registerFailureCallback ( boost::function<void ()> fn ) {
+    failureCallback = fn;
+    if ( impl )
+        impl->registerFailureCallback ( fn );
+}
+
+
+
 void Connection::open(const ConnectionSettings& settings)
 {
     if (isOpen())
@@ -106,6 +115,8 @@
 
     impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
     impl->open();
+    if ( failureCallback )
+        impl->registerFailureCallback ( failureCallback );
 }
 
 Session Connection::newSession(const std::string& name, uint32_t timeout) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Oct  9 21:49:48 2008
@@ -45,9 +45,13 @@
 {
     framing::ProtocolVersion version;
 
+    boost::function<void ()> failureCallback;
+
+
   protected:
     boost::shared_ptr<ConnectionImpl> impl;
 
+
   public:
     /**
      * Creates a connection object, but does not open the connection.
@@ -168,6 +172,7 @@
     bool isOpen() const;
 
     std::vector<Url> getKnownBrokers();
+    void registerFailureCallback ( boost::function<void ()> fn );
 
   friend class ConnectionAccess; ///<@internal
   friend class SessionBase_0_10; ///<@internal

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Thu Oct  9 21:49:48 2008
@@ -157,13 +157,17 @@
     proxy.open(virtualhost, capabilities, insist);
 }
 
-void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/)
+void ConnectionHandler::openOk ( const framing::Array& knownBrokers )
 {
     checkState(OPENING, INVALID_STATE_OPEN_OK);
-    //TODO: store knownHosts for reconnection etc
+    knownBrokersUrls.clear();
+    framing::Array::ValueVector::const_iterator i;
+    for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i )
+        knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
     setState(OPEN);
 }
 
+
 void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/)
 {
     throw NotImplementedException("Redirection received from broker; not yet implemented in client");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Thu Oct  9 21:49:48 2008
@@ -32,6 +32,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/InputHandler.h"
+#include "qpid/Url.h"
 
 namespace qpid {
 namespace client {
@@ -103,6 +104,8 @@
 
     CloseListener onClose;
     ErrorListener onError;
+
+    std::vector<Url> knownBrokersUrls;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Oct  9 21:49:48 2008
@@ -151,13 +151,20 @@
 
 void ConnectionImpl::shutdown() {
     Mutex::ScopedLock l(lock);
-    if (handler.isClosed()) return;
+    if (handler.isClosed()) 
+    {
+    std::cerr << "MDEBUG  ConnectionImpl::shutdown -- returning w/o failure callback!\n";
+    return;
+    }
     // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
     // an appropriate close-code. connection-forced is not right.
     if (!handler.isClosing()) 
         closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
     setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
     handler.fail(CONN_CLOSED);
+
+    if ( failureCallback )
+      failureCallback();
 }
 
 void ConnectionImpl::erase(uint16_t ch) {
@@ -171,8 +178,8 @@
 }
     
 std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
-    // FIXME aconway 2008-10-08: initialize failover list from openOk or settings
-    return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>();
+    // FIXME aconway 2008-10-08: ensure we never return empty list, always include self Url.
+    return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
 }
 
 boost::shared_ptr<SessionImpl>  ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Thu Oct  9 21:49:48 2008
@@ -69,6 +69,8 @@
     void idleIn();
     void shutdown();
 
+    boost::function<void ()> failureCallback;
+
   public:
     ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
     ~ConnectionImpl();
@@ -82,12 +84,11 @@
     void close();
     void handle(framing::AMQFrame& frame);
     void erase(uint16_t channel);
-    void stopFailoverListener();
-    
     const ConnectionSettings& getNegotiatedSettings();
 
     std::vector<Url> getKnownBrokers();
-
+    void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; }
+    void stopFailoverListener();
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Oct  9 21:49:48 2008
@@ -49,7 +49,10 @@
 }
 
 Dispatcher::Dispatcher(const Session& s, const std::string& q)
-    : session(s), running(false), autoStop(true)
+    : session(s), 
+      running(false), 
+      autoStop(true),
+      failoverHandler(0)
 {
     queue = q.empty() ? 
         session.getExecution().getDemux().getDefault() : 
@@ -91,9 +94,20 @@
         }
         session.sync(); // Make sure all our acks are received before returning.
     }
-    catch (const ClosedException&) {} //ignore it and return
+    catch (const ClosedException& e) 
+    { 
+      QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what());
+    } //ignore it and return
     catch (const std::exception& e) {
         QPID_LOG(error, "Exception in client dispatch thread: " << e.what());
+      if ( failoverHandler )
+      {
+        failoverHandler();
+      }
+      else
+      {
+        QPID_LOG(info, "No dispatcher failover handler registered.");
+      }
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Thu Oct  9 21:49:48 2008
@@ -69,6 +69,8 @@
     Subscriber::shared_ptr find(const std::string& name);
     bool isStopped();
 
+    boost::function<void ()> failoverHandler;
+
 public:
     Dispatcher(const Session& session, const std::string& queue = "");
 
@@ -77,6 +79,11 @@
     void stop();
     void setAutoStop(bool b);
 
+    void registerFailoverHandler ( boost::function<void ()> fh )
+    {
+      failoverHandler = fh;
+    }
+
     void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
     void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy());
     void cancel(const std::string& destination);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Thu Oct  9 21:49:48 2008
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+
+#include "qpid/log/Statement.h"
+#include "qpid/client/FailoverConnection.h"
+#include "qpid/client/ConnectionSettings.h"
+
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+
+FailoverConnection::FailoverConnection ( ) :
+    name(),
+    failoverCompleteTime(0)
+{
+    connection.registerFailureCallback
+        ( boost::bind(&FailoverConnection::failover, this));
+}
+
+FailoverConnection::~FailoverConnection () {}
+
+void 
+FailoverConnection::open ( const std::string& host,
+                           int   port,
+                           const std::string& uid,
+                           const std::string& pwd,
+                           const std::string& virtualhost, 
+                           uint16_t maxFrameSize
+)
+{
+    ConnectionSettings settings;
+
+    settings.host         = host;
+    settings.port         = port;
+    settings.username     = uid;
+    settings.username     = uid;
+    settings.password     = pwd;
+    settings.virtualhost  = virtualhost;
+    settings.maxFrameSize = maxFrameSize;
+    settings.host         = host;
+
+    open ( settings );
+}
+
+
+void
+FailoverConnection::open ( ConnectionSettings & settings )
+{
+    connection.open ( settings );
+}
+
+
+
+void 
+FailoverConnection::close ( )
+{
+    connection.close();
+}
+
+
+
+FailoverSession *
+FailoverConnection::newSession ( const std::string& /* name */ )
+{
+    FailoverSession * fs = new FailoverSession;
+    sessions.push_back ( fs );
+    fs->session = connection.newSession();
+    return fs;
+}
+
+
+
+void
+FailoverConnection::resume ( FailoverSession & failoverSession )
+{
+    connection.resume ( failoverSession.session );
+}
+
+
+bool 
+FailoverConnection::isOpen() const
+{
+    return connection.isOpen();
+}
+
+
+void 
+FailoverConnection::getKnownBrokers ( std::vector<std::string> & /*v*/ )
+{
+}
+
+
+void 
+FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ )
+{
+}
+
+void 
+FailoverConnection::failover ( )
+{
+    std::vector<Url> knownBrokers = connection.getKnownBrokers();
+    if (knownBrokers.empty())
+         throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers."));
+
+    Connection newConnection;
+    for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) {
+        try {
+            newConnection.open(*i);
+            break;
+        }
+        catch (const std::exception& e) {
+            QPID_LOG(info, "Could not fail-over to " << *i << ": " << e.what());
+            if ((i + 1) == knownBrokers.end())
+                throw;
+        }
+    }
+  
+    /*
+     * We have a valid new connection.  Tell all the sessions
+     * (and, through them, their SessionManagers and whatever else)
+     * that we are about to failover to this new Connection.
+     */
+    // FIXME mgoulish -- get rid of two-passes here.
+    std::vector<FailoverSession *>::iterator sessions_iterator;
+
+    for ( sessions_iterator = sessions.begin();
+          sessions_iterator < sessions.end();
+          ++ sessions_iterator
+    )
+    {
+        FailoverSession * fs = * sessions_iterator;
+        fs->prepareForFailover ( newConnection );
+    }
+
+    /*
+     * Tell all sessions to actually failover to the new connection.
+     */
+    for ( sessions_iterator = sessions.begin();
+          sessions_iterator < sessions.end();
+          ++ sessions_iterator
+    )
+    {
+        FailoverSession * fs = * sessions_iterator;
+        fs->failover ( );
+    }
+
+    connection = newConnection;
+
+    // FIXME aconway 2008-10-09: use sys/Time.h functions.
+    if ( failoverCompleteTime )
+    {
+        gettimeofday ( failoverCompleteTime, 0 );
+    }
+}
+
+
+
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h Thu Oct  9 21:49:48 2008
@@ -0,0 +1,103 @@
+#ifndef QPID_CLIENT_FAILOVERCONNECTION_H
+#define QPID_CLIENT_FAILOVERCONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <string>
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/FailoverConnection.h"
+#include "qpid/client/FailoverSession.h"
+#include "qpid/client/FailoverSubscriptionManager.h"
+
+
+
+namespace qpid {
+namespace client {
+
+class ConnectionSettings;
+
+
+class FailoverConnection
+{
+  public:
+
+    FailoverConnection ( );
+
+    ~FailoverConnection ( );
+
+    void open ( const std::string& host,
+                int   port,
+                const std::string& uid = "guest",
+                const std::string& pwd = "guest",
+                const std::string& virtualhost = "/", 
+                uint16_t maxFrameSize=65535
+    );
+
+    void open ( ConnectionSettings & settings );
+
+    void close ( );
+
+    FailoverSession * newSession ( const std::string& name = std::string() );
+
+    void resume ( FailoverSession & session );
+
+    bool isOpen() const;
+
+    void getKnownBrokers ( std::vector<std::string> & v );
+
+
+    // public interface specific to Failover:
+
+    void registerFailureCallback ( boost::function<void ()> fn );
+
+    // If you have more than 1 connection and you want to give them 
+    // separate names for debugging...
+    std::string name; 
+
+    void failover ( );
+
+    struct timeval * failoverCompleteTime;
+
+
+  private:
+
+    std::string host;
+
+    Connection connection;
+
+    int currentPortNumber;
+
+    boost::function<void ()> clientFailoverCallback;
+
+    std::vector<FailoverSession *> sessions;
+
+
+  friend class FailoverSession;
+  friend class FailoverSessionManager;
+};
+
+}} // namespace qpid::client
+
+
+#endif  /*!QPID_CLIENT_FAILOVERCONNECTION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Thu Oct  9 21:49:48 2008
@@ -0,0 +1,592 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <fstream>
+
+
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
+#include "qpid/log/Statement.h"
+
+#include "qpid/client/FailoverConnection.h"
+#include "qpid/client/FailoverSession.h"
+
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+FailoverSession::FailoverSession ( ) :
+  name("no_name")
+{
+  // The session is created by FailoverConnection::newSession
+  failoverSubscriptionManager = 0;
+}
+
+
+FailoverSession::~FailoverSession ( )
+{
+}
+
+
+framing::FrameSet::shared_ptr 
+FailoverSession::get()
+{
+  return session.get();
+}
+
+
+SessionId 
+FailoverSession::getId() const
+{
+  return session.getId();
+}
+
+
+void 
+FailoverSession::close()
+{
+  session.close();
+}
+
+
+void 
+FailoverSession::sync()
+{
+  session.sync();
+}
+
+
+uint32_t 
+FailoverSession::timeout(uint32_t /*seconds*/ )
+{
+  // MICK WTF?  return session.timeout ( seconds );
+  return 0;
+}
+
+
+Execution& 
+FailoverSession::getExecution()
+{
+  return session.getExecution();
+}
+
+
+void 
+FailoverSession::flush()
+{
+  session.flush();
+}
+
+
+void 
+FailoverSession::markCompleted(const framing::SequenceNumber& id, 
+                               bool cumulative, 
+                               bool notifyPeer
+                              )
+{
+  session.markCompleted ( id, cumulative, notifyPeer );
+}
+
+
+
+// Wrapped functions from Session ----------------------------
+
+void 
+FailoverSession::executionSync()
+{
+  session.executionSync();
+}
+
+
+
+void 
+FailoverSession::executionResult ( const SequenceNumber& commandId, 
+                                   const string& value
+                                 )
+{
+  session.executionResult ( commandId, 
+                            value 
+                          );
+}
+
+
+
+void 
+FailoverSession::executionException ( uint16_t errorCode, 
+                                      const SequenceNumber& commandId, 
+                                      uint8_t classCode, 
+                                      uint8_t commandCode, 
+                                      uint8_t fieldIndex, 
+                                      const string& description, 
+                                      const FieldTable& errorInfo
+                                    )
+{
+  session.executionException ( errorCode,
+                               commandId,
+                               classCode,
+                               commandCode,
+                               fieldIndex,
+                               description,
+                               errorInfo
+                             );
+}
+
+
+
+void 
+FailoverSession::messageTransfer ( const string& destination, 
+                                   uint8_t acceptMode, 
+                                   uint8_t acquireMode, 
+                                   const MethodContent& content
+                                 )
+{
+  session.messageTransfer ( destination,
+                            acceptMode,
+                            acquireMode,
+                            content
+                          );
+}
+
+
+
+void 
+FailoverSession::messageAccept ( const SequenceSet& transfers )
+{
+  session.messageAccept ( transfers );
+}
+
+
+
+void 
+FailoverSession::messageReject ( const SequenceSet& transfers, 
+                                 uint16_t code, 
+                                 const string& text
+                               )
+{
+  session.messageReject ( transfers, 
+                          code, 
+                          text 
+                        );
+}
+
+
+
+void 
+FailoverSession::messageRelease ( const SequenceSet& transfers, 
+                                  bool setRedelivered
+                                )
+{
+  session.messageRelease ( transfers,
+                           setRedelivered
+                         );
+}
+
+
+
+qpid::framing::MessageAcquireResult 
+FailoverSession::messageAcquire ( const SequenceSet& transfers )
+{
+  return session.messageAcquire ( transfers );
+}
+
+
+
+qpid::framing::MessageResumeResult 
+FailoverSession::messageResume ( const string& destination, 
+                                 const string& resumeId
+                               )
+{
+  return session.messageResume ( destination,
+                                 resumeId
+                               );
+}
+
+
+
+void 
+FailoverSession::messageSubscribe ( const string& queue, 
+                                    const string& destination, 
+                                    uint8_t acceptMode, 
+                                    uint8_t acquireMode, 
+                                    bool exclusive, 
+                                    const string& resumeId, 
+                                    uint64_t resumeTtl, 
+                                    const FieldTable& arguments
+                                  )
+{
+  session.messageSubscribe ( queue,
+                             destination,
+                             acceptMode,
+                             acquireMode,
+                             exclusive,
+                             resumeId,
+                             resumeTtl,
+                             arguments
+                           );
+}
+
+
+
+void 
+FailoverSession::messageCancel ( const string& destinations )
+{
+  session.messageCancel ( destinations );
+}
+
+
+
+void 
+FailoverSession::messageSetFlowMode ( const string& destination, 
+                                      uint8_t flowMode
+                                    )
+{
+  session.messageSetFlowMode ( destination,
+                               flowMode
+                             );
+}
+
+
+
+void 
+FailoverSession::messageFlow(const string& destination, 
+                             uint8_t unit, 
+                             uint32_t value)
+{
+  session.messageFlow ( destination,
+                        unit,
+                        value
+                      );
+}
+
+
+
+void 
+FailoverSession::messageFlush(const string& destination)
+{
+  session.messageFlush ( destination );
+}
+
+
+
+void 
+FailoverSession::messageStop(const string& destination)
+{
+  session.messageStop ( destination );
+}
+
+
+
+void 
+FailoverSession::txSelect()
+{
+  session.txSelect ( );
+}
+
+
+
+void 
+FailoverSession::txCommit()
+{
+  session.txCommit ( );
+}
+
+
+
+void 
+FailoverSession::txRollback()
+{
+  session.txRollback ( );
+}
+
+
+
+void 
+FailoverSession::dtxSelect()
+{
+  session.dtxSelect ( );
+}
+
+
+
+qpid::framing::XaResult 
+FailoverSession::dtxStart(const Xid& xid, 
+                          bool join, 
+                          bool resume)
+{
+  return session.dtxStart ( xid,
+                            join,
+                            resume
+                          );
+}
+
+
+
+qpid::framing::XaResult 
+FailoverSession::dtxEnd(const Xid& xid, 
+                        bool fail, 
+                        bool suspend)
+{
+  return session.dtxEnd ( xid,
+                          fail,
+                          suspend
+                        );
+}
+
+
+
+qpid::framing::XaResult 
+FailoverSession::dtxCommit(const Xid& xid, 
+                           bool onePhase)
+{
+  return session.dtxCommit ( xid,
+                             onePhase
+                           );
+}
+
+
+
+void 
+FailoverSession::dtxForget(const Xid& xid)
+{
+  session.dtxForget ( xid );
+}
+
+
+
+qpid::framing::DtxGetTimeoutResult 
+FailoverSession::dtxGetTimeout(const Xid& xid)
+{
+  return session.dtxGetTimeout ( xid );
+}
+
+
+
+qpid::framing::XaResult 
+FailoverSession::dtxPrepare(const Xid& xid)
+{
+  return session.dtxPrepare ( xid );
+}
+
+
+
+qpid::framing::DtxRecoverResult 
+FailoverSession::dtxRecover()
+{
+  return session.dtxRecover ( );
+}
+
+
+
+qpid::framing::XaResult 
+FailoverSession::dtxRollback(const Xid& xid)
+{
+  return session.dtxRollback ( xid );
+}
+
+
+
+void 
+FailoverSession::dtxSetTimeout(const Xid& xid, 
+                               uint32_t timeout)
+{
+  session.dtxSetTimeout ( xid,
+                          timeout
+                        );
+}
+
+
+
+void 
+FailoverSession::exchangeDeclare(const string& exchange, 
+                                 const string& type, 
+                                 const string& alternateExchange, 
+                                 bool passive, 
+                                 bool durable, 
+                                 bool autoDelete, 
+                                 const FieldTable& arguments)
+{
+  session.exchangeDeclare ( exchange,
+                            type,
+                            alternateExchange,
+                            passive,
+                            durable,
+                            autoDelete,
+                            arguments
+                          );
+}
+
+
+
+void 
+FailoverSession::exchangeDelete(const string& exchange, 
+                                bool ifUnused)
+{
+  session.exchangeDelete ( exchange,
+                           ifUnused
+                         );
+}
+
+
+
+qpid::framing::ExchangeQueryResult 
+FailoverSession::exchangeQuery(const string& name)
+{
+  return session.exchangeQuery ( name );
+}
+
+
+
+void 
+FailoverSession::exchangeBind(const string& queue, 
+                              const string& exchange, 
+                              const string& bindingKey, 
+                              const FieldTable& arguments)
+{
+  session.exchangeBind ( queue,
+                         exchange,
+                         bindingKey,
+                         arguments
+                       );
+}
+
+
+
+void 
+FailoverSession::exchangeUnbind(const string& queue, 
+                                const string& exchange, 
+                                const string& bindingKey)
+{
+  session.exchangeUnbind ( queue,
+                           exchange,
+                           bindingKey
+                         );
+}
+
+
+
+qpid::framing::ExchangeBoundResult 
+FailoverSession::exchangeBound(const string& exchange, 
+                               const string& queue, 
+                               const string& bindingKey, 
+                               const FieldTable& arguments)
+{
+  return session.exchangeBound ( exchange,
+                                 queue,
+                                 bindingKey,
+                                 arguments
+                               );
+}
+
+
+
+void 
+FailoverSession::queueDeclare(const string& queue, 
+                              const string& alternateExchange, 
+                              bool passive, 
+                              bool durable, 
+                              bool exclusive, 
+                              bool autoDelete, 
+                              const FieldTable& arguments)
+{
+  session.queueDeclare ( queue,
+                         alternateExchange,
+                         passive,
+                         durable,
+                         exclusive,
+                         autoDelete,
+                         arguments
+                       );
+}
+
+
+
+void 
+FailoverSession::queueDelete(const string& queue, 
+                             bool ifUnused, 
+                             bool ifEmpty)
+{
+  session.queueDelete ( queue,
+                        ifUnused,
+                        ifEmpty
+                      );
+}
+
+
+
+void 
+FailoverSession::queuePurge(const string& queue)
+{
+  session.queuePurge ( queue) ;
+}
+
+
+
+qpid::framing::QueueQueryResult 
+FailoverSession::queueQuery(const string& queue)
+{
+  return session.queueQuery ( queue );
+}
+
+
+// end Wrapped functions from Session  ---------------------------
+
+
+// Get ready for a failover.
+void
+FailoverSession::prepareForFailover ( Connection newConnection )
+{
+  try
+  {
+    newSession = newConnection.newSession();
+  }
+  catch ( const std::exception& error )
+  {
+    throw Exception(QPID_MSG("Can't create failover session."));
+  }
+
+
+  if ( failoverSubscriptionManager )
+  {
+    failoverSubscriptionManager->prepareForFailover ( newSession );
+  }
+}
+
+
+
+void 
+FailoverSession::failover (  )
+{
+  if ( failoverSubscriptionManager )
+  {
+    failoverSubscriptionManager->failover ( );
+  }
+
+  session = newSession;
+}
+
+
+
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h Thu Oct  9 21:49:48 2008
@@ -0,0 +1,314 @@
+#ifndef QPID_CLIENT_FAILOVERSESSION_H
+#define QPID_CLIENT_FAILOVERSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Session.h"
+#include "qpid/SessionId.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Execution.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+
+
+
+namespace qpid {
+namespace client {
+
+
+class FailoverConnection;
+class FailoverSubscriptionManager;
+
+
+class FailoverSession
+{
+  public:
+
+    typedef framing::TransferContent DefaultContent;
+
+    FailoverSession ( );
+    ~FailoverSession ( );
+
+    std::string name;
+
+    framing::FrameSet::shared_ptr get();
+
+    SessionId getId() const;
+
+    void close();
+
+    void sync();
+
+    uint32_t timeout ( uint32_t seconds);
+
+    Execution& getExecution();
+
+    void flush();
+
+    void markCompleted(const framing::SequenceNumber& id, 
+                       bool cumulative, 
+                       bool notifyPeer
+                      );
+    
+    void sendCompletion ( );
+
+
+
+    // Wrapped functions from Session ----------------------------
+
+    void 
+    executionSync();
+
+
+    void 
+    executionResult(const SequenceNumber& commandId=SequenceNumber(), 
+                    const string& value=string());
+
+
+    void 
+    executionException(uint16_t errorCode=0, 
+                       const SequenceNumber& commandId=SequenceNumber(), 
+                       uint8_t classCode=0, 
+                       uint8_t commandCode=0, 
+                       uint8_t fieldIndex=0, 
+                       const string& description=string(), 
+                       const FieldTable& errorInfo=FieldTable());
+
+
+    void 
+    messageTransfer(const string& destination=string(), 
+                    uint8_t acceptMode=1, 
+                    uint8_t acquireMode=0, 
+                    const MethodContent& content=DefaultContent(std::string()));
+
+
+    void 
+    messageAccept(const SequenceSet& transfers=SequenceSet());
+
+
+    void 
+    messageReject(const SequenceSet& transfers=SequenceSet(), 
+                  uint16_t code=0, 
+                  const string& text=string());
+
+
+    void 
+    messageRelease(const SequenceSet& transfers=SequenceSet(), 
+                   bool setRedelivered=false);
+
+
+    qpid::framing::MessageAcquireResult 
+    messageAcquire(const SequenceSet& transfers=SequenceSet());
+
+
+    qpid::framing::MessageResumeResult 
+    messageResume(const string& destination=string(), 
+                  const string& resumeId=string());
+
+
+    void 
+    messageSubscribe(const string& queue=string(), 
+                     const string& destination=string(), 
+                     uint8_t acceptMode=0, 
+                     uint8_t acquireMode=0, 
+                     bool exclusive=false, 
+                     const string& resumeId=string(), 
+                     uint64_t resumeTtl=0, 
+                     const FieldTable& arguments=FieldTable());
+
+
+    void 
+    messageCancel(const string& destination=string());
+
+    
+    void 
+    messageSetFlowMode(const string& destination=string(), 
+                            uint8_t flowMode=0);
+
+
+    void 
+    messageFlow(const string& destination=string(), 
+                uint8_t unit=0, 
+                uint32_t value=0);
+
+
+    void 
+    messageFlush(const string& destination=string());
+
+
+    void 
+    messageStop(const string& destination=string());
+
+
+    void 
+    txSelect();
+
+
+    void 
+    txCommit();
+
+
+    void 
+    txRollback();
+
+
+    void 
+    dtxSelect();
+
+
+    qpid::framing::XaResult 
+    dtxStart(const Xid& xid=Xid(), 
+             bool join=false, 
+             bool resume=false);
+
+
+    qpid::framing::XaResult 
+    dtxEnd(const Xid& xid=Xid(), 
+           bool fail=false, 
+           bool suspend=false);
+
+
+    qpid::framing::XaResult 
+    dtxCommit(const Xid& xid=Xid(), 
+              bool onePhase=false);
+
+
+    void 
+    dtxForget(const Xid& xid=Xid());
+
+
+    qpid::framing::DtxGetTimeoutResult 
+    dtxGetTimeout(const Xid& xid=Xid());
+
+
+    qpid::framing::XaResult 
+    dtxPrepare(const Xid& xid=Xid());
+
+
+    qpid::framing::DtxRecoverResult 
+    dtxRecover();
+
+
+    qpid::framing::XaResult 
+    dtxRollback(const Xid& xid=Xid());
+
+
+    void 
+    dtxSetTimeout(const Xid& xid=Xid(), 
+                  uint32_t timeout=0);
+
+
+    void 
+    exchangeDeclare(const string& exchange=string(), 
+                    const string& type=string(), 
+                    const string& alternateExchange=string(), 
+                    bool passive=false, 
+                    bool durable=false, 
+                    bool autoDelete=false, 
+                    const FieldTable& arguments=FieldTable());
+
+
+    void 
+    exchangeDelete(const string& exchange=string(), 
+                   bool ifUnused=false);
+
+
+    qpid::framing::ExchangeQueryResult 
+    exchangeQuery(const string& name=string());
+
+
+    void 
+    exchangeBind(const string& queue=string(), 
+                 const string& exchange=string(), 
+                 const string& bindingKey=string(), 
+                 const FieldTable& arguments=FieldTable());
+
+
+    void 
+    exchangeUnbind(const string& queue=string(), 
+                   const string& exchange=string(), 
+                   const string& bindingKey=string());
+
+
+    qpid::framing::ExchangeBoundResult 
+    exchangeBound(const string& exchange=string(), 
+                  const string& queue=string(), 
+                  const string& bindingKey=string(), 
+                  const FieldTable& arguments=FieldTable());
+
+
+    void 
+    queueDeclare(const string& queue=string(), 
+                 const string& alternateExchange=string(), 
+                 bool passive=false, 
+                 bool durable=false, 
+                 bool exclusive=false, 
+                 bool autoDelete=false, 
+                 const FieldTable& arguments=FieldTable());
+
+
+    void 
+    queueDelete(const string& queue=string(), 
+                bool ifUnused=false, 
+                bool ifEmpty=false);
+
+
+    void 
+    queuePurge(const string& queue=string());
+
+
+    qpid::framing::QueueQueryResult 
+    queueQuery(const string& queue=string());
+
+    // end Wrapped functions from Session  ---------------------------
+
+    // Tells the FailoverSession to get ready for a failover.
+    void prepareForFailover ( Connection newConnection );
+
+    void failover ( );
+
+    FailoverSubscriptionManager * failoverSubscriptionManager;
+
+
+  private:
+
+
+    Session session;
+    Session newSession;
+
+    friend class FailoverConnection;
+    friend class FailoverSubscriptionManager;
+};
+
+}} // namespace qpid::client
+
+
+#endif  /*!QPID_CLIENT_FAILOVERSESSION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Thu Oct  9 21:49:48 2008
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/FailoverSession.h"
+#include "qpid/client/FailoverSubscriptionManager.h"
+
+
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+
+
+FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
+  name("no_name"),
+  newSessionIsValid(false)
+{
+  subscriptionManager = new SubscriptionManager(fs->session);
+  fs->failoverSubscriptionManager = this;
+}
+
+
+
+void
+FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
+{
+  newSession = _newSession;
+  newSessionIsValid = true;
+}
+
+
+
+void
+FailoverSubscriptionManager::failover ( )
+{
+  subscriptionManager->stop();
+  // TODO -- save vector of boost bind fns.
+}
+
+
+
+
+FailoverSubscriptionManager::subscribeArgs::subscribeArgs 
+  ( int _interface,
+    MessageListener * _listener,
+    LocalQueue * _localQueue,
+    const std::string * _queue,
+    const FlowControl * _flow,
+    const std::string * _tag
+  ) :
+  interface(_interface),
+  listener(_listener),
+  localQueue(_localQueue),
+  queue(_queue),
+  flow(_flow),
+  tag(_tag)
+{
+}
+
+
+
+
+void 
+FailoverSubscriptionManager::subscribe ( MessageListener   & listener,
+                                         const std::string & queue,
+                                         const FlowControl & flow,
+                                         const std::string & tag 
+                                       )
+{
+  subscriptionManager->subscribe ( listener,
+                                   queue,
+                                   flow,
+                                   tag
+                                 );
+  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+}
+
+
+
+void 
+FailoverSubscriptionManager::subscribe ( LocalQueue        & localQueue,
+                                         const std::string & queue,
+                                         const FlowControl & flow,
+                                         const std::string & tag
+                                       )
+{
+  subscriptionManager->subscribe ( localQueue,
+                                   queue,
+                                   flow,
+                                   tag
+                                 );
+  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+}
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( MessageListener   & listener,
+                                         const std::string & queue,
+                                         const std::string & tag
+                                       )
+{
+  subscriptionManager->subscribe ( listener,
+                                   queue,
+                                   tag
+                                 );
+  // TODO -- more than one subscription
+  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+}
+
+
+
+
+void 
+FailoverSubscriptionManager::subscribe ( LocalQueue        & localQueue,
+                                         const std::string & queue,
+                                         const std::string & tag
+                                       )
+{
+  subscriptionManager->subscribe ( localQueue,
+                                   queue,
+                                   tag
+                                 );
+  subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+}
+
+
+
+bool 
+FailoverSubscriptionManager::get ( Message & result,
+                                   const std::string & queue,
+                                   sys::Duration timeout
+                                 )
+{
+  return subscriptionManager->get ( result, queue, timeout );
+}
+
+
+
+void 
+FailoverSubscriptionManager::cancel ( const std::string tag )
+{
+  subscriptionManager->cancel ( tag );
+}
+
+
+
+void 
+FailoverSubscriptionManager::run ( ) // User Thread
+{
+  // FIXME mgoulish -- wait on a monitor here instead of this infinite loop
+  while ( 1 )
+  {
+    subscriptionManager->run ( );
+
+    // When we drop out of run, if there is a new Session
+    // waiting for us, this is a failover.  Otherwise, just
+    // return control to usercode.
+    sleep(1);  // FIXME mgoulish -- get rid of this when we have wait-on-monitor.
+
+    if ( newSessionIsValid )
+    {
+      delete subscriptionManager;
+      subscriptionManager = new SubscriptionManager(newSession);
+      // FIXME mgoulish make this an array of boost bind fns
+      //
+      for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); 
+            i < subscribeFns.end(); 
+            ++ i 
+          )
+      {
+        std::cerr << "MDEBUG  new new resubscribe.\n";
+        (*i) ();
+      }
+
+      newSessionIsValid = false;
+    }
+    else
+    {
+      // break;  TODO -- fix this
+    }
+  }
+
+}
+
+
+
+void 
+FailoverSubscriptionManager::start ( )
+{
+  subscriptionManager->start ( );
+}
+
+
+
+void 
+FailoverSubscriptionManager::setAutoStop ( bool set )
+{
+  subscriptionManager->setAutoStop ( set );
+}
+
+
+
+void 
+FailoverSubscriptionManager::stop ( )
+{
+  subscriptionManager->stop ( );
+}
+
+
+
+void 
+FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
+                                              const FlowControl & flow 
+                                            )
+{
+  subscriptionManager->setFlowControl ( destination, flow );
+}
+
+
+
+void 
+FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
+{
+  subscriptionManager->setFlowControl ( flow );
+}
+
+
+
+const FlowControl & 
+FailoverSubscriptionManager::getFlowControl ( ) const
+{
+  return subscriptionManager->getFlowControl ( );
+}
+
+
+
+
+void 
+FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
+                                              uint32_t messages,
+                                              uint32_t bytes,
+                                              bool window 
+                                            )
+{
+  subscriptionManager->setFlowControl ( tag,
+                                        messages,
+                                        bytes,
+                                        window
+                                      );
+}
+
+
+
+void 
+FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
+                                              uint32_t bytes,
+                                              bool window
+                                            )
+{
+  subscriptionManager->setFlowControl ( messages,
+                                        bytes,
+                                        window
+                                      );
+}
+
+
+
+void 
+FailoverSubscriptionManager::setAcceptMode ( bool required )
+{
+  subscriptionManager->setAcceptMode ( required );
+}
+
+
+
+void 
+FailoverSubscriptionManager::setAcquireMode ( bool acquire )
+{
+  subscriptionManager->setAcquireMode ( acquire );
+}
+
+
+
+void 
+FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
+{
+  subscriptionManager->setAckPolicy ( autoAck );
+}
+
+
+
+AckPolicy & 
+FailoverSubscriptionManager::getAckPolicy()
+{
+  return subscriptionManager->getAckPolicy ( );
+}
+
+
+
+void 
+FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ )
+{
+  // FIXME mgoulish -- get rid of this mechanism -- i think it's unused.
+}
+
+
+
+
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h Thu Oct  9 21:49:48 2008
@@ -0,0 +1,162 @@
+#ifndef QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H
+#define QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "qpid/sys/Mutex.h"
+#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/FailoverSession.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/client/LocalQueue.h>
+#include <qpid/client/FlowControl.h>
+#include <qpid/sys/Runnable.h>
+
+
+
+
+namespace qpid {
+namespace client {
+
+
+class FailoverSubscriptionManager
+{
+  public:
+
+    FailoverSubscriptionManager ( FailoverSession * fs );
+
+    void foo ( int& arg_1 );
+
+    void subscribe ( MessageListener   & listener,
+                     const std::string & queue,
+                     const FlowControl & flow,
+                     const std::string & tag = std::string() );
+
+    void subscribe ( LocalQueue        & localQueue,
+                     const std::string & queue,
+                     const FlowControl & flow,
+                     const std::string & tag=std::string());
+
+    void subscribe ( MessageListener   & listener,
+                     const std::string & queue,
+                     const std::string & tag = std::string());
+
+    void subscribe ( LocalQueue        & localQueue,
+                     const std::string & queue,
+                     const std::string & tag=std::string());
+
+    bool get ( Message & result, 
+               const std::string & queue, 
+               sys::Duration timeout=0);
+
+    void cancel ( const std::string tag );
+
+    void run ( );
+
+    void start ( );
+
+    void setAutoStop ( bool set = true );
+
+    void stop ( );
+
+    void setFlowControl ( const std::string & destintion, 
+                          const FlowControl & flow );
+
+    void setFlowControl ( const FlowControl & flow );
+
+    const FlowControl & getFlowControl ( ) const;
+
+    void setFlowControl ( const std::string & tag, 
+                          uint32_t messages,  
+                          uint32_t bytes, 
+                          bool window=true );
+
+    void setFlowControl ( uint32_t messages,  
+                          uint32_t bytes, 
+                          bool window = true
+                        );
+
+    void setAcceptMode ( bool required );
+
+    void setAcquireMode ( bool acquire );
+
+    void setAckPolicy ( const AckPolicy & autoAck );
+
+    AckPolicy & getAckPolicy();
+
+    void registerFailoverHandler ( boost::function<void ()> fh );
+
+    // Get ready for a failover.
+    void prepareForFailover ( Session newSession );
+    void failover ( );
+
+    std::string name;
+
+
+  private:
+    
+    SubscriptionManager * subscriptionManager;
+
+    MessageListener * savedListener;
+    std::string       savedQueue,
+                      savedTag;
+
+    friend class FailoverConnection;
+    friend class FailoverSession;
+
+    Session newSession;
+    bool    newSessionIsValid;
+
+    /*
+     * */
+    typedef boost::function<void ()> subscribeFn;
+    std::vector < subscribeFn > subscribeFns;
+
+    struct subscribeArgs
+    {
+      int interface;
+      MessageListener   * listener;
+      LocalQueue        * localQueue;
+      const std::string * queue;
+      const FlowControl * flow;
+      const std::string * tag;
+
+      subscribeArgs ( int _interface,
+                      MessageListener   *,
+                      LocalQueue  *,
+                      const std::string *,
+                      const FlowControl *,
+                      const std::string *
+                    );
+    };
+
+    std::vector < subscribeArgs * > subscriptionReplayVector;
+
+};
+
+}} // namespace qpid::client
+
+
+#endif  /*!QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Thu Oct  9 21:49:48 2008
@@ -147,6 +147,12 @@
     return lq.get(result, 0);
 }
 
+Session SubscriptionManager::getSession() const { return session; }
+
+void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
+    dispatcher.registerFailoverHandler(fh);
+}
+
 }} // namespace qpid::client
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Thu Oct  9 21:49:48 2008
@@ -190,7 +190,7 @@
     /** Set the acquire-mode for new subscriptions. Defaults to false.
      *@param acquire: if false messages pre-acquired, if true
      * messages are dequed on acknowledgement or on transfer 
-	 * depending on acceptMode.
+     * depending on acceptMode.
      */
     void setAcquireMode(bool acquire);
 
@@ -199,9 +199,11 @@
      */
     void setAckPolicy(const AckPolicy& autoAck);
 
-     AckPolicy& getAckPolicy();
+    AckPolicy& getAckPolicy();
 
-    Session getSession() const { return session; }
+    void registerFailoverHandler ( boost::function<void ()> fh );
+
+    Session getSession() const;
 };
 
 /** AutoCancel cancels a subscription in its destructor */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Oct  9 21:49:48 2008
@@ -110,6 +110,7 @@
         // FIXME aconway 2008-09-24: 
         // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
     }
+    broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     failoverExchange.reset(new FailoverExchange(this));
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
     cpgDispatchHandle.startWatch(poller);