You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/10 06:49:49 UTC
svn commit: r703319 - in /incubator/qpid/trunk/qpid/cpp: ./ examples/
examples/failover/ src/ src/qpid/broker/ src/qpid/client/ src/qpid/cluster/
Author: aconway
Date: Thu Oct 9 21:49:48 2008
New Revision: 703319
URL: http://svn.apache.org/viewvc?rev=703319&view=rev
Log:
QPID-1340 froM Mick Goulish: preliminary client-side failover support.
Added:
incubator/qpid/trunk/qpid/cpp/examples/failover/ (with props)
incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am
incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp (with props)
incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (with props)
incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/configure.ac
incubator/qpid/trunk/qpid/cpp/examples/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Thu Oct 9 21:49:48 2008
@@ -331,6 +331,7 @@
examples/fanout/Makefile
examples/pub-sub/Makefile
examples/request-response/Makefile
+ examples/failover/Makefile
examples/xml-exchange/Makefile
managementgen/Makefile
etc/Makefile
Modified: incubator/qpid/trunk/qpid/cpp/examples/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/Makefile.am?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/Makefile.am Thu Oct 9 21:49:48 2008
@@ -1,4 +1,4 @@
-SUBDIRS = direct fanout pub-sub request-response
+SUBDIRS = direct fanout pub-sub request-response failover
if HAVE_XML
SUBDIRS += xml-exchange
endif
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Oct 9 21:49:48 2008
@@ -0,0 +1 @@
+Makefile.in
Added: incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am Thu Oct 9 21:49:48 2008
@@ -0,0 +1,21 @@
+examplesdir=$(pkgdatadir)/examples/direct
+
+include $(top_srcdir)/examples/makedist.mk
+
+noinst_PROGRAMS=direct_producer listener declare_queues
+direct_producer_SOURCES=direct_producer.cpp
+direct_producer_LDADD=$(CLIENT_LIB)
+
+listener_SOURCES=listener.cpp
+listener_LDADD=$(CLIENT_LIB)
+
+declare_queues_SOURCES=declare_queues.cpp
+declare_queues_LDADD=$(CLIENT_LIB)
+
+examples_DATA= \
+ direct_producer.cpp \
+ listener.cpp \
+ declare_queues.cpp \
+ $(MAKEDIST)
+
+# FIXME aconway 2008-10-10: add verify scripts.
Added: incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp Thu Oct 9 21:49:48 2008
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+
+using namespace std;
+
+
+
+
+int
+main ( int argc, char ** argv)
+{
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./declare_queues host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
+
+ const char * host = argv[1];
+ int port = atoi(argv[2]);
+
+
+ try
+ {
+ FailoverConnection connection;
+ FailoverSession * session;
+
+ connection.open ( host, port );
+ session = connection.newSession();
+
+ session->queueDeclare ( "message_queue");
+
+ /*
+ session->exchangeBind
+ ( arg::exchange="amq.direct",
+ arg::queue="message_queue",
+ arg::bindingKey="routing_key"
+ );
+ * */
+ session->exchangeBind ( "message_queue",
+ "amq.direct",
+ "routing_key"
+ );
+ connection.close();
+ return 0;
+ }
+ catch ( const std::exception& error )
+ {
+ std::cout << error.what() << std::endl;
+ }
+
+ return 1;
+}
+
+
+
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/declare_queues.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Thu Oct 9 21:49:48 2008
@@ -0,0 +1,148 @@
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+
+
+int
+main ( int argc, char ** argv)
+{
+ struct timeval broker_killed_time = {0,0},
+ failover_complete_time = {0,0},
+ duration = {0,0};
+
+
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
+
+ char const * host = argv[1];
+ int port = atoi(argv[2]);
+ char const * broker_to_kill = 0;
+
+ if ( argc > 3 )
+ {
+ broker_to_kill = argv[3];
+ std::cerr << "main: Broker marked for death is process ID "
+ << broker_to_kill
+ << endl;
+ }
+ else
+ {
+ std::cerr << "PRODUCER main: there is no broker to kill.\n";
+ }
+
+ FailoverConnection connection;
+ FailoverSession * session;
+ Message message;
+
+ string program_name = "PRODUCER";
+
+
+ connection.failoverCompleteTime = & failover_complete_time;
+ connection.name = program_name;
+ connection.open ( host, port );
+
+ session = connection.newSession();
+ session->name = program_name;
+
+ int send_this_many = 30,
+ messages_sent = 0;
+
+ while ( messages_sent < send_this_many )
+ {
+ if ( (messages_sent == 13) && broker_to_kill )
+ {
+ char command[1000];
+ std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
+ sprintf(command, "kill -9 %s", broker_to_kill);
+ system ( command );
+ gettimeofday ( & broker_killed_time, 0 );
+ }
+
+ message.getDeliveryProperties().setRoutingKey("routing_key");
+
+ std::cerr << "sending message "
+ << messages_sent
+ << " of "
+ << send_this_many
+ << ".\n";
+
+ stringstream message_data;
+ message_data << messages_sent;
+ message.setData(message_data.str());
+
+ try
+ {
+ /* MICK FIXME
+ session.messageTransfer ( arg::content=message,
+ arg::destination="amq.direct"
+ ); */
+ session->messageTransfer ( "amq.direct",
+ 1,
+ 0,
+ message
+ );
+ }
+ catch ( const std::exception& error)
+ {
+ cerr << program_name << " exception: " << error.what() << endl;
+ }
+
+ sleep ( 1 );
+ ++ messages_sent;
+ }
+
+ message.setData ( "That's all, folks!" );
+
+ /* MICK FIXME
+ session.messageTransfer ( arg::content=message,
+ arg::destination="amq.direct"
+ );
+ */
+ session->messageTransfer ( "amq.direct",
+ 1,
+ 0,
+ message
+ );
+
+ session->sync();
+ connection.close();
+
+ // This will be incorrect if you killed more than one...
+ if ( broker_to_kill )
+ {
+ timersub ( & failover_complete_time,
+ & broker_killed_time,
+ & duration
+ );
+ fprintf ( stderr,
+ "Failover time: %ld.%.6ld\n",
+ duration.tv_sec,
+ duration.tv_usec
+ );
+ }
+
+ return 0;
+}
+
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Thu Oct 9 21:49:48 2008
@@ -0,0 +1,249 @@
+
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+
+
+struct Recorder
+{
+ unsigned int max_messages;
+ unsigned int * messages_received;
+
+ Recorder ( )
+ {
+ max_messages = 1000;
+ messages_received = new unsigned int [ max_messages ];
+ memset ( messages_received, 0, max_messages * sizeof(int) );
+ }
+
+
+ void
+ received ( int i )
+ {
+ messages_received[i] ++;
+ }
+
+
+
+ void
+ report ( )
+ {
+ int i;
+
+ int last_received_message = 0;
+
+ vector<unsigned int> missed_messages,
+ multiple_messages;
+
+ /*----------------------------------------------------
+ Collect indices of missed and multiple messages.
+ ----------------------------------------------------*/
+ bool seen_first_message = false;
+ for ( i = max_messages - 1; i >= 0; -- i )
+ {
+ if ( ! seen_first_message )
+ {
+ if ( messages_received [i] > 0 )
+ {
+ seen_first_message = true;
+ last_received_message = i;
+ }
+ }
+ else
+ {
+ if ( messages_received [i] == 0 )
+ missed_messages.push_back ( i );
+ else
+ if ( messages_received [i] > 1 )
+ {
+ multiple_messages.push_back ( i );
+ }
+ }
+ }
+
+ /*--------------------------------------------
+ Report missed messages.
+ --------------------------------------------*/
+ char const * verb = ( missed_messages.size() == 1 )
+ ? " was "
+ : " were ";
+
+ char const * plural = ( missed_messages.size() == 1 )
+ ? "."
+ : "s.";
+
+ std::cerr << "Listener::shutdown: There"
+ << verb
+ << missed_messages.size()
+ << " missed message"
+ << plural
+ << endl;
+
+ for ( i = 0; i < int(missed_messages.size()); ++ i )
+ {
+ std::cerr << " " << i << " was missed.\n";
+ }
+
+
+ /*--------------------------------------------
+ Report multiple messages.
+ --------------------------------------------*/
+ verb = ( multiple_messages.size() == 1 )
+ ? " was "
+ : " were ";
+
+ plural = ( multiple_messages.size() == 1 )
+ ? "."
+ : "s.";
+
+ std::cerr << "Listener::shutdown: There"
+ << verb
+ << multiple_messages.size()
+ << " multiple message"
+ << plural
+ << endl;
+
+ for ( i = 0; i < int(multiple_messages.size()); ++ i )
+ {
+ std::cerr << " "
+ << multiple_messages[i]
+ << " was received "
+ << messages_received [ multiple_messages[i] ]
+ << " times.\n";
+ }
+
+ /*
+ for ( i = 0; i < last_received_message; ++ i )
+ {
+ std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
+ }
+ */
+ }
+
+};
+
+
+
+
+struct Listener : public MessageListener
+{
+ FailoverSubscriptionManager & subscriptionManager;
+ Recorder & recorder;
+
+
+ Listener ( FailoverSubscriptionManager& subs,
+ Recorder & recorder
+ );
+
+ void shutdown() { recorder.report(); }
+ void parse_message ( std::string const & msg );
+
+ virtual void received ( Message & message );
+};
+
+
+
+
+
+Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
+ subscriptionManager(s),
+ recorder(r)
+{
+}
+
+
+
+
+
+void
+Listener::received ( Message & message )
+{
+ std::cerr << "Listener received: " << message.getData() << std::endl;
+ if (message.getData() == "That's all, folks!")
+ {
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptionManager.cancel(message.getDestination());
+
+ shutdown();
+ }
+ else
+ {
+ parse_message ( message.getData() );
+ }
+}
+
+
+
+
+
+void
+Listener::parse_message ( const std::string & msg )
+{
+ int msg_number;
+ if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) )
+ {
+ std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
+ return;
+ }
+ recorder.received ( msg_number );
+}
+
+
+
+
+
+
+int
+main ( int argc, char ** argv )
+{
+ string program_name = "LISTENER";
+
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./listener host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
+
+ char const * host = argv[1];
+ int port = atoi(argv[2]);
+
+ FailoverConnection connection;
+ FailoverSession * session;
+ Recorder recorder;
+
+ connection.name = program_name;
+
+ connection.open ( host, port );
+ session = connection.newSession();
+ session->name = program_name;
+
+ FailoverSubscriptionManager subscriptions ( session );
+ subscriptions.name = program_name;
+ Listener listener ( subscriptions, recorder );
+ subscriptions.subscribe ( listener, "message_queue" );
+ subscriptions.run ( );
+
+ connection.close();
+
+ return 1;
+}
+
+
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Oct 9 21:49:48 2008
@@ -371,11 +371,14 @@
qpid/client/Bounds.cpp \
qpid/client/Connection.cpp \
qpid/client/ConnectionHandler.cpp \
- qpid/client/ConnectionImpl.cpp \
+ qpid/client/ConnectionImpl.cpp \
qpid/client/ConnectionSettings.cpp \
- qpid/client/Connector.cpp \
+ qpid/client/Connector.cpp \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
+ qpid/client/FailoverConnection.cpp \
+ qpid/client/FailoverSession.cpp \
+ qpid/client/FailoverSubscriptionManager.cpp \
qpid/client/FailoverListener.h \
qpid/client/FailoverListener.cpp \
qpid/client/Future.cpp \
@@ -508,6 +511,9 @@
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
+ qpid/client/FailoverConnection.h \
+ qpid/client/FailoverSession.h \
+ qpid/client/FailoverSubscriptionManager.h \
qpid/client/FlowControl.h \
qpid/client/Future.h \
qpid/client/FutureCompletion.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct 9 21:49:48 2008
@@ -140,7 +140,8 @@
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
- *this)
+ *this),
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
@@ -426,5 +427,15 @@
boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
+std::vector<Url>
+Broker::getKnownBrokersImpl()
+{
+ knownBrokers.clear();
+ knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( getPort() ) );
+ return knownBrokers;
+}
+
+
+
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Oct 9 21:49:48 2008
@@ -113,6 +113,9 @@
void declareStandardExchange(const std::string& name, const std::string& type);
+ std::vector<Url> knownBrokers;
+ std::vector<Url> getKnownBrokersImpl();
+
public:
@@ -191,6 +194,9 @@
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
+
+ boost::function<std::vector<Url> ()> getKnownBrokers;
+
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Thu Oct 9 21:49:48 2008
@@ -28,6 +28,7 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
+#include "qpid/Url.h"
#include "AclModule.h"
using namespace qpid;
@@ -127,8 +128,11 @@
void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
const framing::Array& /*capabilities*/, bool /*insist*/)
{
- framing::Array knownhosts;
- client.openOk(knownhosts);
+ std::vector<Url> urls = connection.broker.getKnownBrokers();
+ framing::Array array(0x95); // str16 array
+ for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
+ array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
+ client.openOk(array);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Thu Oct 9 21:49:48 2008
@@ -99,6 +99,15 @@
return impl && impl->isOpen();
}
+void
+Connection::registerFailureCallback ( boost::function<void ()> fn ) {
+ failureCallback = fn;
+ if ( impl )
+ impl->registerFailureCallback ( fn );
+}
+
+
+
void Connection::open(const ConnectionSettings& settings)
{
if (isOpen())
@@ -106,6 +115,8 @@
impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
impl->open();
+ if ( failureCallback )
+ impl->registerFailureCallback ( failureCallback );
}
Session Connection::newSession(const std::string& name, uint32_t timeout) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Oct 9 21:49:48 2008
@@ -45,9 +45,13 @@
{
framing::ProtocolVersion version;
+ boost::function<void ()> failureCallback;
+
+
protected:
boost::shared_ptr<ConnectionImpl> impl;
+
public:
/**
* Creates a connection object, but does not open the connection.
@@ -168,6 +172,7 @@
bool isOpen() const;
std::vector<Url> getKnownBrokers();
+ void registerFailureCallback ( boost::function<void ()> fn );
friend class ConnectionAccess; ///<@internal
friend class SessionBase_0_10; ///<@internal
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Thu Oct 9 21:49:48 2008
@@ -157,13 +157,17 @@
proxy.open(virtualhost, capabilities, insist);
}
-void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/)
+void ConnectionHandler::openOk ( const framing::Array& knownBrokers )
{
checkState(OPENING, INVALID_STATE_OPEN_OK);
- //TODO: store knownHosts for reconnection etc
+ knownBrokersUrls.clear();
+ framing::Array::ValueVector::const_iterator i;
+ for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i )
+ knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
setState(OPEN);
}
+
void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/)
{
throw NotImplementedException("Redirection received from broker; not yet implemented in client");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Thu Oct 9 21:49:48 2008
@@ -32,6 +32,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/InputHandler.h"
+#include "qpid/Url.h"
namespace qpid {
namespace client {
@@ -103,6 +104,8 @@
CloseListener onClose;
ErrorListener onError;
+
+ std::vector<Url> knownBrokersUrls;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Oct 9 21:49:48 2008
@@ -151,13 +151,20 @@
void ConnectionImpl::shutdown() {
Mutex::ScopedLock l(lock);
- if (handler.isClosed()) return;
+ if (handler.isClosed())
+ {
+ std::cerr << "MDEBUG ConnectionImpl::shutdown -- returning w/o failure callback!\n";
+ return;
+ }
// FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
// an appropriate close-code. connection-forced is not right.
if (!handler.isClosing())
closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
handler.fail(CONN_CLOSED);
+
+ if ( failureCallback )
+ failureCallback();
}
void ConnectionImpl::erase(uint16_t ch) {
@@ -171,8 +178,8 @@
}
std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
- // FIXME aconway 2008-10-08: initialize failover list from openOk or settings
- return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>();
+ // FIXME aconway 2008-10-08: ensure we never return empty list, always include self Url.
+ return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
}
boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Thu Oct 9 21:49:48 2008
@@ -69,6 +69,8 @@
void idleIn();
void shutdown();
+ boost::function<void ()> failureCallback;
+
public:
ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
~ConnectionImpl();
@@ -82,12 +84,11 @@
void close();
void handle(framing::AMQFrame& frame);
void erase(uint16_t channel);
- void stopFailoverListener();
-
const ConnectionSettings& getNegotiatedSettings();
std::vector<Url> getKnownBrokers();
-
+ void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; }
+ void stopFailoverListener();
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Oct 9 21:49:48 2008
@@ -49,7 +49,10 @@
}
Dispatcher::Dispatcher(const Session& s, const std::string& q)
- : session(s), running(false), autoStop(true)
+ : session(s),
+ running(false),
+ autoStop(true),
+ failoverHandler(0)
{
queue = q.empty() ?
session.getExecution().getDemux().getDefault() :
@@ -91,9 +94,20 @@
}
session.sync(); // Make sure all our acks are received before returning.
}
- catch (const ClosedException&) {} //ignore it and return
+ catch (const ClosedException& e)
+ {
+ QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what());
+ } //ignore it and return
catch (const std::exception& e) {
QPID_LOG(error, "Exception in client dispatch thread: " << e.what());
+ if ( failoverHandler )
+ {
+ failoverHandler();
+ }
+ else
+ {
+ QPID_LOG(info, "No dispatcher failover handler registered.");
+ }
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Thu Oct 9 21:49:48 2008
@@ -69,6 +69,8 @@
Subscriber::shared_ptr find(const std::string& name);
bool isStopped();
+ boost::function<void ()> failoverHandler;
+
public:
Dispatcher(const Session& session, const std::string& queue = "");
@@ -77,6 +79,11 @@
void stop();
void setAutoStop(bool b);
+ void registerFailoverHandler ( boost::function<void ()> fh )
+ {
+ failoverHandler = fh;
+ }
+
void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy());
void cancel(const std::string& destination);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Thu Oct 9 21:49:48 2008
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+
+#include "qpid/log/Statement.h"
+#include "qpid/client/FailoverConnection.h"
+#include "qpid/client/ConnectionSettings.h"
+
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+
+FailoverConnection::FailoverConnection ( ) :
+ name(),
+ failoverCompleteTime(0)
+{
+ connection.registerFailureCallback
+ ( boost::bind(&FailoverConnection::failover, this));
+}
+
+FailoverConnection::~FailoverConnection () {}
+
+void
+FailoverConnection::open ( const std::string& host,
+ int port,
+ const std::string& uid,
+ const std::string& pwd,
+ const std::string& virtualhost,
+ uint16_t maxFrameSize
+)
+{
+ ConnectionSettings settings;
+
+ settings.host = host;
+ settings.port = port;
+ settings.username = uid;
+ settings.username = uid;
+ settings.password = pwd;
+ settings.virtualhost = virtualhost;
+ settings.maxFrameSize = maxFrameSize;
+ settings.host = host;
+
+ open ( settings );
+}
+
+
+void
+FailoverConnection::open ( ConnectionSettings & settings )
+{
+ connection.open ( settings );
+}
+
+
+
+void
+FailoverConnection::close ( )
+{
+ connection.close();
+}
+
+
+
+FailoverSession *
+FailoverConnection::newSession ( const std::string& /* name */ )
+{
+ FailoverSession * fs = new FailoverSession;
+ sessions.push_back ( fs );
+ fs->session = connection.newSession();
+ return fs;
+}
+
+
+
+void
+FailoverConnection::resume ( FailoverSession & failoverSession )
+{
+ connection.resume ( failoverSession.session );
+}
+
+
+bool
+FailoverConnection::isOpen() const
+{
+ return connection.isOpen();
+}
+
+
+void
+FailoverConnection::getKnownBrokers ( std::vector<std::string> & /*v*/ )
+{
+}
+
+
+void
+FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ )
+{
+}
+
+void
+FailoverConnection::failover ( )
+{
+ std::vector<Url> knownBrokers = connection.getKnownBrokers();
+ if (knownBrokers.empty())
+ throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers."));
+
+ Connection newConnection;
+ for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) {
+ try {
+ newConnection.open(*i);
+ break;
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(info, "Could not fail-over to " << *i << ": " << e.what());
+ if ((i + 1) == knownBrokers.end())
+ throw;
+ }
+ }
+
+ /*
+ * We have a valid new connection. Tell all the sessions
+ * (and, through them, their SessionManagers and whatever else)
+ * that we are about to failover to this new Connection.
+ */
+ // FIXME mgoulish -- get rid of two-passes here.
+ std::vector<FailoverSession *>::iterator sessions_iterator;
+
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator < sessions.end();
+ ++ sessions_iterator
+ )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->prepareForFailover ( newConnection );
+ }
+
+ /*
+ * Tell all sessions to actually failover to the new connection.
+ */
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator < sessions.end();
+ ++ sessions_iterator
+ )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->failover ( );
+ }
+
+ connection = newConnection;
+
+ // FIXME aconway 2008-10-09: use sys/Time.h functions.
+ if ( failoverCompleteTime )
+ {
+ gettimeofday ( failoverCompleteTime, 0 );
+ }
+}
+
+
+
+
+}} // namespace qpid::client
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h Thu Oct 9 21:49:48 2008
@@ -0,0 +1,103 @@
+#ifndef QPID_CLIENT_FAILOVERCONNECTION_H
+#define QPID_CLIENT_FAILOVERCONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <string>
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/FailoverConnection.h"
+#include "qpid/client/FailoverSession.h"
+#include "qpid/client/FailoverSubscriptionManager.h"
+
+
+
+namespace qpid {
+namespace client {
+
+class ConnectionSettings;
+
+
+class FailoverConnection
+{
+ public:
+
+ FailoverConnection ( );
+
+ ~FailoverConnection ( );
+
+ void open ( const std::string& host,
+ int port,
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& virtualhost = "/",
+ uint16_t maxFrameSize=65535
+ );
+
+ void open ( ConnectionSettings & settings );
+
+ void close ( );
+
+ FailoverSession * newSession ( const std::string& name = std::string() );
+
+ void resume ( FailoverSession & session );
+
+ bool isOpen() const;
+
+ void getKnownBrokers ( std::vector<std::string> & v );
+
+
+ // public interface specific to Failover:
+
+ void registerFailureCallback ( boost::function<void ()> fn );
+
+ // If you have more than 1 connection and you want to give them
+ // separate names for debugging...
+ std::string name;
+
+ void failover ( );
+
+ struct timeval * failoverCompleteTime;
+
+
+ private:
+
+ std::string host;
+
+ Connection connection;
+
+ int currentPortNumber;
+
+ boost::function<void ()> clientFailoverCallback;
+
+ std::vector<FailoverSession *> sessions;
+
+
+ friend class FailoverSession;
+ friend class FailoverSessionManager;
+};
+
+}} // namespace qpid::client
+
+
+#endif /*!QPID_CLIENT_FAILOVERCONNECTION_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Thu Oct 9 21:49:48 2008
@@ -0,0 +1,592 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <fstream>
+
+
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
+#include "qpid/log/Statement.h"
+
+#include "qpid/client/FailoverConnection.h"
+#include "qpid/client/FailoverSession.h"
+
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+FailoverSession::FailoverSession ( ) :
+ name("no_name")
+{
+ // The session is created by FailoverConnection::newSession
+ failoverSubscriptionManager = 0;
+}
+
+
+FailoverSession::~FailoverSession ( )
+{
+}
+
+
+framing::FrameSet::shared_ptr
+FailoverSession::get()
+{
+ return session.get();
+}
+
+
+SessionId
+FailoverSession::getId() const
+{
+ return session.getId();
+}
+
+
+void
+FailoverSession::close()
+{
+ session.close();
+}
+
+
+void
+FailoverSession::sync()
+{
+ session.sync();
+}
+
+
+uint32_t
+FailoverSession::timeout(uint32_t /*seconds*/ )
+{
+ // MICK WTF? return session.timeout ( seconds );
+ return 0;
+}
+
+
+Execution&
+FailoverSession::getExecution()
+{
+ return session.getExecution();
+}
+
+
+void
+FailoverSession::flush()
+{
+ session.flush();
+}
+
+
+void
+FailoverSession::markCompleted(const framing::SequenceNumber& id,
+ bool cumulative,
+ bool notifyPeer
+ )
+{
+ session.markCompleted ( id, cumulative, notifyPeer );
+}
+
+
+
+// Wrapped functions from Session ----------------------------
+
+void
+FailoverSession::executionSync()
+{
+ session.executionSync();
+}
+
+
+
+void
+FailoverSession::executionResult ( const SequenceNumber& commandId,
+ const string& value
+ )
+{
+ session.executionResult ( commandId,
+ value
+ );
+}
+
+
+
+void
+FailoverSession::executionException ( uint16_t errorCode,
+ const SequenceNumber& commandId,
+ uint8_t classCode,
+ uint8_t commandCode,
+ uint8_t fieldIndex,
+ const string& description,
+ const FieldTable& errorInfo
+ )
+{
+ session.executionException ( errorCode,
+ commandId,
+ classCode,
+ commandCode,
+ fieldIndex,
+ description,
+ errorInfo
+ );
+}
+
+
+
+void
+FailoverSession::messageTransfer ( const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ const MethodContent& content
+ )
+{
+ session.messageTransfer ( destination,
+ acceptMode,
+ acquireMode,
+ content
+ );
+}
+
+
+
+void
+FailoverSession::messageAccept ( const SequenceSet& transfers )
+{
+ session.messageAccept ( transfers );
+}
+
+
+
+void
+FailoverSession::messageReject ( const SequenceSet& transfers,
+ uint16_t code,
+ const string& text
+ )
+{
+ session.messageReject ( transfers,
+ code,
+ text
+ );
+}
+
+
+
+void
+FailoverSession::messageRelease ( const SequenceSet& transfers,
+ bool setRedelivered
+ )
+{
+ session.messageRelease ( transfers,
+ setRedelivered
+ );
+}
+
+
+
+qpid::framing::MessageAcquireResult
+FailoverSession::messageAcquire ( const SequenceSet& transfers )
+{
+ return session.messageAcquire ( transfers );
+}
+
+
+
+qpid::framing::MessageResumeResult
+FailoverSession::messageResume ( const string& destination,
+ const string& resumeId
+ )
+{
+ return session.messageResume ( destination,
+ resumeId
+ );
+}
+
+
+
+void
+FailoverSession::messageSubscribe ( const string& queue,
+ const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ bool exclusive,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const FieldTable& arguments
+ )
+{
+ session.messageSubscribe ( queue,
+ destination,
+ acceptMode,
+ acquireMode,
+ exclusive,
+ resumeId,
+ resumeTtl,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::messageCancel ( const string& destinations )
+{
+ session.messageCancel ( destinations );
+}
+
+
+
+void
+FailoverSession::messageSetFlowMode ( const string& destination,
+ uint8_t flowMode
+ )
+{
+ session.messageSetFlowMode ( destination,
+ flowMode
+ );
+}
+
+
+
+void
+FailoverSession::messageFlow(const string& destination,
+ uint8_t unit,
+ uint32_t value)
+{
+ session.messageFlow ( destination,
+ unit,
+ value
+ );
+}
+
+
+
+void
+FailoverSession::messageFlush(const string& destination)
+{
+ session.messageFlush ( destination );
+}
+
+
+
+void
+FailoverSession::messageStop(const string& destination)
+{
+ session.messageStop ( destination );
+}
+
+
+
+void
+FailoverSession::txSelect()
+{
+ session.txSelect ( );
+}
+
+
+
+void
+FailoverSession::txCommit()
+{
+ session.txCommit ( );
+}
+
+
+
+void
+FailoverSession::txRollback()
+{
+ session.txRollback ( );
+}
+
+
+
+void
+FailoverSession::dtxSelect()
+{
+ session.dtxSelect ( );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxStart(const Xid& xid,
+ bool join,
+ bool resume)
+{
+ return session.dtxStart ( xid,
+ join,
+ resume
+ );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxEnd(const Xid& xid,
+ bool fail,
+ bool suspend)
+{
+ return session.dtxEnd ( xid,
+ fail,
+ suspend
+ );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxCommit(const Xid& xid,
+ bool onePhase)
+{
+ return session.dtxCommit ( xid,
+ onePhase
+ );
+}
+
+
+
+void
+FailoverSession::dtxForget(const Xid& xid)
+{
+ session.dtxForget ( xid );
+}
+
+
+
+qpid::framing::DtxGetTimeoutResult
+FailoverSession::dtxGetTimeout(const Xid& xid)
+{
+ return session.dtxGetTimeout ( xid );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxPrepare(const Xid& xid)
+{
+ return session.dtxPrepare ( xid );
+}
+
+
+
+qpid::framing::DtxRecoverResult
+FailoverSession::dtxRecover()
+{
+ return session.dtxRecover ( );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxRollback(const Xid& xid)
+{
+ return session.dtxRollback ( xid );
+}
+
+
+
+void
+FailoverSession::dtxSetTimeout(const Xid& xid,
+ uint32_t timeout)
+{
+ session.dtxSetTimeout ( xid,
+ timeout
+ );
+}
+
+
+
+void
+FailoverSession::exchangeDeclare(const string& exchange,
+ const string& type,
+ const string& alternateExchange,
+ bool passive,
+ bool durable,
+ bool autoDelete,
+ const FieldTable& arguments)
+{
+ session.exchangeDeclare ( exchange,
+ type,
+ alternateExchange,
+ passive,
+ durable,
+ autoDelete,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::exchangeDelete(const string& exchange,
+ bool ifUnused)
+{
+ session.exchangeDelete ( exchange,
+ ifUnused
+ );
+}
+
+
+
+qpid::framing::ExchangeQueryResult
+FailoverSession::exchangeQuery(const string& name)
+{
+ return session.exchangeQuery ( name );
+}
+
+
+
+void
+FailoverSession::exchangeBind(const string& queue,
+ const string& exchange,
+ const string& bindingKey,
+ const FieldTable& arguments)
+{
+ session.exchangeBind ( queue,
+ exchange,
+ bindingKey,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::exchangeUnbind(const string& queue,
+ const string& exchange,
+ const string& bindingKey)
+{
+ session.exchangeUnbind ( queue,
+ exchange,
+ bindingKey
+ );
+}
+
+
+
+qpid::framing::ExchangeBoundResult
+FailoverSession::exchangeBound(const string& exchange,
+ const string& queue,
+ const string& bindingKey,
+ const FieldTable& arguments)
+{
+ return session.exchangeBound ( exchange,
+ queue,
+ bindingKey,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::queueDeclare(const string& queue,
+ const string& alternateExchange,
+ bool passive,
+ bool durable,
+ bool exclusive,
+ bool autoDelete,
+ const FieldTable& arguments)
+{
+ session.queueDeclare ( queue,
+ alternateExchange,
+ passive,
+ durable,
+ exclusive,
+ autoDelete,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::queueDelete(const string& queue,
+ bool ifUnused,
+ bool ifEmpty)
+{
+ session.queueDelete ( queue,
+ ifUnused,
+ ifEmpty
+ );
+}
+
+
+
+void
+FailoverSession::queuePurge(const string& queue)
+{
+ session.queuePurge ( queue) ;
+}
+
+
+
+qpid::framing::QueueQueryResult
+FailoverSession::queueQuery(const string& queue)
+{
+ return session.queueQuery ( queue );
+}
+
+
+// end Wrapped functions from Session ---------------------------
+
+
+// Get ready for a failover.
+void
+FailoverSession::prepareForFailover ( Connection newConnection )
+{
+ try
+ {
+ newSession = newConnection.newSession();
+ }
+ catch ( const std::exception& error )
+ {
+ throw Exception(QPID_MSG("Can't create failover session."));
+ }
+
+
+ if ( failoverSubscriptionManager )
+ {
+ failoverSubscriptionManager->prepareForFailover ( newSession );
+ }
+}
+
+
+
+void
+FailoverSession::failover ( )
+{
+ if ( failoverSubscriptionManager )
+ {
+ failoverSubscriptionManager->failover ( );
+ }
+
+ session = newSession;
+}
+
+
+
+
+}} // namespace qpid::client
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h Thu Oct 9 21:49:48 2008
@@ -0,0 +1,314 @@
+#ifndef QPID_CLIENT_FAILOVERSESSION_H
+#define QPID_CLIENT_FAILOVERSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Session.h"
+#include "qpid/SessionId.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Execution.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+
+
+
+namespace qpid {
+namespace client {
+
+
+class FailoverConnection;
+class FailoverSubscriptionManager;
+
+
+class FailoverSession
+{
+ public:
+
+ typedef framing::TransferContent DefaultContent;
+
+ FailoverSession ( );
+ ~FailoverSession ( );
+
+ std::string name;
+
+ framing::FrameSet::shared_ptr get();
+
+ SessionId getId() const;
+
+ void close();
+
+ void sync();
+
+ uint32_t timeout ( uint32_t seconds);
+
+ Execution& getExecution();
+
+ void flush();
+
+ void markCompleted(const framing::SequenceNumber& id,
+ bool cumulative,
+ bool notifyPeer
+ );
+
+ void sendCompletion ( );
+
+
+
+ // Wrapped functions from Session ----------------------------
+
+ void
+ executionSync();
+
+
+ void
+ executionResult(const SequenceNumber& commandId=SequenceNumber(),
+ const string& value=string());
+
+
+ void
+ executionException(uint16_t errorCode=0,
+ const SequenceNumber& commandId=SequenceNumber(),
+ uint8_t classCode=0,
+ uint8_t commandCode=0,
+ uint8_t fieldIndex=0,
+ const string& description=string(),
+ const FieldTable& errorInfo=FieldTable());
+
+
+ void
+ messageTransfer(const string& destination=string(),
+ uint8_t acceptMode=1,
+ uint8_t acquireMode=0,
+ const MethodContent& content=DefaultContent(std::string()));
+
+
+ void
+ messageAccept(const SequenceSet& transfers=SequenceSet());
+
+
+ void
+ messageReject(const SequenceSet& transfers=SequenceSet(),
+ uint16_t code=0,
+ const string& text=string());
+
+
+ void
+ messageRelease(const SequenceSet& transfers=SequenceSet(),
+ bool setRedelivered=false);
+
+
+ qpid::framing::MessageAcquireResult
+ messageAcquire(const SequenceSet& transfers=SequenceSet());
+
+
+ qpid::framing::MessageResumeResult
+ messageResume(const string& destination=string(),
+ const string& resumeId=string());
+
+
+ void
+ messageSubscribe(const string& queue=string(),
+ const string& destination=string(),
+ uint8_t acceptMode=0,
+ uint8_t acquireMode=0,
+ bool exclusive=false,
+ const string& resumeId=string(),
+ uint64_t resumeTtl=0,
+ const FieldTable& arguments=FieldTable());
+
+
+ void
+ messageCancel(const string& destination=string());
+
+
+ void
+ messageSetFlowMode(const string& destination=string(),
+ uint8_t flowMode=0);
+
+
+ void
+ messageFlow(const string& destination=string(),
+ uint8_t unit=0,
+ uint32_t value=0);
+
+
+ void
+ messageFlush(const string& destination=string());
+
+
+ void
+ messageStop(const string& destination=string());
+
+
+ void
+ txSelect();
+
+
+ void
+ txCommit();
+
+
+ void
+ txRollback();
+
+
+ void
+ dtxSelect();
+
+
+ qpid::framing::XaResult
+ dtxStart(const Xid& xid=Xid(),
+ bool join=false,
+ bool resume=false);
+
+
+ qpid::framing::XaResult
+ dtxEnd(const Xid& xid=Xid(),
+ bool fail=false,
+ bool suspend=false);
+
+
+ qpid::framing::XaResult
+ dtxCommit(const Xid& xid=Xid(),
+ bool onePhase=false);
+
+
+ void
+ dtxForget(const Xid& xid=Xid());
+
+
+ qpid::framing::DtxGetTimeoutResult
+ dtxGetTimeout(const Xid& xid=Xid());
+
+
+ qpid::framing::XaResult
+ dtxPrepare(const Xid& xid=Xid());
+
+
+ qpid::framing::DtxRecoverResult
+ dtxRecover();
+
+
+ qpid::framing::XaResult
+ dtxRollback(const Xid& xid=Xid());
+
+
+ void
+ dtxSetTimeout(const Xid& xid=Xid(),
+ uint32_t timeout=0);
+
+
+ void
+ exchangeDeclare(const string& exchange=string(),
+ const string& type=string(),
+ const string& alternateExchange=string(),
+ bool passive=false,
+ bool durable=false,
+ bool autoDelete=false,
+ const FieldTable& arguments=FieldTable());
+
+
+ void
+ exchangeDelete(const string& exchange=string(),
+ bool ifUnused=false);
+
+
+ qpid::framing::ExchangeQueryResult
+ exchangeQuery(const string& name=string());
+
+
+ void
+ exchangeBind(const string& queue=string(),
+ const string& exchange=string(),
+ const string& bindingKey=string(),
+ const FieldTable& arguments=FieldTable());
+
+
+ void
+ exchangeUnbind(const string& queue=string(),
+ const string& exchange=string(),
+ const string& bindingKey=string());
+
+
+ qpid::framing::ExchangeBoundResult
+ exchangeBound(const string& exchange=string(),
+ const string& queue=string(),
+ const string& bindingKey=string(),
+ const FieldTable& arguments=FieldTable());
+
+
+ void
+ queueDeclare(const string& queue=string(),
+ const string& alternateExchange=string(),
+ bool passive=false,
+ bool durable=false,
+ bool exclusive=false,
+ bool autoDelete=false,
+ const FieldTable& arguments=FieldTable());
+
+
+ void
+ queueDelete(const string& queue=string(),
+ bool ifUnused=false,
+ bool ifEmpty=false);
+
+
+ void
+ queuePurge(const string& queue=string());
+
+
+ qpid::framing::QueueQueryResult
+ queueQuery(const string& queue=string());
+
+ // end Wrapped functions from Session ---------------------------
+
+ // Tells the FailoverSession to get ready for a failover.
+ void prepareForFailover ( Connection newConnection );
+
+ void failover ( );
+
+ FailoverSubscriptionManager * failoverSubscriptionManager;
+
+
+ private:
+
+
+ Session session;
+ Session newSession;
+
+ friend class FailoverConnection;
+ friend class FailoverSubscriptionManager;
+};
+
+}} // namespace qpid::client
+
+
+#endif /*!QPID_CLIENT_FAILOVERSESSION_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Thu Oct 9 21:49:48 2008
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/FailoverSession.h"
+#include "qpid/client/FailoverSubscriptionManager.h"
+
+
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+
+
+FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
+ name("no_name"),
+ newSessionIsValid(false)
+{
+ subscriptionManager = new SubscriptionManager(fs->session);
+ fs->failoverSubscriptionManager = this;
+}
+
+
+
+void
+FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
+{
+ newSession = _newSession;
+ newSessionIsValid = true;
+}
+
+
+
+void
+FailoverSubscriptionManager::failover ( )
+{
+ subscriptionManager->stop();
+ // TODO -- save vector of boost bind fns.
+}
+
+
+
+
+FailoverSubscriptionManager::subscribeArgs::subscribeArgs
+ ( int _interface,
+ MessageListener * _listener,
+ LocalQueue * _localQueue,
+ const std::string * _queue,
+ const FlowControl * _flow,
+ const std::string * _tag
+ ) :
+ interface(_interface),
+ listener(_listener),
+ localQueue(_localQueue),
+ queue(_queue),
+ flow(_flow),
+ tag(_tag)
+{
+}
+
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( MessageListener & listener,
+ const std::string & queue,
+ const FlowControl & flow,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( listener,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+}
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
+ const std::string & queue,
+ const FlowControl & flow,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+}
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( MessageListener & listener,
+ const std::string & queue,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( listener,
+ queue,
+ tag
+ );
+ // TODO -- more than one subscription
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+}
+
+
+
+
+void
+FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
+ const std::string & queue,
+ const std::string & tag
+ )
+{
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+}
+
+
+
+bool
+FailoverSubscriptionManager::get ( Message & result,
+ const std::string & queue,
+ sys::Duration timeout
+ )
+{
+ return subscriptionManager->get ( result, queue, timeout );
+}
+
+
+
+void
+FailoverSubscriptionManager::cancel ( const std::string tag )
+{
+ subscriptionManager->cancel ( tag );
+}
+
+
+
+void
+FailoverSubscriptionManager::run ( ) // User Thread
+{
+ // FIXME mgoulish -- wait on a monitor here instead of this infinite loop
+ while ( 1 )
+ {
+ subscriptionManager->run ( );
+
+ // When we drop out of run, if there is a new Session
+ // waiting for us, this is a failover. Otherwise, just
+ // return control to usercode.
+ sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor.
+
+ if ( newSessionIsValid )
+ {
+ delete subscriptionManager;
+ subscriptionManager = new SubscriptionManager(newSession);
+ // FIXME mgoulish make this an array of boost bind fns
+ //
+ for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
+ i < subscribeFns.end();
+ ++ i
+ )
+ {
+ std::cerr << "MDEBUG new new resubscribe.\n";
+ (*i) ();
+ }
+
+ newSessionIsValid = false;
+ }
+ else
+ {
+ // break; TODO -- fix this
+ }
+ }
+
+}
+
+
+
+void
+FailoverSubscriptionManager::start ( )
+{
+ subscriptionManager->start ( );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAutoStop ( bool set )
+{
+ subscriptionManager->setAutoStop ( set );
+}
+
+
+
+void
+FailoverSubscriptionManager::stop ( )
+{
+ subscriptionManager->stop ( );
+}
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
+ const FlowControl & flow
+ )
+{
+ subscriptionManager->setFlowControl ( destination, flow );
+}
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
+{
+ subscriptionManager->setFlowControl ( flow );
+}
+
+
+
+const FlowControl &
+FailoverSubscriptionManager::getFlowControl ( ) const
+{
+ return subscriptionManager->getFlowControl ( );
+}
+
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
+ uint32_t messages,
+ uint32_t bytes,
+ bool window
+ )
+{
+ subscriptionManager->setFlowControl ( tag,
+ messages,
+ bytes,
+ window
+ );
+}
+
+
+
+void
+FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
+ uint32_t bytes,
+ bool window
+ )
+{
+ subscriptionManager->setFlowControl ( messages,
+ bytes,
+ window
+ );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAcceptMode ( bool required )
+{
+ subscriptionManager->setAcceptMode ( required );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAcquireMode ( bool acquire )
+{
+ subscriptionManager->setAcquireMode ( acquire );
+}
+
+
+
+void
+FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
+{
+ subscriptionManager->setAckPolicy ( autoAck );
+}
+
+
+
+AckPolicy &
+FailoverSubscriptionManager::getAckPolicy()
+{
+ return subscriptionManager->getAckPolicy ( );
+}
+
+
+
+void
+FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ )
+{
+ // FIXME mgoulish -- get rid of this mechanism -- i think it's unused.
+}
+
+
+
+
+
+}} // namespace qpid::client
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h?rev=703319&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h Thu Oct 9 21:49:48 2008
@@ -0,0 +1,162 @@
+#ifndef QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H
+#define QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "qpid/sys/Mutex.h"
+#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/FailoverSession.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/client/LocalQueue.h>
+#include <qpid/client/FlowControl.h>
+#include <qpid/sys/Runnable.h>
+
+
+
+
+namespace qpid {
+namespace client {
+
+
+class FailoverSubscriptionManager
+{
+ public:
+
+ FailoverSubscriptionManager ( FailoverSession * fs );
+
+ void foo ( int& arg_1 );
+
+ void subscribe ( MessageListener & listener,
+ const std::string & queue,
+ const FlowControl & flow,
+ const std::string & tag = std::string() );
+
+ void subscribe ( LocalQueue & localQueue,
+ const std::string & queue,
+ const FlowControl & flow,
+ const std::string & tag=std::string());
+
+ void subscribe ( MessageListener & listener,
+ const std::string & queue,
+ const std::string & tag = std::string());
+
+ void subscribe ( LocalQueue & localQueue,
+ const std::string & queue,
+ const std::string & tag=std::string());
+
+ bool get ( Message & result,
+ const std::string & queue,
+ sys::Duration timeout=0);
+
+ void cancel ( const std::string tag );
+
+ void run ( );
+
+ void start ( );
+
+ void setAutoStop ( bool set = true );
+
+ void stop ( );
+
+ void setFlowControl ( const std::string & destintion,
+ const FlowControl & flow );
+
+ void setFlowControl ( const FlowControl & flow );
+
+ const FlowControl & getFlowControl ( ) const;
+
+ void setFlowControl ( const std::string & tag,
+ uint32_t messages,
+ uint32_t bytes,
+ bool window=true );
+
+ void setFlowControl ( uint32_t messages,
+ uint32_t bytes,
+ bool window = true
+ );
+
+ void setAcceptMode ( bool required );
+
+ void setAcquireMode ( bool acquire );
+
+ void setAckPolicy ( const AckPolicy & autoAck );
+
+ AckPolicy & getAckPolicy();
+
+ void registerFailoverHandler ( boost::function<void ()> fh );
+
+ // Get ready for a failover.
+ void prepareForFailover ( Session newSession );
+ void failover ( );
+
+ std::string name;
+
+
+ private:
+
+ SubscriptionManager * subscriptionManager;
+
+ MessageListener * savedListener;
+ std::string savedQueue,
+ savedTag;
+
+ friend class FailoverConnection;
+ friend class FailoverSession;
+
+ Session newSession;
+ bool newSessionIsValid;
+
+ /*
+ * */
+ typedef boost::function<void ()> subscribeFn;
+ std::vector < subscribeFn > subscribeFns;
+
+ struct subscribeArgs
+ {
+ int interface;
+ MessageListener * listener;
+ LocalQueue * localQueue;
+ const std::string * queue;
+ const FlowControl * flow;
+ const std::string * tag;
+
+ subscribeArgs ( int _interface,
+ MessageListener *,
+ LocalQueue *,
+ const std::string *,
+ const FlowControl *,
+ const std::string *
+ );
+ };
+
+ std::vector < subscribeArgs * > subscriptionReplayVector;
+
+};
+
+}} // namespace qpid::client
+
+
+#endif /*!QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Thu Oct 9 21:49:48 2008
@@ -147,6 +147,12 @@
return lq.get(result, 0);
}
+Session SubscriptionManager::getSession() const { return session; }
+
+void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
+ dispatcher.registerFailoverHandler(fh);
+}
+
}} // namespace qpid::client
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Thu Oct 9 21:49:48 2008
@@ -190,7 +190,7 @@
/** Set the acquire-mode for new subscriptions. Defaults to false.
*@param acquire: if false messages pre-acquired, if true
* messages are dequed on acknowledgement or on transfer
- * depending on acceptMode.
+ * depending on acceptMode.
*/
void setAcquireMode(bool acquire);
@@ -199,9 +199,11 @@
*/
void setAckPolicy(const AckPolicy& autoAck);
- AckPolicy& getAckPolicy();
+ AckPolicy& getAckPolicy();
- Session getSession() const { return session; }
+ void registerFailoverHandler ( boost::function<void ()> fh );
+
+ Session getSession() const;
};
/** AutoCancel cancels a subscription in its destructor */
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=703319&r1=703318&r2=703319&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Oct 9 21:49:48 2008
@@ -110,6 +110,7 @@
// FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
+ broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
failoverExchange.reset(new FailoverExchange(this));
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);