You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/10/26 21:11:09 UTC
svn commit: r829931 - in /qpid/trunk/qpid/cpp: include/qpid/client/ src/
src/qpid/client/ src/qpid/client/amqp0_10/ src/tests/
Author: aconway
Date: Mon Oct 26 20:11:08 2009
New Revision: 829931
URL: http://svn.apache.org/viewvc?rev=829931&view=rev
Log:
Separate FailoverListener from client::Connection.
client::ConnectionImpl used to contain a FailoverListener to subscribe
for updates on the amq.failover exchange. This caused some lifecycle
issues including memory leaks.
Now FailoverListener is a public API class that the user must create
associated with a session to get known-broker updates.
Removed the weak_ptr logic in client::SessionImpl which was only
required because of FailoverListener.
Made SessionImpl::close() idempotent. Gets rid of spurious warning
messages in some tests.
Added:
qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h
- copied, changed from r829912, qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
Removed:
qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
Modified:
qpid/trunk/qpid/cpp/include/qpid/client/Connection.h
qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/client/Connection.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/client/Connection.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/client/Connection.h Mon Oct 26 20:11:08 2009
@@ -200,7 +200,11 @@
QPID_CLIENT_EXTERN bool isOpen() const;
- QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers();
+ /** In a cluster, returns the initial set of known broker URLs
+ * at the time of connection.
+ */
+ QPID_CLIENT_EXTERN std::vector<Url> getInitialBrokers();
+
QPID_CLIENT_EXTERN void registerFailureCallback ( boost::function<void ()> fn );
/**
Copied: qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h (from r829912, qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h?p2=qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h&p1=qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h&r1=829912&r2=829931&rev=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h Mon Oct 26 20:11:08 2009
@@ -22,7 +22,11 @@
*
*/
+#include "qpid/client/ClientImportExport.h"
#include "qpid/client/MessageListener.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -32,26 +36,43 @@
namespace qpid {
namespace client {
-class SubscriptionManager;
-class ConnectionImpl;
/**
- * @internal Listen for failover updates from the amq.failover exchange.
+ * Listen for updates from the amq.failover exchange.
+ *
+ * In a cluster, the amq.failover exchange provides updates whenever
+ * the cluster membership changes. This class subscribes to the
+ * failover exchange and provides the latest list of known brokers.
+ *
+ * You can also subscribe to amq.failover yourself and use the static
+ * FailoverListener::getKnownBrokers(const Message&) to extract a list
+ * of broker URLs from a failover exchange message.
*/
-class FailoverListener : public MessageListener, private qpid::sys::Runnable
+class FailoverListener : private MessageListener, private qpid::sys::Runnable
{
public:
- FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls);
- ~FailoverListener();
- void stop();
+ /** The name of the standard failover exchange amq.failover */
+ static QPID_CLIENT_EXTERN const std::string AMQ_FAILOVER;
- std::vector<Url> getKnownBrokers() const;
- void received(Message& msg);
- void run();
+ /** Extract the broker list from a failover exchange message */
+ static QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers(const Message& m);
+
+ /** Subscribe to amq.failover exchange. */
+ QPID_CLIENT_EXTERN FailoverListener(Connection);
+
+ QPID_CLIENT_EXTERN ~FailoverListener();
+
+ /** Returns the latest list of known broker URLs. */
+ QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers() const;
private:
+ void received(Message& msg);
+ void run();
+
mutable sys::Mutex lock;
- std::auto_ptr<SubscriptionManager> subscriptions;
+ Connection connection;
+ Session session;
+ SubscriptionManager subscriptions;
sys::Thread thread;
std::vector<Url> knownBrokers;
};
Modified: qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h Mon Oct 26 20:11:08 2009
@@ -22,12 +22,13 @@
*
*/
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionSettings.h"
#include "qpid/Exception.h"
#include "qpid/client/AsyncSession.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/sys/Monitor.h"
#include <vector>
namespace qpid {
@@ -123,6 +124,7 @@
qpid::sys::Monitor lock;
Connection connection;
+ std::auto_ptr<FailoverListener> failoverListener;
ConnectionSettings settings;
ReconnectionStrategy* strategy;
State state;
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 26 20:11:08 2009
@@ -652,7 +652,6 @@
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
qpid/client/FailoverListener.cpp \
- qpid/client/FailoverListener.h \
qpid/client/FailoverManager.cpp \
qpid/client/Future.cpp \
qpid/client/FutureCompletion.cpp \
@@ -744,6 +743,7 @@
../include/qpid/client/Completion.h \
../include/qpid/client/Connection.h \
../include/qpid/client/ConnectionSettings.h \
+ ../include/qpid/client/FailoverListener.h \
../include/qpid/client/FailoverManager.h \
../include/qpid/client/FlowControl.h \
../include/qpid/client/Future.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Mon Oct 26 20:11:08 2009
@@ -152,8 +152,8 @@
impl->close();
}
-std::vector<Url> Connection::getKnownBrokers() {
- return impl ? impl->getKnownBrokers() : std::vector<Url>();
+std::vector<Url> Connection::getInitialBrokers() {
+ return impl ? impl->getInitialBrokers() : std::vector<Url>();
}
}} // namespace qpid::client
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Oct 26 20:11:08 2009
@@ -22,7 +22,6 @@
#include "qpid/client/Connector.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/SessionImpl.h"
-#include "qpid/client/FailoverListener.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
@@ -88,7 +87,6 @@
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- failover.reset();
if (connector) connector->close();
}
@@ -175,7 +173,6 @@
} else {
QPID_LOG(debug, "No security layer in place");
}
- failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));
}
void ConnectionImpl::idleIn()
@@ -256,8 +253,8 @@
return handler;
}
-std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
- return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
+std::vector<qpid::Url> ConnectionImpl::getInitialBrokers() {
+ return handler.knownBrokersUrls;
}
boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
@@ -267,6 +264,4 @@
return simpl;
}
-void ConnectionImpl::stopFailoverListener() { failover->stop(); }
-
}} // namespace qpid::client
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Mon Oct 26 20:11:08 2009
@@ -42,7 +42,6 @@
class Connector;
struct ConnectionSettings;
class SessionImpl;
-class FailoverListener;
class ConnectionImpl : public Bounds,
public framing::FrameHandler,
@@ -58,7 +57,6 @@
SessionMap sessions;
ConnectionHandler handler;
boost::scoped_ptr<Connector> connector;
- boost::scoped_ptr<FailoverListener> failover;
framing::ProtocolVersion version;
uint16_t nextChannel;
sys::Mutex lock;
@@ -90,9 +88,8 @@
void erase(uint16_t channel);
const ConnectionSettings& getNegotiatedSettings();
- std::vector<Url> getKnownBrokers();
+ std::vector<Url> getInitialBrokers();
void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; }
- void stopFailoverListener();
framing::ProtocolVersion getVersion() { return version; }
};
Modified: qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Mon Oct 26 20:11:08 2009
@@ -19,11 +19,7 @@
*
*/
#include "qpid/client/FailoverListener.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/SubscriptionImpl.h"
-#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Helpers.h"
@@ -31,83 +27,46 @@
namespace qpid {
namespace client {
-static const std::string AMQ_FAILOVER("amq.failover");
+const std::string FailoverListener::AMQ_FAILOVER("amq.failover");
-static Session makeSession(boost::shared_ptr<SessionImpl> si) {
- // Hold only a weak pointer to the ConnectionImpl so a
- // FailoverListener in a ConnectionImpl won't create a shared_ptr
- // cycle.
- //
- si->setWeakPtr(true);
- Session s;
- SessionBase_0_10Access(s).set(si);
- return s;
-}
-
-FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, const std::vector<Url>& initUrls)
- : knownBrokers(initUrls)
- {
- // Special versions used to mark cluster catch-up connections
- // which do not need a FailoverListener
- if (c->getVersion().getMajor() >= 0x80) {
- QPID_LOG(debug, "No failover listener for catch-up connection.");
- return;
- }
-
- Session session = makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0));
+FailoverListener::FailoverListener(Connection c) :
+ connection(c),
+ session(c.newSession(AMQ_FAILOVER+"."+framing::Uuid(true).str())),
+ subscriptions(session)
+{
+ knownBrokers = c.getInitialBrokers();
if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
session.close();
return;
}
- subscriptions.reset(new SubscriptionManager(session));
std::string qname=session.getId().getName();
session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
- subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
+ subscriptions.subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(),
+ ACCEPT_MODE_NONE));
thread = sys::Thread(*this);
}
-void FailoverListener::run()
-{
+void FailoverListener::run() {
try {
- subscriptions->run();
- } catch (const TransportFailure&) {
- } catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG(e.what()));
- }
+ subscriptions.run();
+ } catch(...) {}
}
FailoverListener::~FailoverListener() {
- try { stop(); }
- catch (const std::exception& /*e*/) {}
-}
-
-void FailoverListener::stop() {
- if (subscriptions.get())
- subscriptions->stop();
-
- if (thread.id() == sys::Thread::current().id()) {
- // FIXME aconway 2008-10-16: this can happen if ConnectionImpl
- // dtor runs when my session drops its weak pointer lock.
- // For now, leak subscriptions to prevent a core if we delete
- // without joining.
- subscriptions.release();
- }
- else if (thread.id()) {
+ try {
+ subscriptions.stop();
thread.join();
- thread=sys::Thread();
- subscriptions.reset(); // Safe to delete after join.
- }
+ if (connection.isOpen()) {
+ session.sync();
+ session.close();
+ }
+ } catch (...) {}
}
void FailoverListener::received(Message& msg) {
sys::Mutex::ScopedLock l(lock);
- knownBrokers.clear();
- framing::Array urlArray;
- msg.getHeaders().getArray("amq.failover", urlArray);
- for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i != urlArray.end(); ++i )
- knownBrokers.push_back(Url((*i)->get<std::string>()));
- QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers));
+ knownBrokers = getKnownBrokers(msg);
}
std::vector<Url> FailoverListener::getKnownBrokers() const {
@@ -115,4 +74,16 @@
return knownBrokers;
}
+std::vector<Url> FailoverListener::getKnownBrokers(const Message& msg) {
+ std::vector<Url> knownBrokers;
+ framing::Array urlArray;
+ msg.getHeaders().getArray("amq.failover", urlArray);
+ for (framing::Array::ValueVector::const_iterator i = urlArray.begin();
+ i != urlArray.end();
+ ++i )
+ knownBrokers.push_back(Url((*i)->get<std::string>()));
+ return knownBrokers;
+}
+
+
}} // namespace qpid::client
Modified: qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp Mon Oct 26 20:11:08 2009
@@ -77,7 +77,9 @@
} else {
state = CONNECTING;
Connection c;
- attempt(c, settings, brokers.empty() ? connection.getKnownBrokers() : brokers);
+ if (brokers.empty() && failoverListener.get())
+ brokers = failoverListener->getKnownBrokers();
+ attempt(c, settings, brokers);
if (c.isOpen()) state = IDLE;
else state = CANT_CONNECT;
connection = c;
@@ -118,6 +120,7 @@
try {
QPID_LOG(info, "Attempting to connect to " << s.host << " on " << s.port << "...");
c.open(s);
+ failoverListener.reset(new FailoverListener(c));
QPID_LOG(info, "Connected to " << s.host << " on " << s.port);
} catch (const Exception& e) {
QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port << ": " << e.what());
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Oct 26 20:11:08 2009
@@ -57,9 +57,7 @@
detachedLifetime(0),
maxFrameSize(conn->getNegotiatedSettings().maxFrameSize),
id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
- connectionShared(conn),
- connectionWeak(conn),
- weakPtr(false),
+ connection(conn),
ioHandler(*this),
proxy(ioHandler),
nextIn(0),
@@ -68,7 +66,7 @@
doClearDeliveryPropertiesExchange(true),
autoDetach(true)
{
- channel.next = connectionShared.get();
+ channel.next = connection.get();
}
SessionImpl::~SessionImpl() {
@@ -87,8 +85,7 @@
}
delete sendMsgCredit;
}
- boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
- if (c) c->erase(channel);
+ connection->erase(channel);
}
@@ -122,6 +119,7 @@
void SessionImpl::close() //user thread
{
Lock l(state);
+ if (state == DETACHED || state == DETACHING) return;
if (detachedLifetime) setTimeout(0);
detach();
waitFor(DETACHED);
@@ -129,8 +127,6 @@
void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread
{
- // weakPtr sessions should not be resumed.
- if (weakPtr) return;
throw NotImplementedException("Resume not yet implemented by client!");
}
@@ -509,11 +505,8 @@
void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock)
{
- boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
- if (c) {
- channel.handle(frame);
- c->expand(frame.encodedSize(), canBlock);
- }
+ channel.handle(frame);
+ connection->expand(frame.encodedSize(), canBlock);
}
void SessionImpl::deliver(AMQFrame& frame) // network thread
@@ -809,17 +802,9 @@
return detachedLifetime;
}
-void SessionImpl::setWeakPtr(bool weak) {
- weakPtr = weak;
- if (weakPtr)
- connectionShared.reset(); // Only keep weak pointer
- else
- connectionShared = connectionWeak.lock();
-}
-
boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection()
{
- return connectionWeak.lock();
+ return connection;
}
void SessionImpl::disableAutoDetach() { autoDetach = false; }
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Oct 26 20:11:08 2009
@@ -120,11 +120,6 @@
/** Get timeout in seconds. */
uint32_t getTimeout() const;
- /** Make this session use a weak_ptr to the ConnectionImpl.
- * Used for sessions created by the ConnectionImpl itself.
- */
- void setWeakPtr(bool weak=true);
-
/**
* get the Connection associated with this session
*/
@@ -224,9 +219,7 @@
const uint64_t maxFrameSize;
const SessionId id;
- boost::shared_ptr<ConnectionImpl> connectionShared;
- boost::weak_ptr<ConnectionImpl> connectionWeak;
- bool weakPtr;
+ boost::shared_ptr<ConnectionImpl> connection;
framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
framing::ChannelHandler channel;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Mon Oct 26 20:11:08 2009
@@ -136,7 +136,9 @@
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) {
+ if (tryConnect(url) ||
+ (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
+ {
return resetSessions();
} else {
return false;
@@ -148,6 +150,7 @@
try {
QPID_LOG(info, "Trying to connect to " << url << "...");
connection.open(u, settings);
+ failoverListener.reset(new FailoverListener(connection));
return true;
} catch (const Exception& e) {
//TODO: need to fix timeout on open so that it throws TransportFailure
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Mon Oct 26 20:11:08 2009
@@ -25,6 +25,7 @@
#include "qpid/messaging/Variant.h"
#include "qpid/Url.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/FailoverListener.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
@@ -50,6 +51,7 @@
qpid::sys::Mutex lock;//used to protect data structures
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
qpid::client::Connection connection;
+ std::auto_ptr<FailoverListener> failoverListener;
qpid::Url url;
qpid::client::ConnectionSettings settings;
Sessions sessions;
Modified: qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Mon Oct 26 20:11:08 2009
@@ -126,8 +126,8 @@
ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
: connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
- ~ClientT() { connection.close(); }
- void close() { session.close(); connection.close(); }
+ ~ClientT() { close(); }
+ void close() { if (connection.isOpen()) { session.close(); connection.close(); } }
};
typedef ClientT<> Client;
Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Mon Oct 26 20:11:08 2009
@@ -141,13 +141,14 @@
* Get the known broker ports from a Connection.
*@param n if specified wait for the cluster size to be n, up to a timeout.
*/
-std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
- std::vector<qpid::Url> urls = source.getKnownBrokers();
+std::set<int> knownBrokerPorts(qpid::client::Connection& c, int n) {
+ FailoverListener fl(c);
+ std::vector<qpid::Url> urls = fl.getKnownBrokers();
if (n >= 0 && unsigned(n) != urls.size()) {
// Retry up to 10 secs in .1 second intervals.
for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
qpid::sys::usleep(1000*100); // 0.1 secs
- urls = source.getKnownBrokers();
+ urls = fl.getKnownBrokers();
}
}
std::set<int> s;
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Oct 26 20:11:08 2009
@@ -358,13 +358,15 @@
rollbackSession.txRollback();
rollbackSession.messageRelease(rollbackMessage.getId());
-
// Verify queue status: just the committed messages and dequeues should remain.
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
+
+ commitSession.close();
+ rollbackSession.close();
}
QPID_AUTO_TEST_CASE(testUnacked) {
@@ -859,9 +861,12 @@
Receiver receiver(fmgr, "my-queue", "my-data");
qpid::sys::Thread runner(receiver);
receiver.waitForReady();
- cluster.kill(1);
- //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
- ::usleep(2*1000*1000);
+ {
+ ScopedSuppressLogging allQuiet; // suppress connection closed messages
+ cluster.kill(1);
+ //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+ ::usleep(2*1000*1000);
+ }
fmgr.execute(sender);
runner.join();
BOOST_CHECK(!receiver.failed);
Modified: qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon Oct 26 20:11:08 2009
@@ -112,8 +112,8 @@
Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
fix.connection.proxy.close();
- runner.join();
- BOOST_CHECK_THROW(fix.session.close(), TransportFailure);
+ runner.join();
+ BOOST_CHECK_THROW(fix.session.queueDeclare(arg::queue="x"), TransportFailure);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org